Commit 93df20c3 authored by gejun's avatar gejun

Patch svn r34997 r34998 r35000

Change-Id: Ica35c57cf8bbf234387a913a7d7e6b20dd9b8490
parent 798d7db8
......@@ -23,6 +23,7 @@
namespace brpc {
// Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl();
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto();
......@@ -67,13 +68,14 @@ void protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto() {
delete MemcacheResponse::default_instance_;
}
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() {
static bool already_here = false;
if (already_here) return;
already_here = true;
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
#if GOOGLE_PROTOBUF_VERSION >= 3002000
::google::protobuf::internal::InitProtobufDefaults();
#else
::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#endif
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
"\n\035baidu/rpc/memcache_base.proto\022\tbaidu.r"
"pc\032 google/protobuf/descriptor.proto\"\021\n\017"
......@@ -87,6 +89,13 @@ void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() {
::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto);
}
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_once);
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() {
::google::protobuf::GoogleOnceInit(
&protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_once,
&protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl);
}
// Force AddDescriptors() to be called at static initialization time.
struct StaticDescriptorInitializer_baidu_2frpc_2fmemcache_5fbase_2eproto {
StaticDescriptorInitializer_baidu_2frpc_2fmemcache_5fbase_2eproto() {
......
......@@ -117,7 +117,8 @@ private:
int _pipelined_count;
base::IOBuf _buf;
mutable int _cached_size_;
friend void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto();
......@@ -230,7 +231,8 @@ private:
std::string _err;
base::IOBuf _buf;
mutable int _cached_size_;
friend void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto();
......
......@@ -48,13 +48,14 @@ void protobuf_ShutdownFile_baidu_2frpc_2fnshead_5fmessage_2eproto() {
delete NsheadMessage::default_instance_;
}
void protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto() {
static bool already_here = false;
if (already_here) return;
already_here = true;
void protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto_impl() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
#if GOOGLE_PROTOBUF_VERSION >= 3002000
::google::protobuf::internal::InitProtobufDefaults();
#else
::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#endif
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
"\n\036baidu/rpc/nshead_message.proto\022\tbaidu."
"rpc\032 google/protobuf/descriptor.proto\"\017\n"
......@@ -67,6 +68,13 @@ void protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto() {
&protobuf_ShutdownFile_baidu_2frpc_2fnshead_5fmessage_2eproto);
}
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto_once);
void protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto() {
::google::protobuf::GoogleOnceInit(
&protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto_once,
&protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto_impl);
}
// Force AddDescriptors() to be called at static initialization time.
struct StaticDescriptorInitializer_baidu_2frpc_2fnshead_5fmessage_2eproto {
StaticDescriptorInitializer_baidu_2frpc_2fnshead_5fmessage_2eproto() {
......
......@@ -69,6 +69,7 @@ private:
void SharedCtor();
void SharedDtor();
private:
friend void protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fnshead_5fmessage_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fnshead_5fmessage_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fnshead_5fmessage_2eproto();
......
......@@ -25,6 +25,7 @@ namespace brpc {
DEFINE_bool(redis_verbose_crlf2space, false, "[DEBUG] Show \\r\\n as a space");
// Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl();
void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fredis_5fbase_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fredis_5fbase_2eproto();
......@@ -69,13 +70,14 @@ void protobuf_ShutdownFile_baidu_2frpc_2fredis_5fbase_2eproto() {
delete RedisResponse::default_instance_;
}
void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto() {
static bool already_here = false;
if (already_here) return;
already_here = true;
void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#if GOOGLE_PROTOBUF_VERSION >= 3002000
::google::protobuf::internal::InitProtobufDefaults();
#else
::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#endif
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
"\n\032baidu/rpc/redis_base.proto\022\tbaidu.rpc\032"
" google/protobuf/descriptor.proto\"\016\n\014Red"
......@@ -89,6 +91,13 @@ void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto() {
::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_baidu_2frpc_2fredis_5fbase_2eproto);
}
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_once);
void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto() {
::google::protobuf::GoogleOnceInit(
&protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_once,
&protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl);
}
// Force AddDescriptors() to be called at static initialization time.
struct StaticDescriptorInitializer_baidu_2frpc_2fredis_5fbase_2eproto {
StaticDescriptorInitializer_baidu_2frpc_2fredis_5fbase_2eproto() {
......
......@@ -129,7 +129,8 @@ private:
bool _has_error; // previous AddCommand had error
base::IOBuf _buf; // the serialized request.
mutable int _cached_size_; // ByteSize
friend void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fredis_5fbase_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fredis_5fbase_2eproto();
......@@ -201,7 +202,8 @@ private:
base::Arena _arena;
uint32_t _nreply;
mutable int _cached_size_;
friend void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fredis_5fbase_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fredis_5fbase_2eproto();
......
......@@ -105,8 +105,8 @@ struct ExtendedSocketStat : public SocketStat {
struct Sampled {
uint32_t in_size_s;
uint32_t out_size_s;
uint32_t in_num_messages_s;
uint32_t out_size_s;
uint32_t out_num_messages_s;
};
SparseMinuteCounter<Sampled> _minute_counter;
......@@ -116,7 +116,7 @@ struct ExtendedSocketStat : public SocketStat {
, last_in_num_messages(0)
, last_out_size(0)
, last_out_num_messages(0) {
memset((SocketStat*)this, 0 , sizeof(SocketStat));
memset((SocketStat*)this, 0, sizeof(SocketStat));
}
};
......@@ -212,7 +212,6 @@ void Socket::SharedPart::UpdateStatsEverySecond(int64_t now_ms) {
stat->in_num_messages_m += s.in_num_messages_s;
stat->out_size_m += s.out_size_s;
stat->out_num_messages_m += s.out_num_messages_s;
if (stat->_minute_counter.Add(now_ms, s, &popped)) {
stat->in_size_m -= popped.in_size_s;
stat->in_num_messages_m -= popped.in_num_messages_s;
......@@ -843,17 +842,17 @@ int Socket::SetFailed(SocketId id) {
}
void Socket::NotifyOnFailed(bthread_id_t id) {
int rc = 0;
std::string desc;
pthread_mutex_lock(&_id_wait_list_mutex);
if (Failed()) {
rc = non_zero_error_code();
desc = _error_text;
if (!Failed()) {
const int rc = bthread_id_list_add(&_id_wait_list, id);
pthread_mutex_unlock(&_id_wait_list_mutex);
if (rc != 0) {
bthread_id_error(id, rc);
}
} else {
rc = bthread_id_list_add(&_id_wait_list, id);
}
pthread_mutex_unlock(&_id_wait_list_mutex);
if (rc != 0) {
const int rc = non_zero_error_code();
const std::string desc = _error_text;
pthread_mutex_unlock(&_id_wait_list_mutex);
bthread_id_error2(id, rc, desc);
}
}
......
......@@ -11,9 +11,6 @@
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <brpc/channel.h>
#include <brpc/server.h>
#include <bvar/bvar.h>
......
......@@ -8,13 +8,10 @@
// A multi-threaded client getting keys from a memcache server constantly.
#include <deque>
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <brpc/channel.h>
#include <brpc/memcache.h>
......@@ -26,32 +23,19 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_int32(exptime, 5, "The to-be-got data will be expired after so many seconds");
DEFINE_int32(exptime, 0, "The to-be-got data will be expired after so many seconds");
DEFINE_string(key, "hello", "The key to be get");
DEFINE_string(value, "world", "The value associated with the key");
DEFINE_int32(batch, 1, "Pipelined Operations");
// For latency stats.
static pthread_mutex_t g_latency_mutex = PTHREAD_MUTEX_INITIALIZER;
struct BAIDU_CACHELINE_ALIGNMENT SenderInfo {
size_t nsuccess;
int64_t latency_sum;
};
static std::deque<SenderInfo> g_sender_info;
bvar::LatencyRecorder g_latency_recorder("client");
bvar::Adder<int> g_error_count("client_error_count");
base::static_atomic<int> g_sender_count = BASE_STATIC_ATOMIC_INIT(0);
static void* sender(void* arg) {
// For latency stats.
SenderInfo* info = NULL;
int base_index = 0;
{
BAIDU_SCOPED_LOCK(g_latency_mutex);
base_index = (int)g_sender_info.size() * FLAGS_batch;
g_sender_info.push_back(SenderInfo());
info = &g_sender_info.back();
}
google::protobuf::RpcChannel* channel =
static_cast<google::protobuf::RpcChannel*>(arg);
const int base_index = g_sender_count.fetch_add(1, base::memory_order_relaxed);
std::string value;
std::vector<std::pair<std::string, std::string> > kvs;
......@@ -75,8 +59,7 @@ static void* sender(void* arg) {
channel->CallMethod(NULL, &cntl, &request, &response, NULL);
const int64_t elp = cntl.latency_us();
if (!cntl.Failed()) {
info->latency_sum += elp;
++info->nsuccess;
g_latency_recorder << cntl.latency_us();
for (int i = 0; i < FLAGS_batch; ++i) {
uint32_t flags;
if (!response.PopGet(&value, &flags, NULL)) {
......@@ -90,9 +73,9 @@ static void* sender(void* arg) {
<< "base=" << base_index << " i=" << i << " value=" << value;
}
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText() << " latency=" << elp;
CHECK(elp < 5000) << "actually " << elp;
// We can't connect to the server, sleep a while. Notice that this
// is a specific sleeping to prevent this thread from spinning too
// fast. You should continue the business logic in a production
......@@ -106,6 +89,9 @@ static void* sender(void* arg) {
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_exptime < 0) {
FLAGS_exptime = 0;
}
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
......@@ -147,8 +133,13 @@ int main(int argc, char* argv[]) {
return -1;
}
}
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, expiring after " << FLAGS_exptime << " seconds";
if (FLAGS_exptime > 0) {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, expired after " << FLAGS_exptime << " seconds";
} else {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, never expired";
}
std::vector<bthread_t> tids;
tids.resize(FLAGS_thread_num);
......@@ -169,34 +160,10 @@ int main(int argc, char* argv[]) {
}
}
int64_t last_counter = 0;
int64_t last_latency_sum = 0;
std::vector<size_t> last_nsuccess(FLAGS_thread_num);
while (!brpc::IsAskedToQuit()) {
sleep(1);
// Print qps & latency.
int64_t latency_sum = 0;
int64_t nsuccess = 0;
pthread_mutex_lock(&g_latency_mutex);
CHECK_EQ(g_sender_info.size(), (size_t)FLAGS_thread_num);
for (size_t i = 0; i < g_sender_info.size(); ++i) {
const SenderInfo& info = g_sender_info[i];
latency_sum += info.latency_sum;
nsuccess += info.nsuccess;
if (FLAGS_dont_fail) {
CHECK(info.nsuccess > last_nsuccess[i]);
}
last_nsuccess[i] = info.nsuccess;
}
pthread_mutex_unlock(&g_latency_mutex);
const int64_t avg_latency = (latency_sum - last_latency_sum) /
std::max(nsuccess - last_counter, 1L);
LOG(INFO) << "Accessing memcache server at qps=" << nsuccess - last_counter
<< " latency=" << avg_latency;
last_counter = nsuccess;
last_latency_sum = latency_sum;
LOG(INFO) << "Accessing memcache server at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "memcache_client is going to quit";
......
......@@ -11,12 +11,8 @@
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <brpc/server.h>
#include <brpc/channel.h>
#include <deque>
#include "echo.pb.h"
#include <bvar/bvar.h>
......
......@@ -21,7 +21,7 @@
DEFINE_int32(thread_num, 50, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_bool(send_attachment, false, "Carry attachment along with requests");
DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in protocol/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "file://server_list", "Addresses of servers");
......@@ -30,46 +30,38 @@ DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(backup_timeout_ms, -1, "backup timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_int32(dummy_port, -1, "Port of dummy server");
DEFINE_int32(dummy_port, 0, "Launch dummy server at this port");
DEFINE_string(http_content_type, "application/json", "Content type of http request");
pthread_mutex_t g_latency_mutex = PTHREAD_MUTEX_INITIALIZER;
struct BAIDU_CACHELINE_ALIGNMENT SenderInfo {
size_t nsuccess;
int64_t latency_sum;
};
std::deque<SenderInfo> g_sender_info;
std::string g_attachment;
bvar::LatencyRecorder g_latency_recorder("client");
bvar::Adder<int> g_error_count("client_error_count");
base::static_atomic<int> g_sender_count = BASE_STATIC_ATOMIC_INIT(0);
static void* sender(void* arg) {
// Normally, you should not call a Channel directly, but instead construct
// a stub Service wrapping it. stub can be shared by all threads as well.
example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg));
SenderInfo* info = NULL;
int thread_index = 0;
{
BAIDU_SCOPED_LOCK(g_latency_mutex);
g_sender_info.push_back(SenderInfo());
info = &g_sender_info.back();
thread_index = (int)g_sender_info.size();
}
int log_id = 0;
brpc::Controller cntl;
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
// on stack.
example::EchoRequest request;
example::EchoResponse response;
cntl.Reset();
brpc::Controller cntl;
const int thread_index = g_sender_count.fetch_add(1, base::memory_order_relaxed);
const int input = ((thread_index & 0xFFF) << 20) | (log_id & 0xFFFFF);
request.set_value(input);
cntl.set_log_id(log_id ++); // set by user
if (FLAGS_send_attachment) {
if (FLAGS_protocol != "http" && FLAGS_protocol != "h2c") {
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append("foo");
cntl.request_attachment().append(g_attachment);
} else {
cntl.http_request().set_content_type(FLAGS_http_content_type);
}
// Because `done'(last parameter) is NULL, this function waits until
......@@ -77,9 +69,9 @@ static void* sender(void* arg) {
stub.Echo(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
CHECK(response.value() == request.value() + 1);
info->latency_sum += cntl.latency_us();
++info->nsuccess;
g_latency_recorder << cntl.latency_us();
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "input=(" << thread_index << "," << (input & 0xFFFFF)
<< ") error=" << cntl.ErrorText() << " latency=" << cntl.latency_us();
......@@ -101,7 +93,7 @@ int main(int argc, char* argv[]) {
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.backup_request_ms = FLAGS_backup_timeout_ms;
options.protocol = FLAGS_protocol;
......@@ -113,6 +105,10 @@ int main(int argc, char* argv[]) {
return -1;
}
if (FLAGS_attachment_size > 0) {
g_attachment.resize(FLAGS_attachment_size, 'a');
}
if (FLAGS_dummy_port > 0) {
brpc::StartDummyServerAt(FLAGS_dummy_port);
}
......@@ -136,32 +132,10 @@ int main(int argc, char* argv[]) {
}
}
int64_t last_counter = 0;
int64_t last_latency_sum = 0;
std::vector<size_t> last_nsuccess(FLAGS_thread_num);
while (!brpc::IsAskedToQuit()) {
sleep(1);
int64_t latency_sum = 0;
int64_t nsuccess = 0;
pthread_mutex_lock(&g_latency_mutex);
CHECK_EQ(g_sender_info.size(), (size_t)FLAGS_thread_num);
for (size_t i = 0; i < g_sender_info.size(); ++i) {
const SenderInfo& info = g_sender_info[i];
latency_sum += info.latency_sum;
nsuccess += info.nsuccess;
if (FLAGS_dont_fail) {
CHECK(info.nsuccess > last_nsuccess[i]);
}
last_nsuccess[i] = info.nsuccess;
}
pthread_mutex_unlock(&g_latency_mutex);
const int64_t avg_latency = (latency_sum - last_latency_sum) /
std::max(nsuccess - last_counter, 1L);
LOG(INFO) << "Sending EchoRequest at qps=" << nsuccess - last_counter
<< " latency=" << avg_latency;
last_counter = nsuccess;
last_latency_sum = latency_sum;
LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "EchoClient is going to quit";
......
......@@ -11,12 +11,8 @@
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <brpc/server.h>
#include <brpc/channel.h>
#include <deque>
#include "echo.pb.h"
#include <bvar/bvar.h>
......
......@@ -14,8 +14,6 @@
#include <readline/history.h>
#include <gflags/gflags.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/macros.h>
#include <brpc/channel.h>
#include <brpc/redis.h>
......
......@@ -12,8 +12,6 @@
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <bvar/bvar.h>
#include <brpc/channel.h>
#include <brpc/server.h>
......
......@@ -11,12 +11,8 @@
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <brpc/selective_channel.h>
#include <brpc/parallel_channel.h>
#include <deque>
#include "echo.pb.h"
DEFINE_int32(thread_num, 50, "Number of threads to send requests");
......@@ -34,25 +30,15 @@ DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
std::string g_request;
std::string g_attachment;
pthread_mutex_t g_latency_mutex = PTHREAD_MUTEX_INITIALIZER;
struct BAIDU_CACHELINE_ALIGNMENT SenderInfo {
size_t nsuccess;
int64_t latency_sum;
};
std::deque<SenderInfo> g_sender_info;
bvar::LatencyRecorder g_latency_recorder("client");
bvar::Adder<int> g_error_count("client_error_count");
static void* sender(void* arg) {
// Normally, you should not call a Channel directly, but instead construct
// a stub Service wrapping it. stub can be shared by all threads as well.
example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg));
SenderInfo* info = NULL;
{
BAIDU_SCOPED_LOCK(g_latency_mutex);
g_sender_info.push_back(SenderInfo());
info = &g_sender_info.back();
}
int log_id = 0;
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
......@@ -75,15 +61,13 @@ static void* sender(void* arg) {
stub.Echo(&cntl, &request, &response, NULL);
const int64_t elp = cntl.latency_us();
if (!cntl.Failed()) {
info->latency_sum += elp;
++info->nsuccess;
g_latency_recorder << cntl.latency_us();
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText() << " latency=" << elp;
CHECK_LT(elp, 5000);
// We can't connect to the server, sleep a while. Notice that this
// is a specific sleeping to prevent this thread from spinning too
// fast. You should continue the business logic in a production
// is a specific sleeping to prevent this thread from spinning too // fast. You should continue the business logic in a production
// server rather than sleeping.
bthread_usleep(50000);
}
......@@ -107,18 +91,6 @@ int main(int argc, char* argv[]) {
return -1;
}
LOG(INFO) << "Topology:\n"
<< "SelectiveChannel[\n"
<< " Channel[list://0.0.0.0:8004,0.0.0.0:8005,0.0.0.0:8006]\n"
<< " ParallelChannel[\n"
<< " Channel[0.0.0.0:8007]\n"
<< " Channel[0.0.0.0:8008]\n"
<< " Channel[0.0.0.0:8009]]\n"
<< " SelectiveChannel[\n"
<< " Channel[list://0.0.0.0:8010,0.0.0.0:8011,0.0.0.0:8012]\n"
<< " Channel[0.0.0.0:8013]\n"
<< " Channel[0.0.0.0:8014]]]\n";
// Add sub channels.
// ================
std::vector<brpc::ChannelBase*> sub_channels;
......@@ -238,32 +210,10 @@ int main(int argc, char* argv[]) {
}
}
int64_t last_counter = 0;
int64_t last_latency_sum = 0;
std::vector<size_t> last_nsuccess(FLAGS_thread_num);
while (!brpc::IsAskedToQuit()) {
sleep(1);
int64_t latency_sum = 0;
int64_t nsuccess = 0;
pthread_mutex_lock(&g_latency_mutex);
CHECK_EQ(g_sender_info.size(), (size_t)FLAGS_thread_num);
for (size_t i = 0; i < g_sender_info.size(); ++i) {
const SenderInfo& info = g_sender_info[i];
latency_sum += info.latency_sum;
nsuccess += info.nsuccess;
if (FLAGS_dont_fail) {
CHECK(info.nsuccess > last_nsuccess[i]) << "i=" << i;
}
last_nsuccess[i] = info.nsuccess;
}
pthread_mutex_unlock(&g_latency_mutex);
const int64_t avg_latency = (latency_sum - last_latency_sum) /
std::max(nsuccess - last_counter, 1L);
LOG(INFO) << "Sending EchoRequest at qps=" << nsuccess - last_counter
<< " latency=" << avg_latency;
last_counter = nsuccess;
last_latency_sum = latency_sum;
LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "EchoClient is going to quit";
......
......@@ -12,7 +12,6 @@
#include <gflags/gflags.h>
#include <base/time.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/string_splitter.h>
#include <base/rand_util.h>
#include <brpc/server.h>
......
......@@ -11,9 +11,6 @@
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <base/time.h>
#include <base/macros.h>
#include <brpc/channel.h>
#include "echo.pb.h"
#include <bvar/bvar.h>
......
......@@ -10,7 +10,6 @@
#include <gflags/gflags.h>
#include <base/logging.h>
#include <base/string_printf.h>
#include <brpc/channel.h>
#include <brpc/stream.h>
#include "echo.pb.h"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment