// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // brpc - A framework to host and access services throughout Baidu. // Date: 2015/10/22 16:28:44 #include <gtest/gtest.h> #include "brpc/server.h" #include "brpc/controller.h" #include "brpc/channel.h" #include "brpc/stream_impl.h" #include "echo.pb.h" class AfterAcceptStream { public: virtual void action(brpc::StreamId) = 0; }; class MyServiceWithStream : public test::EchoService { public: MyServiceWithStream(const brpc::StreamOptions& options) : _options(options) , _after_accept_stream(NULL) {} MyServiceWithStream(const brpc::StreamOptions& options, AfterAcceptStream* after_accept_stream) : _options(options) , _after_accept_stream(after_accept_stream) {} MyServiceWithStream() : _options() , _after_accept_stream(NULL) {} void Echo(::google::protobuf::RpcController* controller, const ::test::EchoRequest* request, ::test::EchoResponse* response, ::google::protobuf::Closure* done) { brpc::ClosureGuard done_gurad(done); response->set_message(request->message()); brpc::Controller* cntl = (brpc::Controller*)controller; brpc::StreamId response_stream; ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options)); LOG(INFO) << "Created response_stream=" << response_stream; if (_after_accept_stream) { _after_accept_stream->action(response_stream); } } private: brpc::StreamOptions _options; AfterAcceptStream* _after_accept_stream; }; class StreamingRpcTest : public testing::Test { protected: test::EchoRequest request; test::EchoResponse response; void SetUp() { request.set_message("hello world"); } void TearDown() {} }; TEST_F(StreamingRpcTest, sanity) { brpc::Server server; MyServiceWithStream service; ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); brpc::Controller cntl; brpc::StreamId request_stream; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; usleep(10); brpc::StreamClose(request_stream); server.Stop(0); server.Join(); } struct HandlerControl { HandlerControl() : block(false) {} bool block; }; class OrderedInputHandler : public brpc::StreamInputHandler { public: explicit OrderedInputHandler(HandlerControl *cntl = NULL) : _expected_next_value(0) , _failed(false) , _stopped(false) , _idle_times(0) , _cntl(cntl) { } int on_received_messages(brpc::StreamId /*id*/, butil::IOBuf *const messages[], size_t size) { if (_cntl && _cntl->block) { while (_cntl->block) { usleep(100); } } for (size_t i = 0; i < size; ++i) { CHECK(messages[i]->length() == sizeof(int)); int network = 0; messages[i]->cutn(&network, sizeof(int)); EXPECT_EQ((int)ntohl(network), _expected_next_value++); } return 0; } void on_idle_timeout(brpc::StreamId /*id*/) { ++_idle_times; } void on_closed(brpc::StreamId /*id*/) { ASSERT_FALSE(_stopped); _stopped = true; } bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } private: int _expected_next_value; bool _failed; bool _stopped; int _idle_times; HandlerControl* _cntl; }; TEST_F(StreamingRpcTest, received_in_order) { OrderedInputHandler handler; brpc::StreamOptions opt; opt.handler = &handler; opt.messages_in_batch = 100; brpc::Server server; MyServiceWithStream service(opt); ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); brpc::Controller cntl; brpc::StreamId request_stream; brpc::StreamOptions request_stream_options; request_stream_options.max_buf_size = 0; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; const int N = 10000; for (int i = 0; i < N; ++i) { int network = htonl(i); butil::IOBuf out; out.append(&network, sizeof(network)); ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i; } ASSERT_EQ(0, brpc::StreamClose(request_stream)); server.Stop(0); server.Join(); while (!handler.stopped()) { usleep(100); } ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(N, handler._expected_next_value); } void on_writable(brpc::StreamId, void* arg, int error_code) { std::pair<bool, int>* p = (std::pair<bool, int>*)arg; p->first = true; p->second = error_code; LOG(INFO) << "error_code=" << error_code; } TEST_F(StreamingRpcTest, block) { HandlerControl hc; OrderedInputHandler handler(&hc); hc.block = true; brpc::StreamOptions opt; opt.handler = &handler; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; MyServiceWithStream service(opt); ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); brpc::Controller cntl; brpc::StreamId request_stream; brpc::ScopedStream stream_guard(request_stream); brpc::StreamOptions request_stream_options; request_stream_options.max_buf_size = sizeof(uint32_t) * N; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; for (int i = 0; i < N; ++i) { int network = htonl(i); butil::IOBuf out; out.append(&network, sizeof(network)); ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i; } // sync wait int dummy = 102030123; butil::IOBuf out; out.append(&dummy, sizeof(dummy)); ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out)); hc.block = false; ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL)); // wait flushing all the pending messages while (handler._expected_next_value != N) { usleep(100); } // block hanlder again to test async wait hc.block = true; // async wait for (int i = N; i < N + N; ++i) { int network = htonl(i); butil::IOBuf out; out.append(&network, sizeof(network)); ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i; } out.clear(); out.append(&dummy, sizeof(dummy)); ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out)); hc.block = false; std::pair<bool, int> p = std::make_pair(false, 0); usleep(10); brpc::StreamWait(request_stream, NULL, on_writable, &p); while (!p.first) { usleep(100); } ASSERT_EQ(0, p.second); // wait flushing all the pending messages while (handler._expected_next_value != N + N) { usleep(100); } usleep(1000); LOG(INFO) << "Starting block"; hc.block = true; for (int i = N + N; i < N + N + N; ++i) { int network = htonl(i); butil::IOBuf out; out.append(&network, sizeof(network)); ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i - N - N; } out.clear(); out.append(&dummy, sizeof(dummy)); ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out)); timespec duetime = butil::microseconds_from_now(1); p.first = false; LOG(INFO) << "Start wait"; brpc::StreamWait(request_stream, &duetime, on_writable, &p); while (!p.first) { usleep(100); } ASSERT_TRUE(p.first); EXPECT_EQ(ETIMEDOUT, p.second); hc.block = false; ASSERT_EQ(0, brpc::StreamClose(request_stream)); while (!handler.stopped()) { usleep(100); } ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(N + N + N, handler._expected_next_value); } TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) { HandlerControl hc; OrderedInputHandler handler(&hc); hc.block = true; brpc::StreamOptions opt; opt.handler = &handler; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; MyServiceWithStream service(opt); ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); brpc::Controller cntl; brpc::StreamId request_stream; brpc::StreamOptions request_stream_options; request_stream_options.max_buf_size = sizeof(uint32_t) * N; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; { brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr)); brpc::Stream* s = (brpc::Stream*)ptr->conn(); ASSERT_TRUE(s->_host_socket != NULL); s->_host_socket->SetFailed(); } usleep(100); butil::IOBuf out; out.append("test"); ASSERT_EQ(EINVAL, brpc::StreamWrite(request_stream, out)); while (!handler.stopped()) { usleep(100); } ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); ASSERT_EQ(0, handler._expected_next_value); } TEST_F(StreamingRpcTest, idle_timeout) { HandlerControl hc; OrderedInputHandler handler(&hc); hc.block = true; brpc::StreamOptions opt; opt.handler = &handler; opt.idle_timeout_ms = 2; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; MyServiceWithStream service(opt); ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); brpc::Controller cntl; brpc::StreamId request_stream; brpc::StreamOptions request_stream_options; request_stream_options.max_buf_size = sizeof(uint32_t) * N; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; usleep(10 * 1000 + 800); ASSERT_EQ(0, brpc::StreamClose(request_stream)); while (!handler.stopped()) { usleep(100); } ASSERT_FALSE(handler.failed()); // ASSERT_TRUE(handler.idle_times() >= 4 && handler.idle_times() <= 6) // << handler.idle_times(); ASSERT_EQ(0, handler._expected_next_value); } class PingPongHandler : public brpc::StreamInputHandler { public: explicit PingPongHandler() : _expected_next_value(0) , _failed(false) , _stopped(false) , _idle_times(0) { } int on_received_messages(brpc::StreamId id, butil::IOBuf *const messages[], size_t size) { if (size != 1) { _failed = true; return 0; } for (size_t i = 0; i < size; ++i) { CHECK(messages[i]->length() == sizeof(int)); int network = 0; messages[i]->cutn(&network, sizeof(int)); if ((int)ntohl(network) != _expected_next_value) { _failed = true; } int send_back = ntohl(network) + 1; _expected_next_value = send_back + 1; butil::IOBuf out; network = htonl(send_back); out.append(&network, sizeof(network)); // don't care the return value brpc::StreamWrite(id, out); } return 0; } void on_idle_timeout(brpc::StreamId /*id*/) { ++_idle_times; } void on_closed(brpc::StreamId /*id*/) { ASSERT_FALSE(_stopped); _stopped = true; } bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } private: int _expected_next_value; bool _failed; bool _stopped; int _idle_times; }; TEST_F(StreamingRpcTest, ping_pong) { PingPongHandler resh; brpc::StreamOptions opt; opt.handler = &resh; const int N = 10000; opt.max_buf_size = sizeof(uint32_t) * N; brpc::Server server; MyServiceWithStream service(opt); ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); brpc::Controller cntl; brpc::StreamId request_stream; brpc::StreamOptions request_stream_options; PingPongHandler reqh; reqh._expected_next_value = 1; request_stream_options.handler = &reqh; request_stream_options.max_buf_size = sizeof(uint32_t) * N; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; int send = 0; butil::IOBuf out; out.append(&send, sizeof(send)); ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)); usleep(10 * 1000); ASSERT_EQ(0, brpc::StreamClose(request_stream)); while (!resh.stopped() || !reqh.stopped()) { usleep(100); } ASSERT_FALSE(resh.failed()); ASSERT_FALSE(reqh.failed()); ASSERT_EQ(0, resh.idle_times()); ASSERT_EQ(0, reqh.idle_times()); } class SendNAfterAcceptStream : public AfterAcceptStream { public: explicit SendNAfterAcceptStream(int n) : _n(n) {} void action(brpc::StreamId s) { for (int i = 0; i < _n; ++i) { int network = htonl(i); butil::IOBuf out; out.append(&network, sizeof(network)); ASSERT_EQ(0, brpc::StreamWrite(s, out)) << "i=" << i; } } private: int _n; }; TEST_F(StreamingRpcTest, server_send_data_before_run_done) { const int N = 10000; SendNAfterAcceptStream after_accept(N); brpc::StreamOptions opt; opt.max_buf_size = -1; brpc::Server server; MyServiceWithStream service(opt, &after_accept); ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); ASSERT_EQ(0, server.Start(9007, NULL)); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL)); OrderedInputHandler handler; brpc::StreamOptions request_stream_options; brpc::StreamId request_stream; brpc::Controller cntl; request_stream_options.handler = &handler; ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options)); brpc::ScopedStream stream_guard(request_stream); test::EchoService_Stub stub(&channel); stub.Echo(&cntl, &request, &response, NULL); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream; // wait flushing all the pending messages while (handler._expected_next_value != N) { usleep(100); } ASSERT_EQ(0, brpc::StreamClose(request_stream)); while (!handler.stopped()) { usleep(100); } ASSERT_FALSE(handler.failed()); ASSERT_EQ(0, handler.idle_times()); }