Commit d7826178 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: change user interface && support multi

parent 801bb1f8
......@@ -16,7 +16,9 @@
// under the License.
// Authors: Ge,Jun (
// Jiashun Zhu(
#include <queue>
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/message.h> // Message
#include <gflags/gflags.h>
......@@ -30,6 +32,7 @@
#include "brpc/details/server_private_accessor.h"
#include "brpc/span.h"
#include "brpc/redis.h"
#include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h"
#include "bthread/execution_queue.h"
......@@ -53,29 +56,128 @@ struct InputResponse : public InputMessageBase {
struct ExecutionQueueContext {
RedisMessage message;
struct QueueMeta {
SocketId socket_id;
RedisService::CommandMap command_map;
RedisCommandHandler* handler_continue;
// queue to buffer commands and execute these commands together and atomicly
// used to implement command like 'MULTI'.
std::queue<RedisMessage> command_queue;
// used to allocate memory for queued command;
butil::Arena arena;
// command that trigger commands to execute atomicly.
std::string queue_command_name;
QueueMeta() : handler_continue(NULL) {}
struct TaskContext {
RedisMessage message;
butil::Arena arena;
int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) {
RedisConnection* conn = static_cast<RedisConnection*>(meta);
void ConsumeTask(QueueMeta* meta, const RedisMessage& m, butil::Arena* arena, butil::IOBuf* sendbuf) {
RedisMessage output;
char buf[64];
do {
std::vector<const char*> args;
bool args_parsed = true;
for (size_t i = 0; i < m.size(); ++i) {
if (!m[i].is_string()) {
output.set_error("ERR command not string", arena);
args_parsed = false;
if (!args_parsed) {
std::string comm;
for (const char* c = m[0].c_str(); *c; ++c) {
if (meta->handler_continue) {
RedisCommandResult result = meta->handler_continue->Run(args, &output, arena);
if (comm == meta->queue_command_name) {
snprintf(buf, sizeof(buf), "ERR %s calls can not be nested", comm.c_str());
output.set_error(buf, arena);
RedisMessage& last = meta->command_queue.back();
last.CopyFromDifferentArena(m, &meta->arena);
output.set_status("QUEUED", arena);
} else if (result == REDIS_COMMAND_OK) {
meta->handler_continue = NULL;
butil::IOBuf nocountbuf;
int array_count = meta->command_queue.size();
while (!meta->command_queue.empty()) {
RedisMessage& front = meta->command_queue.front();
ConsumeTask(meta, front, arena, &nocountbuf);
AppendHeader(*sendbuf, '*', array_count);
} else if (result == REDIS_COMMAND_ERROR) {
if (!output.is_error()) {
output.set_error("internal server error", arena);
} else {
meta->handler_continue = NULL;
LOG(ERROR) << "unknown redis command result=" << result;
output.set_error("internal server error", arena);
auto it = meta->command_map.find(comm);
if (it == meta->command_map.end()) {
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.set_error(buf, arena);
RedisCommandResult result = it->second->Run(args, &output, arena);
// First command that return REDIS_COMMAND_CONTINUE should not be pushed
// into queue, since it is always a marker.
meta->handler_continue = it->second.get();
meta->queue_command_name = comm;
output.set_status("OK", arena);
} else if (result == REDIS_COMMAND_ERROR) {
if (!output.is_error()) {
output.set_error("internal server error", arena);
} else if (result != REDIS_COMMAND_OK) {
LOG(ERROR) << "unknown redis command result=" << result;
} while(0);
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
QueueMeta* qmeta = static_cast<QueueMeta*>(meta);
if (iter.is_queue_stopped()) {
delete conn;
delete qmeta;
return 0;
SocketUniquePtr s;
bool has_err = false;
if (Socket::Address(qmeta->socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
has_err = true;
for (; iter; ++iter) {
std::unique_ptr<ExecutionQueueContext> ctx(*iter);
SocketUniquePtr s;
if (Socket::Address(ctx->socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
std::unique_ptr<TaskContext> ctx(*iter);
if (has_err) {
RedisMessage output;
conn->OnRedisMessage(ctx->message, &output, &ctx->arena);
butil::IOBuf sendbuf;
ConsumeTask(qmeta, ctx->message, &ctx->arena, &sendbuf);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
......@@ -93,18 +195,18 @@ public:
// @Destroyable
void Destroy() { delete this; }
int init(RedisConnection* conn) {
int init(QueueMeta* meta) {
bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr =
if (bthread::execution_queue_start(&queue, &q_opt, Consume, conn) != 0) {
if (bthread::execution_queue_start(&queue, &q_opt, Consume, meta) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return -1;
return 0;
bthread::ExecutionQueueId<ExecutionQueueContext*> queue;
bthread::ExecutionQueueId<TaskContext*> queue;
ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
......@@ -120,33 +222,28 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context());
if (ctx == NULL) {
RedisConnection* conn = rs->NewConnection();
if (!conn) {
LOG(ERROR) << "Fail to new redis connection from redis service";
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
QueueMeta* meta = new QueueMeta;
meta->socket_id = socket->id();
ctx = new ServerContext;
if (ctx->init(conn) != 0) {
delete conn;
if (ctx->init(meta) != 0) {
delete ctx;
delete meta;
LOG(ERROR) << "Fail to init redis ServerContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
std::unique_ptr<ExecutionQueueContext> task(new ExecutionQueueContext);
RedisMessage message;
ParseError err = message.ConsumePartialIOBuf(*source, &task->arena);
std::unique_ptr<TaskContext> task_ctx(new TaskContext);
ParseError err = task_ctx->message.ConsumePartialIOBuf(*source, &task_ctx->arena);
if (err != PARSE_OK) {
return MakeParseError(err);
task->socket_id = socket->id();
if (bthread::execution_queue_execute(ctx->queue, task.get()) != 0) {
if (bthread::execution_queue_execute(ctx->queue, task_ctx.get()) != 0) {
LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
return MakeMessage(NULL);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......@@ -435,5 +435,21 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
return os;
bool RedisService::AddHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname;
for (auto c : name) {
return true;
void RedisService::CloneCommandMap(CommandMap* map) {
for (auto it : _command_map) {
} // namespace brpc
......@@ -208,29 +208,44 @@ private:
std::ostream& operator<<(std::ostream& os, const RedisRequest&);
std::ostream& operator<<(std::ostream& os, const RedisResponse&);
class RedisConnection;
// Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support. The return type of NewConnection(), which is
// RedisConnection, should also be implemented by users.
class RedisService {
virtual ~RedisService() {}
virtual RedisConnection* NewConnection() = 0;
enum RedisCommandResult {
// Implement this class and make RedisServiceImpl::NewConnection return the
// implemented class. Notice that one TCP connection corresponds to one RedisConnection
// instance, and for the same TCP connection, OnRedisMessage is called sequentially.
// But OnRedisMessage are called concurrently between different TCP connections.
// Read src/brpc/redis_message.h to get the idea how to read and write RedisMessage.
class RedisConnection {
// The handler for a redis command. Run() and New() should be implemented
// by user. For Run(), `args` is the redis command argument. For example,
// "set foo bar" corresponds to args[0] == "set", args[1] == "foo" and
// args[2] == "bar". `output` is the content that sent to client side,
// which should be set by user. Read brpc/src/redis_message.h for more usage.
// `arena` is the memory arena that `output` would use.
// For New(), whenever a tcp connection is established, all handlers would
// be cloned and brpc makes sure that all requests of the same command name
// from one connection would be sent to the same command handler. All requests
// in one connection are executed sequentially, just like what redis-server does.
class RedisCommandHandler {
virtual ~RedisConnection() {}
virtual void OnRedisMessage(const RedisMessage& message,
~RedisCommandHandler() {}
virtual RedisCommandResult Run(const std::vector<const char*>& args,
RedisMessage* output, butil::Arena* arena) = 0;
virtual RedisCommandHandler* New() = 0;
// Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support. To support a particular command, you should implement
// the corresponding handler and call AddHandler to install it.
class RedisService {
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
virtual ~RedisService() {}
bool AddHandler(const std::string& name, RedisCommandHandler* handler);
void CloneCommandMap(CommandMap* map);
CommandMap _command_map;
} // namespace brpc
......@@ -28,40 +28,6 @@ namespace brpc {
const size_t CTX_WIDTH = 5;
// Much faster than snprintf(..., "%lu", d);
inline size_t AppendDecimal(char* outbuf, unsigned long d) {
char buf[24]; // enough for decimal 64-bit integers
size_t n = sizeof(buf);
do {
const unsigned long q = d / 10;
buf[--n] = d - q * 10 + '0';
d = q;
} while (d);
fast_memcpy(outbuf, buf + n, sizeof(buf) - n);
return sizeof(buf) - n;
// This function is the hotspot of RedisCommandFormatV() when format is
// short or does not have many %. In a 100K-time call to formating of
// "GET key1", the time spent on RedisRequest.AddCommand() are ~700ns
// vs. ~400ns while using snprintf() vs. AppendDecimal() respectively.
inline void AppendHeader(std::string& buf, char fc, unsigned long value) {
char header[32];
header[0] = fc;
size_t len = AppendDecimal(header + 1, value);
header[len + 1] = '\r';
header[len + 2] = '\n';
buf.append(header, len + 3);
inline void AppendHeader(butil::IOBuf& buf, char fc, unsigned long value) {
char header[32];
header[0] = fc;
size_t len = AppendDecimal(header + 1, value);
header[len + 1] = '\r';
header[len + 2] = '\n';
buf.append(header, len + 3);
static void FlushComponent(std::string* out, std::string* compbuf, int* ncomp) {
AppendHeader(*out, '$', compbuf->size());
......@@ -40,6 +40,41 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf,
const butil::StringPiece* components,
size_t num_components);
// Much faster than snprintf(..., "%lu", d);
inline size_t AppendDecimal(char* outbuf, unsigned long d) {
char buf[24]; // enough for decimal 64-bit integers
size_t n = sizeof(buf);
do {
const unsigned long q = d / 10;
buf[--n] = d - q * 10 + '0';
d = q;
} while (d);
fast_memcpy(outbuf, buf + n, sizeof(buf) - n);
return sizeof(buf) - n;
// This function is the hotspot of RedisCommandFormatV() when format is
// short or does not have many %. In a 100K-time call to formating of
// "GET key1", the time spent on RedisRequest.AddCommand() are ~700ns
// vs. ~400ns while using snprintf() vs. AppendDecimal() respectively.
inline void AppendHeader(std::string& buf, char fc, unsigned long value) {
char header[32];
header[0] = fc;
size_t len = AppendDecimal(header + 1, value);
header[len + 1] = '\r';
header[len + 2] = '\n';
buf.append(header, len + 3);
inline void AppendHeader(butil::IOBuf& buf, char fc, unsigned long value) {
char header[32];
header[0] = fc;
size_t len = AppendDecimal(header + 1, value);
header[len + 1] = '\r';
header[len + 2] = '\n';
buf.append(header, len + 3);
} // namespace brpc
......@@ -375,10 +375,8 @@ void RedisMessage::CopyFromDifferentArena(const RedisMessage& other,
new (&subs[i]) RedisMessage;
_data.array.last_index = other._data.array.last_index;
if (_data.array.last_index > 0) {
for (int i = 0; i < _data.array.last_index; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
for (size_t i = 0; i < _length; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
_data.array.replies = subs;
......@@ -211,6 +211,7 @@ inline bool RedisMessage::set_basic_string(const std::string& str, butil::Arena*
size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)arena->allocate((_length/8 + 1) * 8);
if (!d) {
......@@ -218,6 +219,7 @@ inline bool RedisMessage::set_basic_string(const std::string& str, butil::Arena*
return false;
memcpy(d, str.c_str(), size);
d[size] = '\0';
_data.long_str = d;
_type = type;
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment