// brpc - A framework to host and access services throughout Baidu.
// Copyright (c) 2014 Baidu, Inc.

// Date: Sun Jul 13 15:04:18 CST 2014

#include <sys/types.h>
#include <sys/socket.h>
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include <google/protobuf/descriptor.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/logging.h"
#include "butil/files/temp_file.h"
#include "brpc/socket.h"
#include "brpc/acceptor.h"
#include "brpc/server.h"
#include "brpc/policy/baidu_rpc_protocol.h"
#include "brpc/policy/baidu_rpc_meta.pb.h"
#include "brpc/policy/most_common_message.h"
#include "brpc/channel.h"
#include "brpc/details/load_balancer_with_naming.h"
#include "brpc/parallel_channel.h"
#include "brpc/selective_channel.h"
#include "brpc/socket_map.h"
#include "brpc/controller.h"
#include "echo.pb.h"
#include "brpc/options.pb.h"

namespace brpc {
DECLARE_int32(idle_timeout_second);
DECLARE_int32(max_connection_pool_size);
class Server;
class MethodStatus;
namespace policy {
void SendRpcResponse(int64_t correlation_id, Controller* cntl, 
                     const google::protobuf::Message* req,
                     const google::protobuf::Message* res,
                     const Server* server_raw, MethodStatus *, int64_t);
} // policy
} // brpc

int main(int argc, char* argv[]) {
    brpc::FLAGS_idle_timeout_second = 0;
    brpc::FLAGS_max_connection_pool_size = 0;
    testing::InitGoogleTest(&argc, argv);
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    return RUN_ALL_TESTS();
}

namespace {
void* RunClosure(void* arg) {
    google::protobuf::Closure* done = (google::protobuf::Closure*)arg;
    done->Run();
    return NULL;
}

class DeleteOnlyOnceChannel : public brpc::Channel {
public:
    DeleteOnlyOnceChannel() : _c(1) {
    }
    ~DeleteOnlyOnceChannel() {
        if (_c.fetch_sub(1) != 1) {
            LOG(ERROR) << "Delete more than once!";
            abort();
        }
    }
private:
    butil::atomic<int> _c;
};

static std::string MOCK_CREDENTIAL = "mock credential";
static std::string MOCK_CONTEXT = "mock context";

class MyAuthenticator : public brpc::Authenticator {
public:
    MyAuthenticator() : count(0) {}

    int GenerateCredential(std::string* auth_str) const {
        *auth_str = MOCK_CREDENTIAL;
        count.fetch_add(1, butil::memory_order_relaxed);
        return 0;
    }

    int VerifyCredential(const std::string&,
                         const butil::EndPoint&,
                         brpc::AuthContext* ctx) const {
        ctx->set_user(MOCK_CONTEXT);
        ctx->set_group(MOCK_CONTEXT);
        ctx->set_roles(MOCK_CONTEXT);
        ctx->set_starter(MOCK_CONTEXT);
        ctx->set_is_service(true);
        return 0;
    }
    mutable butil::atomic<int32_t> count;
};

static bool VerifyMyRequest(const brpc::InputMessageBase* msg_base) {
    const brpc::policy::MostCommonMessage* msg = 
        static_cast<const brpc::policy::MostCommonMessage*>(msg_base);
    brpc::Socket* ptr = msg->socket();
    
    brpc::policy::RpcMeta meta;
    butil::IOBufAsZeroCopyInputStream wrapper(msg->meta);
    EXPECT_TRUE(meta.ParseFromZeroCopyStream(&wrapper));

    if (meta.has_authentication_data()) {
        // Credential MUST only appear in the first packet
        EXPECT_TRUE(NULL == ptr->auth_context());
        EXPECT_EQ(meta.authentication_data(), MOCK_CREDENTIAL);
        MyAuthenticator authenticator;
        return authenticator.VerifyCredential(
            "", butil::EndPoint(), ptr->mutable_auth_context()) == 0;
    }
    return true;
}

class MyEchoService : public ::test::EchoService {
    void Echo(google::protobuf::RpcController* cntl_base,
              const ::test::EchoRequest* req,
              ::test::EchoResponse* res,
              google::protobuf::Closure* done) {
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);
        brpc::ClosureGuard done_guard(done);
        if (req->server_fail()) {
            cntl->SetFailed(req->server_fail(), "Server fail1");
            cntl->SetFailed(req->server_fail(), "Server fail2");
            return;
        }
        if (req->close_fd()) {
            LOG(INFO) << "close fd...";
            cntl->CloseConnection("Close connection according to request");
            return;
        }
        if (req->sleep_us() > 0) {
            LOG(INFO) << "sleep " << req->sleep_us() << "us...";
            bthread_usleep(req->sleep_us());
        }
        res->set_message("received " + req->message());
        if (req->code() != 0) {
            res->add_code_list(req->code());
        }
        res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
    }
};

pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;

class ChannelTest : public ::testing::Test{
protected:
    ChannelTest() 
        : _ep(butil::IP_ANY, 8787)
        , _close_fd_once(false) {
        pthread_once(&register_mock_protocol, register_protocol);
        const brpc::InputMessageHandler pairs[] = {
            { brpc::policy::ParseRpcMessage, 
              ProcessRpcRequest, VerifyMyRequest, this, "baidu_std" }
        };
        EXPECT_EQ(0, _messenger.AddHandler(pairs[0]));

        EXPECT_EQ(0, _server_list.save(butil::endpoint2str(_ep).c_str()));           
        _naming_url = std::string("File://") + _server_list.fname();
    };

    virtual ~ChannelTest(){};
    virtual void SetUp() {
    };
    virtual void TearDown() {
        StopAndJoin();
    };

