// brpc - A framework to host and access services throughout Baidu.
// Copyright (c) 2014 Baidu, Inc.

// Date: Sun Jul 13 15:04:18 CST 2014

#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include <google/protobuf/descriptor.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/gperftools_profiler.h"
#include "brpc/socket.h"
#include "brpc/acceptor.h"
#include "brpc/server.h"
#include "brpc/policy/hulu_pbrpc_meta.pb.h"
#include "brpc/policy/hulu_pbrpc_protocol.h"
#include "brpc/policy/most_common_message.h"
#include "brpc/controller.h"
#include "echo.pb.h"

int main(int argc, char* argv[]) {
    testing::InitGoogleTest(&argc, argv);
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    return RUN_ALL_TESTS();
}

namespace {

static const std::string EXP_REQUEST = "hello";
static const std::string EXP_RESPONSE = "world";

static const std::string MOCK_CREDENTIAL = "mock credential";
static const std::string MOCK_USER = "mock user";

class MyAuthenticator : public brpc::Authenticator {
public:
    MyAuthenticator() {}

    int GenerateCredential(std::string* auth_str) const {
        *auth_str = MOCK_CREDENTIAL;
        return 0;
    }

    int VerifyCredential(const std::string& auth_str,
                         const butil::EndPoint&,
                         brpc::AuthContext* ctx) const {
        EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
        ctx->set_user(MOCK_USER);
        return 0;
    }
};

class MyEchoService : public ::test::EchoService {
    void Echo(::google::protobuf::RpcController* cntl_base,
              const ::test::EchoRequest* req,
              ::test::EchoResponse* res,
              ::google::protobuf::Closure* done) {
        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);
        brpc::ClosureGuard done_guard(done);

        if (req->close_fd()) {
            cntl->CloseConnection("Close connection according to request");
            return;
        }
        if (cntl->auth_context()) {
            EXPECT_EQ(MOCK_USER, cntl->auth_context()->user());
        }
        EXPECT_EQ(EXP_REQUEST, req->message());
        if (!cntl->request_attachment().empty()) {
            EXPECT_EQ(EXP_REQUEST, cntl->request_attachment().to_string());
            cntl->response_attachment().append(EXP_RESPONSE);
        }
        res->set_message(EXP_RESPONSE);
    }
};
    
class HuluTest : public ::testing::Test{
protected:
    HuluTest() {
        EXPECT_EQ(0, _server.AddService(
            &_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
        // Hack: Regard `_server' as running 
        _server._status = brpc::Server::RUNNING;
        _server._options.auth = &_auth;
        
        EXPECT_EQ(0, pipe(_pipe_fds));

        brpc::SocketId id;
        brpc::SocketOptions options;
        options.fd = _pipe_fds[1];
        EXPECT_EQ(0, brpc::Socket::Create(options, &id));
        EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
    };

    virtual ~HuluTest() {};
    virtual void SetUp() {};
    virtual void TearDown() {};

    void VerifyMessage(brpc::InputMessageBase* msg) {
        if (msg->_socket == NULL) {
            _socket->ReAddress(&msg->_socket);
        }
        msg->_arg = &_server;
        EXPECT_TRUE(brpc::policy::VerifyHuluRequest(msg));
    }

    void ProcessMessage(void (*process)(brpc::InputMessageBase*),
                        brpc::InputMessageBase* msg, bool set_eof) {
        if (msg->_socket == NULL) {
            _socket.get()->ReAddress(&msg->_socket);
        }
        msg->_arg = &_server;
        _socket->PostponeEOF();
        if (set_eof) {
            _socket->SetEOF();
        }
        (*process)(msg);
    }

    brpc::policy::MostCommonMessage* MakeRequestMessage(
        const brpc::policy::HuluRpcRequestMeta& meta) {
        brpc::policy::MostCommonMessage* msg =
                brpc::policy::MostCommonMessage::Get();
        butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
        EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));

        test::EchoRequest req;
        req.set_message(EXP_REQUEST);
        butil::IOBufAsZeroCopyOutputStream req_stream(&msg->payload);
        EXPECT_TRUE(req.SerializeToZeroCopyStream(&req_stream));
        return msg;
    }

    brpc::policy::MostCommonMessage* MakeResponseMessage(
        const brpc::policy::HuluRpcResponseMeta& meta) {
        brpc::policy::MostCommonMessage* msg =
                brpc::policy::MostCommonMessage::Get();
        butil::IOBufAsZeroCopyOutputStream meta_stream(&msg->meta);
        EXPECT_TRUE(meta.SerializeToZeroCopyStream(&meta_stream));

        test::EchoResponse res;
        res.set_message(EXP_RESPONSE);
        butil::IOBufAsZeroCopyOutputStream res_stream(&msg->payload);
        EXPECT_TRUE(res.SerializeToZeroCopyStream(&res_stream));
        return msg;
    }

    void CheckResponseCode(bool expect_empty, int expect_code) {
        int bytes_in_pipe = 0;
        ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
        if (expect_empty) {
            EXPECT_EQ(0, bytes_in_pipe);
            return;
        }

        EXPECT_GT(bytes_in_pipe, 0);
        butil::IOPortal buf;
        EXPECT_EQ((ssize_t)bytes_in_pipe,
                  buf.append_from_file_descriptor(_pipe_fds[0], 1024));
        brpc::ParseResult pr = brpc::policy::ParseHuluMessage(&buf, NULL, false, NULL);
        EXPECT_EQ(brpc::PARSE_OK, pr.error());
        brpc::policy::MostCommonMessage* msg =
            static_cast<brpc::policy::MostCommonMessage*>(pr.message());

        brpc::policy::HuluRpcResponseMeta meta;
        butil::IOBufAsZeroCopyInputStream meta_stream(msg->meta);
        EXPECT_TRUE(meta.ParseFromZeroCopyStream(&meta_stream));
        EXPECT_EQ(expect_code, meta.error_code());
    }

    void TestHuluCompress(brpc::CompressType type) {
        butil::IOBuf request_buf;
        butil::IOBuf total_buf;
        brpc::Controller cntl;
        test::EchoRequest req;
        test::EchoResponse res;
        cntl._response = &res;

        req.set_message(EXP_REQUEST);
        cntl.set_request_compress_type(type);
        brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
        ASSERT_FALSE(cntl.Failed());
        brpc::policy::PackHuluRequest(
            &total_buf, NULL, cntl.call_id().value,
            test::EchoService::descriptor()->method(0),
            &cntl, request_buf, &_auth);
        ASSERT_FALSE(cntl.Failed());

        brpc::ParseResult req_pr =
                brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
        ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
        brpc::InputMessageBase* req_msg = req_pr.message();
        ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);
        CheckResponseCode(false, 0);
    }

    int _pipe_fds[2];
    brpc::SocketUniquePtr _socket;
    brpc::Server _server;

    MyEchoService _svc;
    MyAuthenticator _auth;
};

TEST_F(HuluTest, process_request_failed_socket) {
    brpc::policy::HuluRpcRequestMeta meta;
    meta.set_service_name("EchoService");
    meta.set_method_index(0);
    brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
    _socket->SetFailed();
    ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
    ASSERT_EQ(0ll, _server._nerror.get_value());
    CheckResponseCode(true, 0);
}

TEST_F(HuluTest, process_request_logoff) {
    brpc::policy::HuluRpcRequestMeta meta;
    meta.set_service_name("EchoService");
    meta.set_method_index(0);
    brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
    _server._status = brpc::Server::READY;
    ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
    ASSERT_EQ(1ll, _server._nerror.get_value());
    CheckResponseCode(false, brpc::ELOGOFF);
}

