Commit a79093b4 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: revise to sync interface

parent 491a1860
......@@ -60,9 +60,7 @@ public:
: _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisReply* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::RedisReply* output) {
if (args[1] == NULL) {
output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK;
......@@ -88,14 +86,12 @@ public:
: _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisReply* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
std::string key = args[1];
if (args[2] == NULL) {
brpc::RedisReply* output) {
if (args[1] == NULL || args[2] == NULL) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
std::string key = args[1];
_rsimpl->Set(key, args[2]);
return brpc::RedisCommandHandler::OK;
......@@ -58,47 +58,40 @@ struct InputResponse : public InputMessageBase {
static const char** ParseArgs(const RedisReply& message) {
static bool ParseArgs(const RedisReply& message, std::unique_ptr<const char*[]>* args_out) {
if (!message.is_array() || message.size() == 0) {
LOG(WARNING) << "request message is not array or size equals to zero";
return NULL;
return false;
const char** args = (const char**)
malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
args_out->reset(new const char*[message.size() + 1 /* NULL */]);
for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) {
LOG(WARNING) << "request message[" << i << "] is not array";
return NULL;
return false;
args[i] = message[i].c_str();
(*args_out)[i] = message[i].c_str();
args[message.size()] = NULL;
return args;
(*args_out)[message.size()] = NULL;
return true;
// One redis command corresponding to one ConsumeTaskDone. Whenever user
// has completed the process of handling command and call done->Run()
// (read redis.h for more details), RedisConnContext::Flush() will be
// called and flush the response to client by the order that commands arrive.
class ConsumeTaskDone;
struct RedisTask {
RedisReply input_message;
butil::Arena arena;
// This class is as parsing_context in socket.
class RedisConnContext : public SharedObject
, public Destroyable {
: handler_continue(NULL)
, message_count(0) {}
: handler_continue(NULL) {}
// @Destroyable
void Destroy();
int Init();
// Push `done` to a queue which is read by Flush().
void Push(ConsumeTaskDone* done);
void Flush();
void ClearSentDones();
SocketId socket_id;
RedisService::CommandMap command_map;
......@@ -106,57 +99,23 @@ public:
// first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue;
// The redis command are parsed and pushed into this queue
bthread::ExecutionQueueId<ConsumeTaskDone*> queue;
bthread::ExecutionQueueId<RedisTask*> queue;
RedisReply parsing_message;
butil::Arena arena;
int64_t message_count;
void AddSentDone(ConsumeTaskDone* done);
bool _writing = false;
butil::Mutex _mutex;
std::queue<ConsumeTaskDone*> _dones;
butil::Mutex _dones_sent_mutex;
std::queue<ConsumeTaskDone*> _dones_sent;
class ConsumeTaskDone : public google::protobuf::Closure {
: _ready(false)
, output_message(&arena)
, ctx(NULL) {}
void Run() override;
bool IsReady() { return _ready.load(butil::memory_order_acquire); }
butil::atomic<bool> _ready;
RedisReply input_message;
RedisReply output_message;
RedisConnContext* ctx;
butil::IOBuf sendbuf;
butil::Arena arena;
int ConsumeTask(RedisConnContext* ctx, ConsumeTaskDone* done) {
ClosureGuard done_guard(done);
done->ctx = ctx;
RedisReply& output = done->output_message;
const char** args = ParseArgs(done->input_message);
if (!args) {
int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
RedisReply output(&task->arena);
std::unique_ptr<const char*[]> args;
if (!ParseArgs(task->input_message, &args)) {
LOG(ERROR) << "ERR command not string";
output.SetError("ERR command not string");
return -1;
if (ctx->handler_continue) {
RedisCommandHandler::Result result = ctx->handler_continue->Run(
args, &output, done_guard.release());
RedisCommandHandler::Result result =
ctx->handler_continue->Run(args.get(), &output);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL;
......@@ -172,39 +131,50 @@ int ConsumeTask(RedisConnContext* ctx, ConsumeTaskDone* done) {
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
} else {
RedisCommandHandler::Result result =
it->second->Run(args, &output, done_guard.release());
RedisCommandHandler::Result result = it->second->Run(args.get(), &output);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = it->second.get();
return 0;
int Consume(void* ctx, bthread::TaskIterator<ConsumeTaskDone*>& iter) {
int Consume(void* ctx, bthread::TaskIterator<RedisTask*>& iter) {
RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
if (iter.is_queue_stopped()) {
return 0;
SocketUniquePtr s;
bool has_err = false;
if (Socket::Address(qctx->socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
has_err = true;
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
butil::IOBuf sendbuf;
for (; iter; ++iter) {
ConsumeTask(qctx, *iter);
std::unique_ptr<RedisTask> guard(*iter);
if (has_err) {
if (ConsumeTask(qctx, *iter, &sendbuf) != 0) {
has_err = true;
if (!has_err) {
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0) << "Fail to send redis reply";
return 0;
// ========== impl of RedisConnContext ==========
RedisConnContext::~RedisConnContext() {
while (!_dones.empty()) {
ConsumeTaskDone* head = _dones.front();
delete head;
RedisConnContext::~RedisConnContext() { }
void RedisConnContext::Destroy() {
......@@ -221,88 +191,8 @@ int RedisConnContext::Init() {
return 0;
void RedisConnContext::Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_mutex);
void RedisConnContext::Flush() {
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
std::unique_lock<butil::Mutex> m(_mutex);
if (_writing) return;
_writing = true;
std::queue<ConsumeTaskDone*> ready_to_write;
butil::IOBuf buf;
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
while (true) {
std::unique_lock<butil::Mutex> m(_mutex);
while (!_dones.empty() && _dones.front()->IsReady()) {
if (ready_to_write.empty()) {
_writing = false;
if (!buf.empty()) {
LOG_IF(WARNING, s->Write(&buf, &wopt) != 0)
<< "Fail to send redis reply";
while (!ready_to_write.empty()) {
ConsumeTaskDone* head = ready_to_write.front();
if ((int)buf.size() > FLAGS_redis_batch_flush_max_size) {
// In extreme cases, there are always tasks that are ready in every check
// loop and the buf size continues to grow, then we will never have chance
// to write the buffer. To solve this issue, just add a limit to the maximum
// size of buf.
LOG_IF(WARNING, s->Write(&buf, &wopt) != 0)
<< "Fail to send redis reply";
void RedisConnContext::AddSentDone(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_dones_sent_mutex);
void RedisConnContext::ClearSentDones() {
std::queue<ConsumeTaskDone*> dones_sent;
std::unique_lock<butil::Mutex> m(_dones_sent_mutex);
while (!dones_sent.empty()) {
ConsumeTaskDone* head = dones_sent.front();
delete head;
// ========== impl of RedisConnContext ==========
void ConsumeTaskDone::Run() {
butil::intrusive_ptr<RedisConnContext> delete_ctx(ctx, false);
output_message.SerializeToIOBuf(&sendbuf);, butil::memory_order_release);
// After Flush(), this object may be deleted and should never be
// touched.
ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) {
if (read_eof || source->empty()) {
......@@ -332,19 +222,15 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (err != PARSE_OK) {
return MakeParseError(err);
std::unique_ptr<ConsumeTaskDone> done(new ConsumeTaskDone);
done->input_message.CopyFromDifferentArena(ctx->parsing_message, &done->arena);
std::unique_ptr<RedisTask> task(new RedisTask);
task->input_message.CopyFromDifferentArena(ctx->parsing_message, &task->arena);
// Add a ref that removed in ConsumeTaskDone::Run
if (bthread::execution_queue_execute(ctx->queue, done.get()) != 0) {
if (bthread::execution_queue_execute(ctx->queue, task.get()) != 0) {
LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
return MakeMessage(NULL);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......@@ -248,19 +248,17 @@ public:
// args[2] == "bar" and args[3] == nullptr.
// `output`, which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_reply.h for more usage.
// Remember to call `done->Run()` when everything is set up into `output`. The return
// value should be RedisCommandHandler::OK for normal cases. If you want to implement
// transaction, return RedisCommandHandler::CONTINUE until server receives an ending
// marker. The first handler that return RedisCommandHandler::CONTINUE will continue
// receiving the following commands until it receives a ending marker and return
// RedisCommandHandler::OK to end transaction. For example, the return value of
// commands "multi; set k1 v1; set k2 v2; set k3 v3; exec" should be four
// The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE until server receives
// an ending marker. The first handler that return RedisCommandHandler::CONTINUE will
// continue receiving the following commands until it receives an ending marker and
// return RedisCommandHandler::OK to end transaction. For example, the return value
// of commands "multi; set k1 v1; set k2 v2; set k3 v3; exec" should be four
// RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the
// marker that ends the transaction. User may queue the commands and execute them
// all once an ending marker is received.
// marker that ends the transaction. User should queue the commands and execute them
// all once the ending marker is received.
virtual RedisCommandHandler::Result Run(const char* args[],
RedisReply* output,
google::protobuf::Closure* done) = 0;
RedisReply* output) = 0;
// Whenever a tcp connection is established, a bunch of new handlers would be created
// using New() of the corresponding handler and brpc makes sure that all requests from
......@@ -665,60 +665,31 @@ butil::Mutex s_mutex;
std::unordered_map<std::string, std::string> m;
std::unordered_map<std::string, int64_t> int_map;
struct SleepArgs {
int sleep_ms;
google::protobuf::Closure* done;
void* sleep(void *arg) {
SleepArgs* args = static_cast<SleepArgs*>(arg);
bthread_usleep(args->sleep_ms * 1000);
delete args;
return NULL;
class SetCommandHandler : public brpc::RedisCommandHandler {
SetCommandHandler(bool sleep = false)
: _sleep(sleep) {}
SetCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisReply* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::RedisReply* output) {
std::string key = args[1];
std::string value = args[2];
m[key] = value;
if (_sleep) {
SleepArgs *args = new SleepArgs;
args->sleep_ms = _sleep_ms;
args->done = done_guard.release();
bthread_t bth;
EXPECT_EQ(0, bthread_start_background(&bth, NULL, sleep, args));
if (_sleep_ms > 20) _sleep_ms -= 20;
return brpc::RedisCommandHandler::OK;
RedisCommandHandler* New() { _new_count++; return new SetCommandHandler(_sleep); }
RedisCommandHandler* New() { _new_count++; return new SetCommandHandler(); }
int new_count() { return _new_count; }
int _sleep_ms = 100;
int _new_count = 0;
bool _sleep = false;
class GetCommandHandler : public brpc::RedisCommandHandler {
GetCommandHandler(bool sleep = false)
: _sleep(sleep) {}
GetCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisReply* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::RedisReply* output) {
std::string key = args[1];
auto it = m.find(key);
if (it != m.end()) {
......@@ -726,56 +697,33 @@ public:
} else {
if (_sleep) {
SleepArgs *args = new SleepArgs;
args->sleep_ms = _sleep_ms;
args->done = done_guard.release();
bthread_t bth;
EXPECT_EQ(0, bthread_start_background(&bth, NULL, sleep, args));
if (_sleep_ms > 20) _sleep_ms -= 20;
return brpc::RedisCommandHandler::OK;
RedisCommandHandler* New() { _new_count++; return new GetCommandHandler(_sleep); }
RedisCommandHandler* New() { _new_count++; return new GetCommandHandler(); }
int new_count() { return _new_count; }
int _sleep_ms = 100;
int _new_count = 0;
bool _sleep = false;
class IncrCommandHandler : public brpc::RedisCommandHandler {
IncrCommandHandler(bool sleep = false)
: _sleep(sleep) {}
IncrCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisReply* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::RedisReply* output) {
int64_t value;
value = ++int_map[args[1]];
if (_sleep) {
SleepArgs *args = new SleepArgs;
args->sleep_ms = _sleep_ms;
args->done = done_guard.release();
bthread_t bth;
EXPECT_EQ(0, bthread_start_background(&bth, NULL, sleep, args));
if (_sleep_ms > 20) _sleep_ms -= 20;
return brpc::RedisCommandHandler::OK;
RedisCommandHandler* New() { _new_count++; return new IncrCommandHandler(_sleep); }
RedisCommandHandler* New() { _new_count++; return new IncrCommandHandler(); }
int new_count() { return _new_count; }
int _sleep_ms = 100;
int _new_count = 0;
bool _sleep = false;
class RedisServiceImpl : public brpc::RedisService { };
......@@ -784,9 +732,9 @@ TEST_F(RedisTest, server_sanity) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler *gh = new GetCommandHandler(true);
SetCommandHandler *sh = new SetCommandHandler(true);
IncrCommandHandler *ih = new IncrCommandHandler(true);
GetCommandHandler *gh = new GetCommandHandler;
SetCommandHandler *sh = new SetCommandHandler;
IncrCommandHandler *ih = new IncrCommandHandler;
rsimpl->AddCommandHandler("get", gh);
rsimpl->AddCommandHandler("set", sh);
rsimpl->AddCommandHandler("incr", ih);
......@@ -884,9 +832,7 @@ public:
: _started(false) {}
RedisCommandHandler::Result Run(const char* args[],
brpc::RedisReply* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::RedisReply* output) {
if (strcasecmp(args[0], "multi") == 0) {
if (!_started) {