    static void register_protocol() {
        brpc::Protocol dummy_protocol = 
                                 { brpc::policy::ParseRpcMessage,
                                   brpc::SerializeRequestDefault, 
                                   brpc::policy::PackRpcRequest,
                                   NULL, ProcessRpcRequest,
                                   VerifyMyRequest, NULL, NULL,
                                   brpc::CONNECTION_TYPE_ALL, "baidu_std" };
        ASSERT_EQ(0,  RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
    }

    static void ProcessRpcRequest(brpc::InputMessageBase* msg_base) {
        brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg(
            static_cast<brpc::policy::MostCommonMessage*>(msg_base));
        brpc::SocketUniquePtr ptr(msg->ReleaseSocket());
        const brpc::AuthContext* auth = ptr->auth_context();
        if (auth) {
            EXPECT_EQ(MOCK_CONTEXT, auth->user());
            EXPECT_EQ(MOCK_CONTEXT, auth->group());
            EXPECT_EQ(MOCK_CONTEXT, auth->roles());
            EXPECT_EQ(MOCK_CONTEXT, auth->starter());
            EXPECT_TRUE(auth->is_service());
        }
        ChannelTest* ts = (ChannelTest*)msg_base->arg();
        if (ts->_close_fd_once) {
            ts->_close_fd_once = false;
            ptr->SetFailed();
            return;
        }
        
        brpc::policy::RpcMeta meta;
        butil::IOBufAsZeroCopyInputStream wrapper(msg->meta);
        EXPECT_TRUE(meta.ParseFromZeroCopyStream(&wrapper));
        const brpc::policy::RpcRequestMeta& req_meta = meta.request();
        ASSERT_EQ(ts->_svc.descriptor()->full_name(), req_meta.service_name());
        const google::protobuf::MethodDescriptor* method =
            ts->_svc.descriptor()->FindMethodByName(req_meta.method_name());
        google::protobuf::Message* req =
              ts->_svc.GetRequestPrototype(method).New();
        if (meta.attachment_size() != 0) {
            butil::IOBuf req_buf;
            msg->payload.cutn(&req_buf, msg->payload.size() - meta.attachment_size());
            butil::IOBufAsZeroCopyInputStream wrapper2(req_buf);
            EXPECT_TRUE(req->ParseFromZeroCopyStream(&wrapper2));
        } else {
            butil::IOBufAsZeroCopyInputStream wrapper2(msg->payload);
            EXPECT_TRUE(req->ParseFromZeroCopyStream(&wrapper2));
        }
        brpc::Controller* cntl = new brpc::Controller();
        cntl->_current_call.peer_id = ptr->id();
        cntl->_current_call.sending_sock.reset(ptr.release());
        cntl->_server = &ts->_dummy;

        google::protobuf::Message* res =
              ts->_svc.GetResponsePrototype(method).New();
        google::protobuf::Closure* done =
              brpc::NewCallback<
            int64_t, brpc::Controller*,
            const google::protobuf::Message*,
            const google::protobuf::Message*,
            const brpc::Server*,
            brpc::MethodStatus*, int64_t>(
                &brpc::policy::SendRpcResponse,
                meta.correlation_id(), cntl, NULL, res,
                &ts->_dummy, NULL, -1);
        ts->_svc.CallMethod(method, cntl, req, res, done);
    }

    int StartAccept(butil::EndPoint ep) {
        int listening_fd = -1;
        while ((listening_fd = tcp_listen(ep, true)) < 0) {
            if (errno == EADDRINUSE) {
                bthread_usleep(1000);
            } else {
                return -1;
            }
        }
        if (_messenger.StartAccept(listening_fd, -1, NULL) != 0) {
            return -1;
        }
        return 0;
    }

    void StopAndJoin() {
        _messenger.StopAccept(0);
        _messenger.Join();
    }

    void SetUpChannel(brpc::Channel* channel, 
                      bool single_server,
                      bool short_connection,
                      const brpc::Authenticator* auth = NULL,
                      std::string connection_group = std::string()) {
        brpc::ChannelOptions opt;
        if (short_connection) {
            opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
        }
        opt.auth = auth;
        opt.max_retry = 0;
        opt.connection_group = connection_group;
        if (single_server) {
            EXPECT_EQ(0, channel->Init(_ep, &opt)); 
        } else {                                                 
            EXPECT_EQ(0, channel->Init(_naming_url.c_str(), "rR", &opt));
        }                                         
    }
    
    void CallMethod(brpc::ChannelBase* channel, 
                    brpc::Controller* cntl,
                    test::EchoRequest* req, test::EchoResponse* res,
                    bool async, bool destroy = false) {
        google::protobuf::Closure* done = NULL;                     
        brpc::CallId sync_id = { 0 };
        if (async) {
            sync_id = cntl->call_id();
            done = brpc::DoNothing();
        }
        ::test::EchoService::Stub(channel).Echo(cntl, req, res, done);
        if (async) {
            if (destroy) {
                delete channel;
            }
            // Callback MUST be called for once and only once
            bthread_id_join(sync_id);
        }
    }

    void CallMethod(brpc::ChannelBase* channel, 
                    brpc::Controller* cntl,
                    test::ComboRequest* req, test::ComboResponse* res,
                    bool async, bool destroy = false) {
        google::protobuf::Closure* done = NULL;
        brpc::CallId sync_id = { 0 };
        if (async) {
            sync_id = cntl->call_id();
            done = brpc::DoNothing();
        }
        ::test::EchoService::Stub(channel).ComboEcho(cntl, req, res, done);
        if (async) {
            if (destroy) {
                delete channel;
            }
            // Callback MUST be called for once and only once
            bthread_id_join(sync_id);
        }
    }

    void TestConnectionFailed(bool single_server, bool async, 
                              bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(ECONNREFUSED, cntl.ErrorCode()) << cntl.ErrorText();
    }
    
    void TestConnectionFailedParallel(bool single_server, bool async, 
                                      bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_TRUE(brpc::ETOOMANYFAILS == cntl.ErrorCode() ||
                    ECONNREFUSED == cntl.ErrorCode()) << cntl.ErrorText();
        LOG(INFO) << cntl.ErrorText();
    }

    void TestConnectionFailedSelective(bool single_server, bool async, 
                                       bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        const size_t NCHANS = 8;
        brpc::SelectiveChannel channel;
        brpc::ChannelOptions options;
        options.max_retry = 0;
        ASSERT_EQ(0, channel.Init("rr", &options));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(ECONNREFUSED, cntl.ErrorCode()) << cntl.ErrorText();
        ASSERT_EQ(1, cntl.sub_count());
        EXPECT_EQ(ECONNREFUSED, cntl.sub(0)->ErrorCode())
            << cntl.sub(0)->ErrorText();
        LOG(INFO) << cntl.ErrorText();
    }
    
    void TestSuccess(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) 
            << single_server << ", " << async << ", " << short_connection;
        const uint64_t receiving_socket_id = res.receiving_socket_id();
        EXPECT_EQ(0, cntl.sub_count());
        EXPECT_TRUE(NULL == cntl.sub(-1));
        EXPECT_TRUE(NULL == cntl.sub(0));
        EXPECT_TRUE(NULL == cntl.sub(1));
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }
        if (single_server && !short_connection) {
            // Reuse the connection
            brpc::Channel channel2;
            SetUpChannel(&channel2, single_server, short_connection);
            cntl.Reset();
            req.Clear();
            res.Clear();
            req.set_message(__FUNCTION__);
            CallMethod(&channel2, &cntl, &req, &res, async);
            EXPECT_EQ(0, cntl.ErrorCode())
                << single_server << ", " << async << ", " << short_connection;
            EXPECT_EQ(receiving_socket_id, res.receiving_socket_id());

            // A different connection_group does not reuse the connection
            brpc::Channel channel3;
            SetUpChannel(&channel3, single_server, short_connection,
                         NULL, "another_group");
            cntl.Reset();
            req.Clear();
            res.Clear();
            req.set_message(__FUNCTION__);
            CallMethod(&channel3, &cntl, &req, &res, async);
            EXPECT_EQ(0, cntl.ErrorCode())
                << single_server << ", " << async << ", " << short_connection;
            const uint64_t receiving_socket_id2 = res.receiving_socket_id();
            EXPECT_NE(receiving_socket_id, receiving_socket_id2);

            // Channel in the same connection_group reuses the connection
            // note that the leading/trailing spaces should be trimed.
            brpc::Channel channel4;
            SetUpChannel(&channel4, single_server, short_connection,
                         NULL, " another_group ");
            cntl.Reset();
            req.Clear();
            res.Clear();
            req.set_message(__FUNCTION__);
            CallMethod(&channel4, &cntl, &req, &res, async);
            EXPECT_EQ(0, cntl.ErrorCode())
                << single_server << ", " << async << ", " << short_connection;
            EXPECT_EQ(receiving_socket_id2, res.receiving_socket_id());
        }
        StopAndJoin();
    }

    class SetCode : public brpc::CallMapper {
    public:
        brpc::SubCall Map(
            int channel_index,
            const google::protobuf::MethodDescriptor* method,
            const google::protobuf::Message* req_base,
            google::protobuf::Message* response) {
            test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
            req->set_code(channel_index + 1/*non-zero*/);
            return brpc::SubCall(method, req, response->New(),
                                brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
        }
    };

    class SetCodeOnEven : public SetCode {
    public:
        brpc::SubCall Map(
            int channel_index,
            const google::protobuf::MethodDescriptor* method,
            const google::protobuf::Message* req_base,
            google::protobuf::Message* response) {
            if (channel_index % 2) {
                return brpc::SubCall::Skip();
            }
            return SetCode::Map(channel_index, method, req_base, response);
        }
    };


    class GetReqAndAddRes : public brpc::CallMapper {
        brpc::SubCall Map(
            int channel_index,
            const google::protobuf::MethodDescriptor* method,
            const google::protobuf::Message* req_base,
            google::protobuf::Message* res_base) {
            const test::ComboRequest* req =
                dynamic_cast<const test::ComboRequest*>(req_base);
            test::ComboResponse* res = dynamic_cast<test::ComboResponse*>(res_base);
            if (method->name() != "ComboEcho" ||
                res == NULL || req == NULL ||
                req->requests_size() <= channel_index) {
                return brpc::SubCall::Bad();
            }
            return brpc::SubCall(::test::EchoService::descriptor()->method(0),
                                &req->requests(channel_index),
                                res->add_responses(), 0);
        }
    };

    class MergeNothing : public brpc::ResponseMerger {
        Result Merge(google::protobuf::Message* /*response*/,
                     const google::protobuf::Message* /*sub_response*/) {
            return brpc::ResponseMerger::MERGED;
        }
    };

    void TestSuccessParallel(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          new SetCode, NULL));
        }
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_code(23);
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
        }
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        ASSERT_EQ(NCHANS, (size_t)res.code_list_size());
        for (size_t i = 0; i < NCHANS; ++i) {
            ASSERT_EQ((int)i+1, res.code_list(i));
        }
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }
        StopAndJoin();
    }

    void TestSuccessDuplicatedParallel(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        const size_t NCHANS = 8;
        brpc::Channel* subchan = new DeleteOnlyOnceChannel;
        SetUpChannel(subchan, single_server, short_connection);
        brpc::ParallelChannel channel;
        // Share the CallMapper and ResponseMerger should be fine because
        // they're intrusively shared.
        SetCode* set_code = new SetCode;
        for (size_t i = 0; i < NCHANS; ++i) {
            ASSERT_EQ(0, channel.AddChannel(
                          subchan,
                          // subchan should be deleted (for only once)
                          ((i % 2) ? brpc::DOESNT_OWN_CHANNEL : brpc::OWNS_CHANNEL),
                          set_code, NULL));
        }
        ASSERT_EQ((int)NCHANS, set_code->ref_count());
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_code(23);
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
        }
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        ASSERT_EQ(NCHANS, (size_t)res.code_list_size());
        for (size_t i = 0; i < NCHANS; ++i) {
            ASSERT_EQ((int)i+1, res.code_list(i));
        }
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }
        StopAndJoin();
    }
    
    void TestSuccessSelective(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        const size_t NCHANS = 8;
        ASSERT_EQ(0, StartAccept(_ep));
        brpc::SelectiveChannel channel;
        brpc::ChannelOptions options;
        options.max_retry = 0;
        ASSERT_EQ(0, channel.Init("rr", &options));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_code(23);
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(1, cntl.sub_count());
        ASSERT_EQ(0, cntl.sub(0)->ErrorCode());
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        ASSERT_EQ(1, res.code_list_size());
        ASSERT_EQ(req.code(), res.code_list(0));
        ASSERT_EQ(_ep, cntl.remote_side());
        
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }
        StopAndJoin();
    }

    void TestSkipParallel(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          new SetCodeOnEven, NULL));
        }
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_code(23);
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            if (i % 2) {
                EXPECT_TRUE(NULL == cntl.sub(i)) << "i=" << i;
            } else {
                EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
            }
        }
        ASSERT_EQ(NCHANS / 2, (size_t)res.code_list_size());
        for (int i = 0; i < res.code_list_size(); ++i) {
            ASSERT_EQ(i*2 + 1, res.code_list(i));
        }
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }
        StopAndJoin();
    }

    void TestSuccessParallel2(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          new GetReqAndAddRes, new MergeNothing));
        }
        brpc::Controller cntl;
        test::ComboRequest req;
        test::ComboResponse res;
        CallMethod(&channel, &cntl, &req, &res, false);
        ASSERT_TRUE(cntl.Failed()); // req does not have .requests
        ASSERT_EQ(brpc::EREQUEST, cntl.ErrorCode());

        for (size_t i = 0; i < NCHANS; ++i) {
            ::test::EchoRequest* sub_req = req.add_requests();
            sub_req->set_message(butil::string_printf("hello_%llu", (long long)i));
            sub_req->set_code(i + 1);
        }

        // non-parallel channel does not work.
        cntl.Reset();
        CallMethod(&subchans[0], &cntl, &req, &res, false);
        ASSERT_TRUE(cntl.Failed());
        ASSERT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
        ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method ComboEcho() not implemented."));

        // do the rpc call.
        cntl.Reset();
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        ASSERT_GT(cntl.latency_us(), 0);
        ASSERT_EQ((int)NCHANS, res.responses_size());
        for (int i = 0; i < res.responses_size(); ++i) {
            EXPECT_EQ(butil::string_printf("received hello_%d", i),
                      res.responses(i).message());
            ASSERT_EQ(1, res.responses(i).code_list_size());
            EXPECT_EQ(i + 1, res.responses(i).code_list(0));
        }
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }
        StopAndJoin();
    }
    
    struct CancelerArg {
        int64_t sleep_before_cancel_us;
        brpc::CallId cid;
    };

    static void* Canceler(void* void_arg) {
        CancelerArg* arg = static_cast<CancelerArg*>(void_arg);
        if (arg->sleep_before_cancel_us > 0) {
            bthread_usleep(arg->sleep_before_cancel_us);
        }
        LOG(INFO) << "Start to cancel cid=" << arg->cid.value;
        brpc::StartCancel(arg->cid);
        return NULL;
    }


    void CancelBeforeCallMethod(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        brpc::StartCancel(cid);
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
        StopAndJoin();
    }

    void CancelBeforeCallMethodParallel(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        brpc::StartCancel(cid);
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        EXPECT_TRUE(NULL == cntl.sub(1));
        EXPECT_TRUE(NULL == cntl.sub(0));
        StopAndJoin();
    }

    void CancelBeforeCallMethodSelective(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::SelectiveChannel channel;
        ASSERT_EQ(0, channel.Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        brpc::StartCancel(cid);
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
        StopAndJoin();
    }

    void CancelDuringCallMethod(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        pthread_t th;
        CancelerArg carg = { 10000, cid };
        ASSERT_EQ(0, pthread_create(&th, NULL, Canceler, &carg));
        req.set_sleep_us(carg.sleep_before_cancel_us * 2);
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_LT(labs(tm.u_elapsed() - carg.sleep_before_cancel_us), 10000);
        ASSERT_EQ(0, pthread_join(th, NULL));
        EXPECT_EQ(ECANCELED, cntl.ErrorCode());
        EXPECT_EQ(0, cntl.sub_count());
        EXPECT_TRUE(NULL == cntl.sub(1));
        EXPECT_TRUE(NULL == cntl.sub(0));
        StopAndJoin();
    }
    
    void CancelDuringCallMethodParallel(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        pthread_t th;
        CancelerArg carg = { 10000, cid };
        ASSERT_EQ(0, pthread_create(&th, NULL, Canceler, &carg));
        req.set_sleep_us(carg.sleep_before_cancel_us * 2);
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_LT(labs(tm.u_elapsed() - carg.sleep_before_cancel_us), 10000);
        ASSERT_EQ(0, pthread_join(th, NULL));
        EXPECT_EQ(ECANCELED, cntl.ErrorCode());
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            EXPECT_EQ(ECANCELED, cntl.sub(i)->ErrorCode()) << "i=" << i;
        }
        EXPECT_LT(labs(cntl.latency_us() - carg.sleep_before_cancel_us), 10000);
        StopAndJoin();
    }

    void CancelDuringCallMethodSelective(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::SelectiveChannel channel;
        ASSERT_EQ(0, channel.Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        pthread_t th;
        CancelerArg carg = { 10000, cid };
        ASSERT_EQ(0, pthread_create(&th, NULL, Canceler, &carg));
        req.set_sleep_us(carg.sleep_before_cancel_us * 2);
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_LT(labs(tm.u_elapsed() - carg.sleep_before_cancel_us), 10000);
        ASSERT_EQ(0, pthread_join(th, NULL));
        EXPECT_EQ(ECANCELED, cntl.ErrorCode());
        EXPECT_EQ(1, cntl.sub_count());
        EXPECT_EQ(ECANCELED, cntl.sub(0)->ErrorCode());
        StopAndJoin();
    }
    
    void CancelAfterCallMethod(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(0, cntl.ErrorCode());
        EXPECT_EQ(0, cntl.sub_count());
        ASSERT_EQ(EINVAL, bthread_id_error(cid, ECANCELED));
        StopAndJoin();
    }

    void CancelAfterCallMethodParallel(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        const brpc::CallId cid = cntl.call_id();
        ASSERT_TRUE(cid.value != 0);
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(0, cntl.ErrorCode());
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            EXPECT_TRUE(cntl.sub(i) && !cntl.sub(i)->Failed()) << "i=" << i;
        }
        ASSERT_EQ(EINVAL, bthread_id_error(cid, ECANCELED));
        StopAndJoin();
    }

    void TestAttachment(bool async, bool short_connection) {
        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, true, short_connection);
                
        brpc::Controller cntl;
        cntl.request_attachment().append("attachment");
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(0, cntl.ErrorCode())  << short_connection;
        EXPECT_FALSE(cntl.request_attachment().empty())
            << ", " << async << ", " << short_connection;
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        if (short_connection) {
            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            EXPECT_GE(1ul, _messenger.ConnectionCount());
        }            
        StopAndJoin();
    }

    void TestRequestNotInit(bool single_server, bool async,
                            bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
        StopAndJoin();
    }

    void TestRequestNotInitParallel(bool single_server, bool async,
                                    bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }
        
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
        LOG(WARNING) << cntl.ErrorText();
        StopAndJoin();
    }

    void TestRequestNotInitSelective(bool single_server, bool async,
                                     bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::SelectiveChannel channel;
        ASSERT_EQ(0, channel.Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }
        
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
        LOG(WARNING) << cntl.ErrorText();
        ASSERT_EQ(1, cntl.sub_count());
        ASSERT_EQ(brpc::EREQUEST, cntl.sub(0)->ErrorCode());
        StopAndJoin();
    }
    
    void TestRPCTimeout(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_sleep_us(70000); // 70ms
        cntl.set_timeout_ms(17);
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 10);
        StopAndJoin();
    }

    void TestRPCTimeoutParallel(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));
        
        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        cntl.set_timeout_ms(17);
        req.set_sleep_us(70000); // 70ms
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            EXPECT_EQ(ECANCELED, cntl.sub(i)->ErrorCode()) << "i=" << i;
        }
        EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 10);
        StopAndJoin();
    }

    class MakeTheRequestTimeout : public brpc::CallMapper {
    public:
        brpc::SubCall Map(
            int /*channel_index*/,
            const google::protobuf::MethodDescriptor* method,
            const google::protobuf::Message* req_base,
            google::protobuf::Message* response) {
            test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
            req->set_sleep_us(70000); // 70ms
            return brpc::SubCall(method, req, response->New(),
                                brpc::DELETE_REQUEST | brpc::DELETE_RESPONSE);
        }
    };

    void TimeoutStillChecksSubChannelsParallel(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));
        
        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          ((i % 2) ? new MakeTheRequestTimeout : NULL), NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        cntl.set_timeout_ms(30);
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(NCHANS, (size_t)cntl.sub_count());
        for (int i = 0; i < cntl.sub_count(); ++i) {
            if (i % 2) {
                EXPECT_EQ(ECANCELED, cntl.sub(i)->ErrorCode());
            } else {
                EXPECT_EQ(0, cntl.sub(i)->ErrorCode());
            }
        }
        EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 10);
        StopAndJoin();
    }

    void TestRPCTimeoutSelective(
        bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;
        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::SelectiveChannel channel;
        ASSERT_EQ(0, channel.Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        cntl.set_timeout_ms(17);
        req.set_sleep_us(70000); // 70ms
        butil::Timer tm;
        tm.start();
        CallMethod(&channel, &cntl, &req, &res, async);
        tm.stop();
        EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(1, cntl.sub_count());
        EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.sub(0)->ErrorCode());
        EXPECT_LT(labs(tm.m_elapsed() - cntl.timeout_ms()), 10);
        StopAndJoin();
    }
    
    void TestCloseFD(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_close_fd(true);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(brpc::EEOF, cntl.ErrorCode()) << cntl.ErrorText();
        StopAndJoin();
    }

    void TestCloseFDParallel(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_close_fd(true);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_TRUE(brpc::EEOF == cntl.ErrorCode() ||
                    brpc::ETOOMANYFAILS == cntl.ErrorCode() ||
                    ECONNRESET == cntl.ErrorCode()) << cntl.ErrorText();
        StopAndJoin();
    }

    void TestCloseFDSelective(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::SelectiveChannel channel;
        brpc::ChannelOptions options;
        options.max_retry = 0;
        ASSERT_EQ(0, channel.Init("rr", &options));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_close_fd(true);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(brpc::EEOF, cntl.ErrorCode()) << cntl.ErrorText();
        ASSERT_EQ(1, cntl.sub_count());
        ASSERT_EQ(brpc::EEOF, cntl.sub(0)->ErrorCode());

        StopAndJoin();
    }
    
    void TestServerFail(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_server_fail(brpc::EINTERNAL);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
        StopAndJoin();
    }

    void TestServerFailParallel(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 8;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (size_t i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_server_fail(brpc::EINTERNAL);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
        LOG(INFO) << cntl.ErrorText();
        StopAndJoin();
    }

    void TestServerFailSelective(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));

        const size_t NCHANS = 5;
        brpc::SelectiveChannel channel;
        ASSERT_EQ(0, channel.Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }

        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_server_fail(brpc::EINTERNAL);
        CallMethod(&channel, &cntl, &req, &res, async);
        
        EXPECT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
        ASSERT_EQ(1, cntl.sub_count());
        ASSERT_EQ(brpc::EINTERNAL, cntl.sub(0)->ErrorCode());

        LOG(INFO) << cntl.ErrorText();
        StopAndJoin();
    }
    
    void TestDestroyChannel(bool single_server, bool short_connection) {
        std::cout << "*** single=" << single_server
                  << ", short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel* channel = new brpc::Channel();
        SetUpChannel(channel, single_server, short_connection);
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        req.set_sleep_us(10000);
        CallMethod(channel, &cntl, &req, &res, true, true/*destroy*/);

        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        // Sleep to let `_messenger' detect `Socket' being `SetFailed'
        const int64_t start_time = butil::gettimeofday_us();
        while (_messenger.ConnectionCount() != 0) {
            EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
            bthread_usleep(1000);
        }

        StopAndJoin();
    }
    
    void TestDestroyChannelParallel(bool single_server, bool short_connection) {
        std::cout << "*** single=" << single_server
                  << ", short=" << short_connection << std::endl;

        const size_t NCHANS = 5;
        ASSERT_EQ(0, StartAccept(_ep));
        brpc::ParallelChannel* channel = new brpc::ParallelChannel;
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel();
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel->AddChannel(
                          subchan, brpc::OWNS_CHANNEL, NULL, NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_sleep_us(10000);
        req.set_message(__FUNCTION__);
        CallMethod(channel, &cntl, &req, &res, true, true/*destroy*/);
        
        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        // Sleep to let `_messenger' detect `Socket' being `SetFailed'
        const int64_t start_time = butil::gettimeofday_us();
        while (_messenger.ConnectionCount() != 0) {
            EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
            bthread_usleep(1000);
        }
        StopAndJoin();
    }

    void TestDestroyChannelSelective(bool single_server, bool short_connection) {
        std::cout << "*** single=" << single_server
                  << ", short=" << short_connection << std::endl;

        const size_t NCHANS = 5;
        ASSERT_EQ(0, StartAccept(_ep));
        brpc::SelectiveChannel* channel = new brpc::SelectiveChannel;
        ASSERT_EQ(0, channel->Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel();
            SetUpChannel(subchan, single_server, short_connection);
            ASSERT_EQ(0, channel->AddChannel(subchan, NULL));
        }
                
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_sleep_us(10000);
        req.set_message(__FUNCTION__);
        CallMethod(channel, &cntl, &req, &res, true, true/*destroy*/);
        
        EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
        ASSERT_EQ(_ep, cntl.remote_side());
        ASSERT_EQ(1, cntl.sub_count());
        ASSERT_EQ(0, cntl.sub(0)->ErrorCode());

        // Sleep to let `_messenger' detect `Socket' being `SetFailed'
        const int64_t start_time = butil::gettimeofday_us();
        while (_messenger.ConnectionCount() != 0) {
            EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
            bthread_usleep(1000);
        }
        StopAndJoin();
    }
    
    void RPCThread(brpc::ChannelBase* channel, bool async) {
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        req.set_message(__FUNCTION__);
        CallMethod(channel, &cntl, &req, &res, async);

        ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ("received " + std::string(__FUNCTION__), res.message());
    }

    void RPCThread(brpc::ChannelBase* channel, bool async, int count) {
        brpc::Controller cntl;
        for (int i = 0; i < count; ++i) {
            test::EchoRequest req;
            test::EchoResponse res;
            req.set_message(__FUNCTION__);
            CallMethod(channel, &cntl, &req, &res, async);
            
            ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
            ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
            cntl.Reset();
        }
    }

    void RPCThread(bool single_server, bool async, bool short_connection,
                   const brpc::Authenticator* auth, int count) {
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection, auth);
        brpc::Controller cntl;
        for (int i = 0; i < count; ++i) {
            test::EchoRequest req;
            test::EchoResponse res;
            req.set_message(__FUNCTION__);
            CallMethod(&channel, &cntl, &req, &res, async);

            ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
            ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
            cntl.Reset();
        }
    }

    void TestAuthentication(bool single_server, 
                            bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        MyAuthenticator auth;
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection, &auth);

        const int NUM = 10;
        pthread_t tids[NUM];
        for (int i = 0; i < NUM; ++i) {
            google::protobuf::Closure* thrd_func = 
                brpc::NewCallback(
                    this, &ChannelTest::RPCThread, (brpc::ChannelBase*)&channel, async);
            EXPECT_EQ(0, pthread_create(&tids[i], NULL,
                                        RunClosure, thrd_func));
        }
        for (int i = 0; i < NUM; ++i) {
            pthread_join(tids[i], NULL);
        }
        
        if (short_connection) {
            EXPECT_EQ(NUM, auth.count.load());
        } else {
            EXPECT_EQ(1, auth.count.load());
        }
        StopAndJoin();
    }

    void TestAuthenticationParallel(bool single_server, 
                                    bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        MyAuthenticator auth;

        const int NCHANS = 5;
        brpc::Channel subchans[NCHANS];
        brpc::ParallelChannel channel;
        for (int i = 0; i < NCHANS; ++i) {
            SetUpChannel(&subchans[i], single_server, short_connection, &auth);
            ASSERT_EQ(0, channel.AddChannel(
                          &subchans[i], brpc::DOESNT_OWN_CHANNEL,
                          NULL, NULL));
        }
        
        const int NUM = 10;
        pthread_t tids[NUM];
        for (int i = 0; i < NUM; ++i) {
            google::protobuf::Closure* thrd_func = 
                brpc::NewCallback(
                    this, &ChannelTest::RPCThread, (brpc::ChannelBase*)&channel, async);
            EXPECT_EQ(0, pthread_create(&tids[i], NULL,
                                        RunClosure, thrd_func));
        }
        for (int i = 0; i < NUM; ++i) {
            pthread_join(tids[i], NULL);
        }
        
        if (short_connection) {
            EXPECT_EQ(NUM * NCHANS, auth.count.load());
        } else {
            EXPECT_EQ(1, auth.count.load());
        }
        StopAndJoin();
    }

    void TestAuthenticationSelective(bool single_server, 
                                    bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        MyAuthenticator auth;

        const size_t NCHANS = 5;
        brpc::SelectiveChannel channel;
        ASSERT_EQ(0, channel.Init("rr", NULL));
        for (size_t i = 0; i < NCHANS; ++i) {
            brpc::Channel* subchan = new brpc::Channel;
            SetUpChannel(subchan, single_server, short_connection, &auth);
            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
        }
        
        const int NUM = 10;
        pthread_t tids[NUM];
        for (int i = 0; i < NUM; ++i) {
            google::protobuf::Closure* thrd_func = 
                brpc::NewCallback(
                    this, &ChannelTest::RPCThread, (brpc::ChannelBase*)&channel, async);
            EXPECT_EQ(0, pthread_create(&tids[i], NULL,
                                        RunClosure, thrd_func));
        }
        for (int i = 0; i < NUM; ++i) {
            pthread_join(tids[i], NULL);
        }
        
        if (short_connection) {
            EXPECT_EQ(NUM, auth.count.load());
        } else {
            EXPECT_EQ(1, auth.count.load());
        }
        StopAndJoin();
    }
    
    void TestRetry(bool single_server, bool async, bool short_connection) {
        std::cout << " *** single=" << single_server
                  << " async=" << async
                  << " short=" << short_connection << std::endl;

        ASSERT_EQ(0, StartAccept(_ep));
        brpc::Channel channel;
        SetUpChannel(&channel, single_server, short_connection);

        const int RETRY_NUM = 3;
        test::EchoRequest req;
        test::EchoResponse res;
        brpc::Controller cntl;
        req.set_message(__FUNCTION__);

        // No retry when timeout
        cntl.set_max_retry(RETRY_NUM);
        cntl.set_timeout_ms(10);  // 10ms
        req.set_sleep_us(70000); // 70ms
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(brpc::ERPCTIMEDOUT, cntl.ErrorCode()) << cntl.ErrorText();
        EXPECT_EQ(0, cntl.retried_count());
        bthread_usleep(100000);  // wait for the sleep task to finish

        // Retry when connection broken
        cntl.Reset();
        cntl.set_max_retry(RETRY_NUM);
        _close_fd_once = true;
        req.set_sleep_us(0);
        CallMethod(&channel, &cntl, &req, &res, async);

        if (short_connection) {
            // Always succeed
            EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
            EXPECT_EQ(1, cntl.retried_count());

            const int64_t start_time = butil::gettimeofday_us();
            while (_messenger.ConnectionCount() != 0) {
                EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
                bthread_usleep(1000);
            }
        } else {
            // May fail if health checker can't revive in time
            if (cntl.Failed()) {
                EXPECT_EQ(EHOSTDOWN, cntl.ErrorCode()) << single_server << ", " << async;
                EXPECT_EQ(RETRY_NUM, cntl.retried_count());
            } else {
                EXPECT_TRUE(cntl.retried_count() > 0);
            }
        }   
        StopAndJoin();
        bthread_usleep(100000);  // wait for stop
        
        // Retry when connection failed
        cntl.Reset();
        cntl.set_max_retry(RETRY_NUM);
        CallMethod(&channel, &cntl, &req, &res, async);
        EXPECT_EQ(EHOSTDOWN, cntl.ErrorCode());
        EXPECT_EQ(RETRY_NUM, cntl.retried_count());
    }

    void TestRetryOtherServer(bool async, bool short_connection) {
        ASSERT_EQ(0, StartAccept(_ep));

        brpc::Channel channel;
        brpc::ChannelOptions opt;
        if (short_connection) {
            opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
        }
        butil::TempFile server_list;                                        
        EXPECT_EQ(0, server_list.save_format(
                      "127.0.0.1:100\n"
                      "127.0.0.1:200\n"
                      "%s", endpoint2str(_ep).c_str()));
        std::string naming_url = std::string("fIle://")
            + server_list.fname();
        EXPECT_EQ(0, channel.Init(naming_url.c_str(), "RR", &opt)); 

        const int RETRY_NUM = 3;
        test::EchoRequest req;
        test::EchoResponse res;
        brpc::Controller cntl;
        req.set_message(__FUNCTION__);
        cntl.set_max_retry(RETRY_NUM);
        CallMethod(&channel, &cntl, &req, &res, async);

        EXPECT_EQ(0, cntl.ErrorCode()) << async << ", " << short_connection;
        StopAndJoin();
    }

    butil::EndPoint _ep;
    butil::TempFile _server_list;                                        
    std::string _naming_url;
    
    brpc::Acceptor _messenger;
    // Dummy server for `Server::AddError'
    brpc::Server _dummy;
    std::string _mock_fail_str;

    bool _close_fd_once;
    
    MyEchoService _svc;
};

class MyShared : public brpc::SharedObject {
public:
    MyShared() { ++ nctor; }
    MyShared(const MyShared&) : brpc::SharedObject() { ++ nctor; }
    ~MyShared() { ++ ndtor; }

    static int nctor;
    static int ndtor;
};
int MyShared::nctor = 0;
int MyShared::ndtor = 0;

TEST_F(ChannelTest, intrusive_ptr_sanity) {
    MyShared::nctor = 0;
    MyShared::ndtor = 0;
    {
        MyShared* s1 = new MyShared;
        ASSERT_EQ(0, s1->ref_count());
        butil::intrusive_ptr<MyShared> p1 = s1;
        ASSERT_EQ(1, p1->ref_count());
        {
            butil::intrusive_ptr<MyShared> p2 = s1;
            ASSERT_EQ(2, p2->ref_count());
            ASSERT_EQ(2, p1->ref_count());
        }
        ASSERT_EQ(1, p1->ref_count());
    }
    ASSERT_EQ(1, MyShared::nctor);
    ASSERT_EQ(1, MyShared::ndtor);
}

TEST_F(ChannelTest, init_as_single_server) {
    {
        brpc::Channel channel;
        ASSERT_EQ(-1, channel.Init("127.0.0.1:12345:asdf", NULL));
        ASSERT_EQ(-1, channel.Init("127.0.0.1:99999", NULL)); 
        ASSERT_EQ(0, channel.Init("127.0.0.1:8888", NULL));
    }
    {
        brpc::Channel channel;
        ASSERT_EQ(-1, channel.Init("127.0.0.1asdf", 12345, NULL));
        ASSERT_EQ(-1, channel.Init("127.0.0.1", 99999, NULL));
        ASSERT_EQ(0, channel.Init("127.0.0.1", 8888, NULL));
    }

    butil::EndPoint ep;
    brpc::Channel channel;
    ASSERT_EQ(0, str2endpoint("127.0.0.1:8888", &ep));
    ASSERT_EQ(0, channel.Init(ep, NULL));
    ASSERT_TRUE(channel.SingleServer());
    ASSERT_EQ(ep, channel._server_address);

    brpc::SocketId id;
    ASSERT_EQ(0, brpc::SocketMapFind(brpc::SocketMapKey(ep), &id));
    ASSERT_EQ(id, channel._server_id);

    const int NUM = 10;
    brpc::Channel channels[NUM];
    for (int i = 0; i < 10; ++i) {
        ASSERT_EQ(0, channels[i].Init(ep, NULL));
        // Share the same server socket
        ASSERT_EQ(id, channels[i]._server_id);
    }
}

TEST_F(ChannelTest, init_using_unknown_naming_service) {
    brpc::Channel channel;
    ASSERT_EQ(-1, channel.Init("unknown://unknown", "unknown", NULL));
}

TEST_F(ChannelTest, init_using_unexist_fns) {
    brpc::Channel channel;
    ASSERT_EQ(-1, channel.Init("fiLe://no_such_file", "rr", NULL));
}

TEST_F(ChannelTest, init_using_empty_fns) {
    brpc::ChannelOptions opt;
    opt.succeed_without_server = false;
    brpc::Channel channel;
    butil::TempFile server_list;
    ASSERT_EQ(0, server_list.save(""));
    std::string naming_url = std::string("file://") + server_list.fname();
    // empty file list results in error.
    ASSERT_EQ(-1, channel.Init(naming_url.c_str(), "rr", &opt));

    ASSERT_EQ(0, server_list.save("blahblah"));
    // No valid address.
    ASSERT_EQ(-1, channel.Init(naming_url.c_str(), "rr", NULL));
}

TEST_F(ChannelTest, init_using_empty_lns) {
    brpc::ChannelOptions opt;
    opt.succeed_without_server = false;
    brpc::Channel channel;
    ASSERT_EQ(-1, channel.Init("list:// ", "rr", &opt));
    ASSERT_EQ(-1, channel.Init("list://", "rr", &opt));
    ASSERT_EQ(-1, channel.Init("list://blahblah", "rr", &opt)); 
}