TEST_F(HuluTest, process_request_wrong_method) {
    brpc::policy::HuluRpcRequestMeta meta;
    meta.set_service_name("EchoService");
    meta.set_method_index(10);
    brpc::policy::MostCommonMessage* msg = MakeRequestMessage(meta);
    ProcessMessage(brpc::policy::ProcessHuluRequest, msg, false);
    ASSERT_EQ(1ll, _server._nerror.get_value());
    CheckResponseCode(false, brpc::ENOMETHOD);
}

TEST_F(HuluTest, process_response_after_eof) {
    brpc::policy::HuluRpcResponseMeta meta;
    test::EchoResponse res;
    brpc::Controller cntl;
    meta.set_correlation_id(cntl.call_id().value);
    cntl._response = &res;
    brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
    ProcessMessage(brpc::policy::ProcessHuluResponse, msg, true);
    ASSERT_EQ(EXP_RESPONSE, res.message());
    ASSERT_TRUE(_socket->Failed());
}

TEST_F(HuluTest, process_response_error_code) {
    const int ERROR_CODE = 12345;
    brpc::policy::HuluRpcResponseMeta meta;
    brpc::Controller cntl;
    meta.set_correlation_id(cntl.call_id().value);
    meta.set_error_code(ERROR_CODE);
    brpc::policy::MostCommonMessage* msg = MakeResponseMessage(meta);
    ProcessMessage(brpc::policy::ProcessHuluResponse, msg, false);
    ASSERT_EQ(ERROR_CODE, cntl.ErrorCode());
}

TEST_F(HuluTest, complete_flow) {
    butil::IOBuf request_buf;
    butil::IOBuf total_buf;
    brpc::Controller cntl;
    test::EchoRequest req;
    test::EchoResponse res;
    cntl._response = &res;

    // Send request
    req.set_message(EXP_REQUEST);
    brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
    ASSERT_FALSE(cntl.Failed());
    cntl.request_attachment().append(EXP_REQUEST);
    brpc::policy::PackHuluRequest(
        &total_buf, NULL, cntl.call_id().value,
        test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
    ASSERT_FALSE(cntl.Failed());

    // Verify and handle request
    brpc::ParseResult req_pr =
            brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
    ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
    brpc::InputMessageBase* req_msg = req_pr.message();
    VerifyMessage(req_msg);
    ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);

    // Read response from pipe
    butil::IOPortal response_buf;
    response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
    brpc::ParseResult res_pr =
            brpc::policy::ParseHuluMessage(&response_buf, NULL, false, NULL);
    ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
    brpc::InputMessageBase* res_msg = res_pr.message();
    ProcessMessage(brpc::policy::ProcessHuluResponse, res_msg, false);

    ASSERT_FALSE(cntl.Failed());
    ASSERT_EQ(EXP_RESPONSE, res.message());
}

TEST_F(HuluTest, close_in_callback) {
    butil::IOBuf request_buf;
    butil::IOBuf total_buf;
    brpc::Controller cntl;
    test::EchoRequest req;

    // Send request
    req.set_message(EXP_REQUEST);
    req.set_close_fd(true);
    brpc::SerializeRequestDefault(&request_buf, &cntl, &req);
    ASSERT_FALSE(cntl.Failed());
    brpc::policy::PackHuluRequest(
        &total_buf, NULL, cntl.call_id().value,
        test::EchoService::descriptor()->method(0), &cntl, request_buf, &_auth);
    ASSERT_FALSE(cntl.Failed());

    // Handle request
    brpc::ParseResult req_pr =
            brpc::policy::ParseHuluMessage(&total_buf, NULL, false, NULL);
    ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
    brpc::InputMessageBase* req_msg = req_pr.message();
    ProcessMessage(brpc::policy::ProcessHuluRequest, req_msg, false);

    // Socket should be closed
    ASSERT_TRUE(_socket->Failed());
}

TEST_F(HuluTest, hulu_compress) {
    TestHuluCompress(brpc::COMPRESS_TYPE_SNAPPY);
    TestHuluCompress(brpc::COMPRESS_TYPE_GZIP);
    TestHuluCompress(brpc::COMPRESS_TYPE_ZLIB);
}
} //namespace