// Copyright (c) 2014 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // A server to receive EchoRequest and send back EchoResponse asynchronously. #include <gflags/gflags.h> #include <butil/logging.h> #include <brpc/server.h> #include "echo.pb.h" DEFINE_bool(echo_attachment, true, "Echo attachment as well"); DEFINE_int32(port, 8002, "TCP Port of this server"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state " "(waiting for client to close connection before server stops)"); DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); butil::atomic<int> nsd(0); struct MySessionLocalData { MySessionLocalData() : x(123) { nsd.fetch_add(1, butil::memory_order_relaxed); } ~MySessionLocalData() { nsd.fetch_sub(1, butil::memory_order_relaxed); } int x; }; class MySessionLocalDataFactory : public brpc::DataFactory { public: void* CreateData() const { return new MySessionLocalData; } void DestroyData(void* d) const { delete static_cast<MySessionLocalData*>(d); } }; butil::atomic<int> ntls(0); struct MyThreadLocalData { MyThreadLocalData() : y(0) { ntls.fetch_add(1, butil::memory_order_relaxed); } ~MyThreadLocalData() { ntls.fetch_sub(1, butil::memory_order_relaxed); } static void deleter(void* d) { delete static_cast<MyThreadLocalData*>(d); } int y; }; class MyThreadLocalDataFactory : public brpc::DataFactory { public: void* CreateData() const { return new MyThreadLocalData; } void DestroyData(void* d) const { MyThreadLocalData::deleter(d); } }; struct AsyncJob { MySessionLocalData* expected_session_local_data; int expected_session_value; brpc::Controller* cntl; const example::EchoRequest* request; example::EchoResponse* response; google::protobuf::Closure* done; void run(); void run_and_delete() { run(); delete this; } }; static void* process_thread(void* args) { AsyncJob* job = static_cast<AsyncJob*>(args); job->run_and_delete(); return NULL; } // Your implementation of example::EchoService class EchoServiceWithThreadAndSessionLocal : public example::EchoService { public: EchoServiceWithThreadAndSessionLocal() { CHECK_EQ(0, bthread_key_create(&_tls2_key, MyThreadLocalData::deleter)); } ~EchoServiceWithThreadAndSessionLocal() { CHECK_EQ(0, bthread_key_delete(_tls2_key)); }; void Echo(google::protobuf::RpcController* cntl_base, const example::EchoRequest* request, example::EchoResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); // Get the session-local data which is created by ServerOptions.session_local_data_factory // and reused between different RPC. All session-local data are // destroyed upon server destruction. MySessionLocalData* sd = static_cast<MySessionLocalData*>(cntl->session_local_data()); if (sd == NULL) { cntl->SetFailed("Require ServerOptions.session_local_data_factory to be" " set with a correctly implemented instance"); LOG(ERROR) << cntl->ErrorText(); return; } const int expected_value = sd->x + (((uintptr_t)cntl) & 0xFFFFFFFF); sd->x = expected_value; // Get the thread-local data which is created by ServerOptions.thread_local_data_factory // and reused between different threads. All thread-local data are // destroyed upon server destruction. // "tls" is short for "thread local storage". MyThreadLocalData* tls = static_cast<MyThreadLocalData*>(brpc::thread_local_data()); if (tls == NULL) { cntl->SetFailed("Require ServerOptions.thread_local_data_factory " "to be set with a correctly implemented instance"); LOG(ERROR) << cntl->ErrorText(); return; } tls->y = expected_value; // You can create bthread-local data for your own. // The interfaces are similar with pthread equivalence: // pthread_key_create -> bthread_key_create // pthread_key_delete -> bthread_key_delete // pthread_getspecific -> bthread_getspecific // pthread_setspecific -> bthread_setspecific MyThreadLocalData* tls2 = static_cast<MyThreadLocalData*>(bthread_getspecific(_tls2_key)); if (tls2 == NULL) { tls2 = new MyThreadLocalData; CHECK_EQ(0, bthread_setspecific(_tls2_key, tls2)); } tls2->y = expected_value + 1; // sleep awhile to force context switching. bthread_usleep(10000); // tls is unchanged after context switching. CHECK_EQ(tls, brpc::thread_local_data()); CHECK_EQ(expected_value, tls->y); CHECK_EQ(tls2, bthread_getspecific(_tls2_key)); CHECK_EQ(expected_value + 1, tls2->y); // Process the request asynchronously. AsyncJob* job = new AsyncJob; job->expected_session_local_data = sd; job->expected_session_value = expected_value; job->cntl = cntl; job->request = request; job->response = response; job->done = done; bthread_t th; CHECK_EQ(0, bthread_start_background(&th, NULL, process_thread, job)); // We don't want to call done->Run() here, release the guard. done_guard.release(); LOG_EVERY_SECOND(INFO) << "ntls=" << ntls.load(butil::memory_order_relaxed) << " nsd=" << nsd.load(butil::memory_order_relaxed); } private: bthread_key_t _tls2_key; }; void AsyncJob::run() { brpc::ClosureGuard done_guard(done); // Sleep some time to make sure that Echo() exits. bthread_usleep(10000); // Still the session-local data that we saw in Echo(). // This is the major difference between session-local data and thread-local // data which was already destroyed upon Echo() exit. MySessionLocalData* sd = static_cast<MySessionLocalData*>(cntl->session_local_data()); CHECK_EQ(expected_session_local_data, sd); CHECK_EQ(expected_session_value, sd->x); // Echo request and its attachment response->set_message(request->message()); if (FLAGS_echo_attachment) { cntl->response_attachment().append(cntl->request_attachment()); } } int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. google::ParseCommandLineFlags(&argc, &argv, true); // The factory to create MySessionLocalData. Must be valid when server is running. MySessionLocalDataFactory session_local_data_factory; MyThreadLocalDataFactory thread_local_data_factory; // Generally you only need one Server. brpc::Server server; // For more options see `brpc/server.h'. brpc::ServerOptions options; options.idle_timeout_sec = FLAGS_idle_timeout_s; options.max_concurrency = FLAGS_max_concurrency; options.session_local_data_factory = &session_local_data_factory; options.thread_local_data_factory = &thread_local_data_factory; // Instance of your service. EchoServiceWithThreadAndSessionLocal echo_service_impl; // Add the service into server. Notice the second parameter, because the // service is put on stack, we don't want server to delete it, otherwise // use brpc::SERVER_OWNS_SERVICE. if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { LOG(ERROR) << "Fail to add service"; return -1; } // Start the server. if (server.Start(FLAGS_port, &options) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; } // Wait until Ctrl-C is pressed, then Stop() and Join() the server. server.RunUntilAskedToQuit(); CHECK_EQ(ntls, 0); CHECK_EQ(nsd, 0); return 0; }