TEST_F(ChannelTest, init_using_naming_service) {
    brpc::Channel* channel = new brpc::Channel();
    butil::TempFile server_list;
    ASSERT_EQ(0, server_list.save("127.0.0.1:8888"));
    std::string naming_url = std::string("filE://") + server_list.fname();
    // Rr are intended to test case-insensitivity.
    ASSERT_EQ(0, channel->Init(naming_url.c_str(), "Rr", NULL));
    ASSERT_FALSE(channel->SingleServer());

    brpc::LoadBalancerWithNaming* lb =
        dynamic_cast<brpc::LoadBalancerWithNaming*>(channel->_lb.get());
    ASSERT_TRUE(lb != NULL);
    brpc::NamingServiceThread* ns = lb->_nsthread_ptr.get();

    {
        const int NUM = 10;
        brpc::Channel channels[NUM];
        for (int i = 0; i < NUM; ++i) {
            // Share the same naming thread
            ASSERT_EQ(0, channels[i].Init(naming_url.c_str(), "rr", NULL));
            brpc::LoadBalancerWithNaming* lb2 =
                dynamic_cast<brpc::LoadBalancerWithNaming*>(channels[i]._lb.get());
            ASSERT_TRUE(lb2 != NULL);
            ASSERT_EQ(ns, lb2->_nsthread_ptr.get());
        }
    }

    // `lb' should be valid even if `channel' has destroyed
    // since we hold another reference to it
    butil::intrusive_ptr<brpc::SharedLoadBalancer>
        another_ctx = channel->_lb;
    delete channel;
    ASSERT_EQ(lb, another_ctx.get());
    ASSERT_EQ(1, another_ctx->_nref.load());
    // `lb' should be destroyed after
}

TEST_F(ChannelTest, connection_failed) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestConnectionFailed(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, empty_parallel_channel) {
    brpc::ParallelChannel channel;

    brpc::Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    CallMethod(&channel, &cntl, &req, &res, false);
    EXPECT_EQ(EPERM, cntl.ErrorCode()) << cntl.ErrorText();
}

TEST_F(ChannelTest, empty_selective_channel) {
    brpc::SelectiveChannel channel;
    ASSERT_EQ(0, channel.Init("rr", NULL));

    brpc::Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    CallMethod(&channel, &cntl, &req, &res, false);
    EXPECT_EQ(ENODATA, cntl.ErrorCode()) << cntl.ErrorText();
}

class BadCall : public brpc::CallMapper {
    brpc::SubCall Map(int,
                     const google::protobuf::MethodDescriptor*,
                     const google::protobuf::Message*,
                     google::protobuf::Message*) {
        return brpc::SubCall::Bad();
    }
};

TEST_F(ChannelTest, returns_bad_parallel) {
    const size_t NCHANS = 5;
    brpc::ParallelChannel channel;
    for (size_t i = 0; i < NCHANS; ++i) {
        brpc::Channel* subchan = new brpc::Channel();
        SetUpChannel(subchan, true, false);
        ASSERT_EQ(0, channel.AddChannel(
                      subchan, brpc::OWNS_CHANNEL, new BadCall, NULL));
    }
                
    brpc::Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    CallMethod(&channel, &cntl, &req, &res, false);
    EXPECT_EQ(brpc::EREQUEST, cntl.ErrorCode()) << cntl.ErrorText();
}

class SkipCall : public brpc::CallMapper {
    brpc::SubCall Map(int,
                     const google::protobuf::MethodDescriptor*,
                     const google::protobuf::Message*,
                     google::protobuf::Message*) {
        return brpc::SubCall::Skip();
    }
};

TEST_F(ChannelTest, skip_all_channels) {
    const size_t NCHANS = 5;
    brpc::ParallelChannel channel;
    for (size_t i = 0; i < NCHANS; ++i) {
        brpc::Channel* subchan = new brpc::Channel();
        SetUpChannel(subchan, true, false);
        ASSERT_EQ(0, channel.AddChannel(
                      subchan, brpc::OWNS_CHANNEL, new SkipCall, NULL));
    }
                
    brpc::Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    req.set_message(__FUNCTION__);
    CallMethod(&channel, &cntl, &req, &res, false);
        
    EXPECT_EQ(ECANCELED, cntl.ErrorCode()) << cntl.ErrorText();
    EXPECT_EQ((int)NCHANS, cntl.sub_count());
    for (int i = 0; i < cntl.sub_count(); ++i) {
        EXPECT_TRUE(NULL == cntl.sub(i)) << "i=" << i;
    }
}

TEST_F(ChannelTest, connection_failed_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestConnectionFailedParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, connection_failed_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestConnectionFailedSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, success) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestSuccess(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, success_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestSuccessParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, success_duplicated_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestSuccessDuplicatedParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, success_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestSuccessSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, skip_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestSkipParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, success_parallel2) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestSuccessParallel2(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_before_callmethod) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelBeforeCallMethod(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_before_callmethod_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelBeforeCallMethodParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_before_callmethod_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelBeforeCallMethodSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_during_callmethod) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelDuringCallMethod(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_during_callmethod_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelDuringCallMethodParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_during_callmethod_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelDuringCallMethodSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_after_callmethod) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelAfterCallMethod(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, cancel_after_callmethod_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                CancelAfterCallMethodParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, request_not_init) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRequestNotInit(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, request_not_init_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRequestNotInitParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, request_not_init_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRequestNotInitSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, timeout) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRPCTimeout(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, timeout_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRPCTimeoutParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, timeout_still_checks_sub_channels_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TimeoutStillChecksSubChannelsParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, timeout_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRPCTimeoutSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, close_fd) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestCloseFD(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, close_fd_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestCloseFDParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, close_fd_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestCloseFDSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, server_fail) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestServerFail(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, server_fail_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestServerFailParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, server_fail_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestServerFailSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, authentication) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestAuthentication(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, authentication_parallel) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestAuthenticationParallel(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, authentication_selective) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestAuthenticationSelective(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, retry) {
    for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
        for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
            for (int k = 0; k <=1; ++k) { // Flag ShortConnection
                TestRetry(i, j, k);
            }
        }
    }
}

TEST_F(ChannelTest, retry_other_servers) {
    for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
        for (int k = 0; k <=1; ++k) { // Flag ShortConnection
            TestRetryOtherServer(j, k);
        }
    }
}

TEST_F(ChannelTest, multiple_threads_single_channel) {
    srand(time(NULL));
    ASSERT_EQ(0, StartAccept(_ep));
    MyAuthenticator auth;
    const int NUM = 10;
    const int COUNT = 10000;
    pthread_t tids[NUM];

    // Cause massive connect/close log if setting to true
    bool short_connection = false;
    for (int single_server = 0; single_server <= 1; ++single_server) {
        for (int need_auth = 0; need_auth <= 1; ++need_auth) {
            for (int async = 0; async <= 1; ++async) {
                std::cout << " *** short=" << short_connection
                          << " single=" << single_server
                          << " auth=" << need_auth
                          << " async=" << async << std::endl;
                brpc::Channel channel;
                SetUpChannel(&channel, single_server, 
                             short_connection, (need_auth ? &auth : NULL));
                for (int i = 0; i < NUM; ++i) {
                    google::protobuf::Closure* thrd_func = 
                        brpc::NewCallback(
                            this, &ChannelTest::RPCThread, 
                            (brpc::ChannelBase*)&channel,
                            (bool)async, COUNT);
                    EXPECT_EQ(0, pthread_create(&tids[i], NULL,
                                                RunClosure, thrd_func));
                }
                for (int i = 0; i < NUM; ++i) {
                    pthread_join(tids[i], NULL);
                }
            }
        }
    }
}

TEST_F(ChannelTest, multiple_threads_multiple_channels) {
    srand(time(NULL));
    ASSERT_EQ(0, StartAccept(_ep));
    MyAuthenticator auth;
    const int NUM = 10;
    const int COUNT = 10000;
    pthread_t tids[NUM];

    // Cause massive connect/close log if setting to true
    bool short_connection = false;

    for (int single_server = 0; single_server <= 1; ++single_server) {
        for (int need_auth = 0; need_auth <= 1; ++need_auth) {
            for (int async = 0; async <= 1; ++async) {
                std::cout << " *** short=" << short_connection
                          << " single=" << single_server
                          << " auth=" << need_auth
                          << " async=" << async << std::endl;
                for (int i = 0; i < NUM; ++i) {
                    google::protobuf::Closure* thrd_func = 
                        brpc::NewCallback<
                        ChannelTest, ChannelTest*,
                        bool, bool, bool, const brpc::Authenticator*, int>
                        (this, &ChannelTest::RPCThread, single_server,
                         async, short_connection, (need_auth ? &auth : NULL), COUNT);
                    EXPECT_EQ(0, pthread_create(&tids[i], NULL,
                                                RunClosure, thrd_func));
                }
                for (int i = 0; i < NUM; ++i) {
                    pthread_join(tids[i], NULL);
                }
            }
        }
    }
}

TEST_F(ChannelTest, clear_attachment_after_retry) {
    for (int j = 0; j <= 1; ++j) {
        for (int k = 0; k <= 1; ++k) {
            TestAttachment(j, k);
        }
    }
}

TEST_F(ChannelTest, destroy_channel) {
    for (int i = 0; i <= 1; ++i) {
        for (int j = 0; j <= 1; ++j) {
            TestDestroyChannel(i, j);
        }
    }
}

TEST_F(ChannelTest, destroy_channel_parallel) {
    for (int i = 0; i <= 1; ++i) {
        for (int j = 0; j <= 1; ++j) {
            TestDestroyChannelParallel(i, j);
        }
    }
}

TEST_F(ChannelTest, destroy_channel_selective) {
    for (int i = 0; i <= 1; ++i) {
        for (int j = 0; j <= 1; ++j) {
            TestDestroyChannelSelective(i, j);
        }
    }
}

TEST_F(ChannelTest, sizeof) {
    LOG(INFO) << "Size of Channel is " << sizeof(brpc::Channel)
               << ", Size of ParallelChannel is " << sizeof(brpc::ParallelChannel)
               << ", Size of Controller is " << sizeof(brpc::Controller)
               << ", Size of vector is " << sizeof(std::vector<brpc::Controller>);
}

brpc::Channel g_chan;

TEST_F(ChannelTest, global_channel_should_quit_successfully) {
    g_chan.Init("bns://qa-pbrpc.SAT.tjyx", "rr", NULL);
}

TEST_F(ChannelTest, unused_call_id) {
    {
        brpc::Controller cntl;
    }
    {
        brpc::Controller cntl;
        cntl.Reset();
    }
    brpc::CallId cid1 = { 0 };
    {
        brpc::Controller cntl;
        cid1 = cntl.call_id();
    }
    ASSERT_EQ(EINVAL, bthread_id_error(cid1, ECANCELED));

    {
        brpc::CallId cid2 = { 0 };
        brpc::Controller cntl;
        cid2 = cntl.call_id();
        cntl.Reset();
        ASSERT_EQ(EINVAL, bthread_id_error(cid2, ECANCELED));
    }
}

TEST_F(ChannelTest, adaptive_connection_type) {
    brpc::AdaptiveConnectionType ctype;
    ASSERT_EQ(brpc::CONNECTION_TYPE_UNKNOWN, ctype);
    ASSERT_FALSE(ctype.has_error());
    ASSERT_STREQ("unknown", ctype.name());

    ctype = brpc::CONNECTION_TYPE_SINGLE;
    ASSERT_EQ(brpc::CONNECTION_TYPE_SINGLE, ctype);
    ASSERT_STREQ("single", ctype.name());

    ctype = "shorT";
    ASSERT_EQ(brpc::CONNECTION_TYPE_SHORT, ctype);
    ASSERT_STREQ("short", ctype.name());
    
    ctype = "PooLed";
    ASSERT_EQ(brpc::CONNECTION_TYPE_POOLED, ctype);
    ASSERT_STREQ("pooled", ctype.name());

    ctype = "SINGLE";
    ASSERT_EQ(brpc::CONNECTION_TYPE_SINGLE, ctype);
    ASSERT_FALSE(ctype.has_error());
    ASSERT_STREQ("single", ctype.name());

    ctype = "blah";
    ASSERT_EQ(brpc::CONNECTION_TYPE_UNKNOWN, ctype);
    ASSERT_TRUE(ctype.has_error());
    ASSERT_STREQ("unknown", ctype.name());

    ctype = "single";
    ASSERT_EQ(brpc::CONNECTION_TYPE_SINGLE, ctype);
    ASSERT_FALSE(ctype.has_error());
    ASSERT_STREQ("single", ctype.name());
}

TEST_F(ChannelTest, adaptive_protocol_type) {
    brpc::AdaptiveProtocolType ptype;
    ASSERT_EQ(brpc::PROTOCOL_UNKNOWN, ptype);
    ASSERT_STREQ("unknown", ptype.name());
    ASSERT_FALSE(ptype.has_param());
    ASSERT_EQ("", ptype.param());

    ptype = brpc::PROTOCOL_HTTP;
    ASSERT_EQ(brpc::PROTOCOL_HTTP, ptype);
    ASSERT_STREQ("http", ptype.name());
    ASSERT_FALSE(ptype.has_param());
    ASSERT_EQ("", ptype.param());

    ptype = "http:xyz ";
    ASSERT_EQ(brpc::PROTOCOL_HTTP, ptype);
    ASSERT_STREQ("http", ptype.name());
    ASSERT_TRUE(ptype.has_param());
    ASSERT_EQ("xyz ", ptype.param());

    ptype = "HuLu_pbRPC";
    ASSERT_EQ(brpc::PROTOCOL_HULU_PBRPC, ptype);
    ASSERT_STREQ("hulu_pbrpc", ptype.name());
    ASSERT_FALSE(ptype.has_param());
    ASSERT_EQ("", ptype.param());
    
    ptype = "blah";
    ASSERT_EQ(brpc::PROTOCOL_UNKNOWN, ptype);
    ASSERT_STREQ("blah", ptype.name());
    ASSERT_FALSE(ptype.has_param());
    ASSERT_EQ("", ptype.param());

    ptype = "Baidu_STD";
    ASSERT_EQ(brpc::PROTOCOL_BAIDU_STD, ptype);
    ASSERT_STREQ("baidu_std", ptype.name());
    ASSERT_FALSE(ptype.has_param());
    ASSERT_EQ("", ptype.param());
}

} //namespace