Unverified Commit cd9bcea3 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #972 from zyearn/redis_server_protocol

Redis server protocol
parents 7906e5ff cd1b4eac
...@@ -38,6 +38,10 @@ set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) ...@@ -38,6 +38,10 @@ set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
include(FindThreads) include(FindThreads)
include(FindProtobuf) include(FindProtobuf)
find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h)
find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler)
include_directories(${GPERFTOOLS_INCLUDE_DIR})
# Search for libthrift* by best effort. If it is not found and brpc is # Search for libthrift* by best effort. If it is not found and brpc is
# compiled with thrift protocol enabled, a link error would be reported. # compiled with thrift protocol enabled, a link error would be reported.
find_library(THRIFT_LIB NAMES thrift) find_library(THRIFT_LIB NAMES thrift)
...@@ -126,6 +130,7 @@ set(DYNAMIC_LIB ...@@ -126,6 +130,7 @@ set(DYNAMIC_LIB
${CRYPTO_LIB} ${CRYPTO_LIB}
${THRIFT_LIB} ${THRIFT_LIB}
${THRIFTNB_LIB} ${THRIFTNB_LIB}
${GPERFTOOLS_LIBRARIES}
dl dl
) )
...@@ -145,7 +150,10 @@ endif() ...@@ -145,7 +150,10 @@ endif()
add_executable(redis_cli redis_cli.cpp) add_executable(redis_cli redis_cli.cpp)
add_executable(redis_press redis_press.cpp) add_executable(redis_press redis_press.cpp)
add_executable(redis_server redis_server.cpp)
set(AUX_LIB readline ncurses) set(AUX_LIB readline ncurses)
target_link_libraries(redis_cli ${BRPC_LIB} ${DYNAMIC_LIB} ${AUX_LIB}) target_link_libraries(redis_cli ${BRPC_LIB} ${DYNAMIC_LIB} ${AUX_LIB})
target_link_libraries(redis_press ${BRPC_LIB} ${DYNAMIC_LIB} ${AUX_LIB}) target_link_libraries(redis_press ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(redis_server ${BRPC_LIB} ${DYNAMIC_LIB})
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// A brpc based redis-server. Currently just implement set and
// get, but it's sufficient that you can get the idea how to
// implement brpc::RedisCommandHandler.
#include <brpc/server.h>
#include <brpc/redis.h>
#include <butil/crc32c.h>
#include <butil/strings/string_split.h>
#include <gflags/gflags.h>
#include <unordered_map>
#include <butil/time.h>
class RedisServiceImpl : public brpc::RedisService {
public:
bool Set(const std::string& key, const std::string& value) {
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
_mutex[slot].lock();
_map[slot][key] = value;
_mutex[slot].unlock();
return true;
}
bool Get(const std::string& key, std::string* value) {
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
_mutex[slot].lock();
auto it = _map[slot].find(key);
if (it == _map[slot].end()) {
_mutex[slot].unlock();
return false;
}
*value = it->second;
_mutex[slot].unlock();
return true;
}
private:
const static int kHashSlotNum = 32;
std::unordered_map<std::string, std::string> _map[kHashSlotNum];
butil::Mutex _mutex[kHashSlotNum];
};
class GetCommandHandler : public brpc::RedisCommandHandler {
public:
GetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) override {
if (args.size() <= 1) {
output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK;
}
const std::string key(args[1]);
std::string value;
if (_rsimpl->Get(key, &value)) {
output->SetString(value);
} else {
output->SetNullString();
}
return brpc::RedisCommandHandler::OK;
}
private:
RedisServiceImpl* _rsimpl;
};
class SetCommandHandler : public brpc::RedisCommandHandler {
public:
SetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) override {
if (args.size() <= 2) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
}
const std::string key(args[1]);
const std::string value(args[2]);
_rsimpl->Set(key, value);
output->SetStatus("OK");
return brpc::RedisCommandHandler::OK;
}
private:
RedisServiceImpl* _rsimpl;
};
int main(int argc, char* argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
RedisServiceImpl* rsimpl = new RedisServiceImpl;
rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
brpc::Server server;
brpc::ServerOptions server_options;
server_options.redis_service = rsimpl;
if (server.Start(6379, &server_options) != 0) {
LOG(ERROR) << "Fail to start server";
return -1;
}
server.RunUntilAskedToQuit();
return 0;
}
...@@ -21,7 +21,7 @@ include $(BRPC_PATH)/config.mk ...@@ -21,7 +21,7 @@ include $(BRPC_PATH)/config.mk
# Notes on the flags: # Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default # 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8 # 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
CXXFLAGS = $(CPPFLAGS) -std=c++0x -g -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer CXXFLAGS = $(CPPFLAGS) -std=c++0x -g -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
ifeq ($(NEED_GPERFTOOLS), 1) ifeq ($(NEED_GPERFTOOLS), 1)
CXXFLAGS+=-DBRPC_ENABLE_CPU_PROFILER CXXFLAGS+=-DBRPC_ENABLE_CPU_PROFILER
endif endif
......
...@@ -489,7 +489,7 @@ static void GlobalInitializeOrDieImpl() { ...@@ -489,7 +489,7 @@ static void GlobalInitializeOrDieImpl() {
Protocol redis_protocol = { ParseRedisMessage, Protocol redis_protocol = { ParseRedisMessage,
SerializeRedisRequest, SerializeRedisRequest,
PackRedisRequest, PackRedisRequest,
NULL, ProcessRedisResponse, ProcessRedisRequest, ProcessRedisResponse,
NULL, NULL, GetRedisMethodName, NULL, NULL, GetRedisMethodName,
CONNECTION_TYPE_ALL, "redis" }; CONNECTION_TYPE_ALL, "redis" };
if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) { if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) {
......
...@@ -46,14 +46,14 @@ enum ProtocolType { ...@@ -46,14 +46,14 @@ enum ProtocolType {
PROTOCOL_HTTP = 7; PROTOCOL_HTTP = 7;
PROTOCOL_PUBLIC_PBRPC = 8; PROTOCOL_PUBLIC_PBRPC = 8;
PROTOCOL_NOVA_PBRPC = 9; PROTOCOL_NOVA_PBRPC = 9;
PROTOCOL_NSHEAD_CLIENT = 10; // implemented in baidu-rpc-ub PROTOCOL_REDIS = 10;
PROTOCOL_NSHEAD = 11; PROTOCOL_NSHEAD_CLIENT = 11; // implemented in baidu-rpc-ub
PROTOCOL_HADOOP_RPC = 12; PROTOCOL_NSHEAD = 12;
PROTOCOL_HADOOP_SERVER_RPC = 13; PROTOCOL_HADOOP_RPC = 13;
PROTOCOL_MONGO = 14; // server side only PROTOCOL_HADOOP_SERVER_RPC = 14;
PROTOCOL_UBRPC_COMPACK = 15; PROTOCOL_MONGO = 15; // server side only
PROTOCOL_DIDX_CLIENT = 16; // Client side only PROTOCOL_UBRPC_COMPACK = 16;
PROTOCOL_REDIS = 17; // Client side only PROTOCOL_DIDX_CLIENT = 17; // Client side only
PROTOCOL_MEMCACHE = 18; // Client side only PROTOCOL_MEMCACHE = 18; // Client side only
PROTOCOL_ITP = 19; PROTOCOL_ITP = 19;
PROTOCOL_NSHEAD_MCPACK = 20; PROTOCOL_NSHEAD_MCPACK = 20;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
// under the License. // under the License.
// Authors: Ge,Jun (gejun@baidu.com) // Authors: Ge,Jun (gejun@baidu.com)
// Jiashun Zhu(zhujiashun2010@gmail.com)
#include <google/protobuf/descriptor.h> // MethodDescriptor #include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/message.h> // Message #include <google/protobuf/message.h> // Message
...@@ -30,12 +31,13 @@ ...@@ -30,12 +31,13 @@
#include "brpc/details/server_private_accessor.h" #include "brpc/details/server_private_accessor.h"
#include "brpc/span.h" #include "brpc/span.h"
#include "brpc/redis.h" #include "brpc/redis.h"
#include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h" #include "brpc/policy/redis_protocol.h"
namespace brpc { namespace brpc {
DECLARE_bool(enable_rpcz); DECLARE_bool(enable_rpcz);
DECLARE_bool(usercode_in_pthread);
namespace policy { namespace policy {
...@@ -52,62 +54,199 @@ struct InputResponse : public InputMessageBase { ...@@ -52,62 +54,199 @@ struct InputResponse : public InputMessageBase {
} }
}; };
// "Message" = "Response" as we only implement the client for redis. // This class is as parsing_context in socket.
ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, class RedisConnContext : public Destroyable {
bool /*read_eof*/, const void* /*arg*/) { public:
if (source->empty()) { RedisConnContext()
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); : redis_service(NULL)
, batched_size(0) {}
~RedisConnContext();
// @Destroyable
void Destroy() override;
SocketId socket_id;
RedisService* redis_service;
// If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> transaction_handler;
// >0 if command handler is run in batched mode.
int batched_size;
RedisCommandParser parser;
butil::Arena arena;
};
int ConsumeCommand(RedisConnContext* ctx,
const std::vector<const char*>& commands,
bool flush_batched,
butil::IOBufAppender* appender) {
RedisReply output(&ctx->arena);
RedisCommandHandler::Result result = RedisCommandHandler::OK;
if (ctx->transaction_handler) {
result = ctx->transaction_handler->Run(commands, &output, flush_batched);
if (result == RedisCommandHandler::OK) {
ctx->transaction_handler.reset(NULL);
} else if (result == RedisCommandHandler::BATCHED) {
LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
return -1;
}
} else {
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(commands[0]);
if (!ch) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]);
output.SetError(buf);
} else {
result = ch->Run(commands, &output, flush_batched);
if (result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size != 0) {
LOG(ERROR) << "CONTINUE should not be returned in a batched process.";
return -1;
}
ctx->transaction_handler.reset(ch->NewTransactionHandler());
} else if (result == RedisCommandHandler::BATCHED) {
ctx->batched_size++;
}
}
} }
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what if (result == RedisCommandHandler::OK) {
// I thought before. The Socket._pipeline_q is a SPSC queue pushed before if (ctx->batched_size) {
// sending and popped when response comes back, being protected by a if ((int)output.size() != (ctx->batched_size + 1)) {
// mutex. Previously the mutex is shared with Socket._id_wait_list. When LOG(ERROR) << "reply array size can't be matched with batched size, "
// 200 bthreads access one redis-server, ~1.5s in total is spent on << " expected=" << ctx->batched_size + 1 << " actual=" << output.size();
// contention in 10-second duration. If the mutex is separated, the time return -1;
// drops to ~0.25s. I further replaced PeekPipelinedInfo() with }
// GivebackPipelinedInfo() to lock only once(when receiving response) for (int i = 0; i < (int)output.size(); ++i) {
// in most cases, and the time decreases to ~0.14s. output[i].SerializeTo(appender);
PipelinedInfo pi; }
if (!socket->PopPipelinedInfo(&pi)) { ctx->batched_size = 0;
LOG(WARNING) << "No corresponding PipelinedInfo in socket"; } else {
return MakeParseError(PARSE_ERROR_TRY_OTHERS); output.SerializeTo(appender);
}
} else if (result == RedisCommandHandler::CONTINUE) {
output.SerializeTo(appender);
} else if (result == RedisCommandHandler::BATCHED) {
// just do nothing and wait handler to return OK.
} else {
LOG(ERROR) << "unknown status=" << result;
return -1;
} }
return 0;
}
do { // ========== impl of RedisConnContext ==========
InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
if (msg == NULL) { RedisConnContext::~RedisConnContext() { }
msg = new InputResponse;
socket->reset_parsing_context(msg); void RedisConnContext::Destroy() {
} delete this;
}
// ========== impl of RedisConnContext ==========
const int consume_count = (pi.with_auth ? 1 : pi.count); ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) {
if (read_eof || source->empty()) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
const Server* server = static_cast<const Server*>(arg);
if (server) {
RedisService* rs = server->options().redis_service;
if (!rs) {
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context());
if (ctx == NULL) {
ctx = new RedisConnContext;
ctx->socket_id = socket->id();
ctx->redis_service = rs;
socket->reset_parsing_context(ctx);
}
std::vector<const char*> current_commands;
butil::IOBufAppender appender;
ParseError err = PARSE_OK;
ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count); err = ctx->parser.Consume(*source, &current_commands, &ctx->arena);
if (err != PARSE_OK) { if (err != PARSE_OK) {
socket->GivebackPipelinedInfo(pi);
return MakeParseError(err); return MakeParseError(err);
} }
while (true) {
std::vector<const char*> next_commands;
err = ctx->parser.Consume(*source, &next_commands, &ctx->arena);
if (err != PARSE_OK) {
break;
}
if (ConsumeCommand(ctx, current_commands, false, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
current_commands.swap(next_commands);
}
if (ConsumeCommand(ctx, current_commands,
true /*must be the last message*/, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
butil::IOBuf sendbuf;
appender.move_to(sendbuf);
CHECK(!sendbuf.empty());
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
ctx->arena.clear();
return MakeParseError(err);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
// I thought before. The Socket._pipeline_q is a SPSC queue pushed before
// sending and popped when response comes back, being protected by a
// mutex. Previously the mutex is shared with Socket._id_wait_list. When
// 200 bthreads access one redis-server, ~1.5s in total is spent on
// contention in 10-second duration. If the mutex is separated, the time
// drops to ~0.25s. I further replaced PeekPipelinedInfo() with
// GivebackPipelinedInfo() to lock only once(when receiving response)
// in most cases, and the time decreases to ~0.14s.
PipelinedInfo pi;
if (!socket->PopPipelinedInfo(&pi)) {
LOG(WARNING) << "No corresponding PipelinedInfo in socket";
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
if (pi.with_auth) { do {
if (msg->response.reply_size() != 1 || InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
!(msg->response.reply(0).type() == brpc::REDIS_REPLY_STATUS && if (msg == NULL) {
msg->response.reply(0).data().compare("OK") == 0)) { msg = new InputResponse;
LOG(ERROR) << "Redis Auth failed: " << msg->response; socket->reset_parsing_context(msg);
return MakeParseError(PARSE_ERROR_NO_RESOURCE,
"Fail to authenticate with Redis");
} }
DestroyingPtr<InputResponse> auth_msg( const int consume_count = (pi.with_auth ? 1 : pi.count);
static_cast<InputResponse*>(socket->release_parsing_context()));
pi.with_auth = false; ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count);
continue; if (err != PARSE_OK) {
} socket->GivebackPipelinedInfo(pi);
return MakeParseError(err);
}
CHECK_EQ((uint32_t)msg->response.reply_size(), pi.count); if (pi.with_auth) {
msg->id_wait = pi.id_wait; if (msg->response.reply_size() != 1 ||
socket->release_parsing_context(); !(msg->response.reply(0).type() == brpc::REDIS_REPLY_STATUS &&
return MakeMessage(msg); msg->response.reply(0).data().compare("OK") == 0)) {
} while(true); LOG(ERROR) << "Redis Auth failed: " << msg->response;
return MakeParseError(PARSE_ERROR_NO_RESOURCE,
"Fail to authenticate with Redis");
}
DestroyingPtr<InputResponse> auth_msg(
static_cast<InputResponse*>(socket->release_parsing_context()));
pi.with_auth = false;
continue;
}
CHECK_EQ((uint32_t)msg->response.reply_size(), pi.count);
msg->id_wait = pi.id_wait;
socket->release_parsing_context();
return MakeMessage(msg);
} while(true);
}
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
...@@ -158,6 +297,8 @@ void ProcessRedisResponse(InputMessageBase* msg_base) { ...@@ -158,6 +297,8 @@ void ProcessRedisResponse(InputMessageBase* msg_base) {
accessor.OnResponse(cid, saved_error); accessor.OnResponse(cid, saved_error);
} }
void ProcessRedisRequest(InputMessageBase* msg_base) { }
void SerializeRedisRequest(butil::IOBuf* buf, void SerializeRedisRequest(butil::IOBuf* buf,
Controller* cntl, Controller* cntl,
const google::protobuf::Message* request) { const google::protobuf::Message* request) {
......
...@@ -33,6 +33,13 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket *socket, bool read_eo ...@@ -33,6 +33,13 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket *socket, bool read_eo
// Actions to a redis response. // Actions to a redis response.
void ProcessRedisResponse(InputMessageBase* msg); void ProcessRedisResponse(InputMessageBase* msg);
// Actions to a redis request, which is left unimplemented.
// All requests are processed in execution queue pushed in
// the parsing process. This function must be declared since
// server only enables redis as a server-side protocol when
// this function is declared.
void ProcessRedisRequest(InputMessageBase* msg);
// Serialize a redis request. // Serialize a redis request.
void SerializeRedisRequest(butil::IOBuf* buf, void SerializeRedisRequest(butil::IOBuf* buf,
Controller* cntl, Controller* cntl,
......
...@@ -17,9 +17,10 @@ ...@@ -17,9 +17,10 @@
// Authors: Ge,Jun (gejun@baidu.com) // Authors: Ge,Jun (gejun@baidu.com)
#include <google/protobuf/reflection_ops.h> // ReflectionOps::Merge #include <google/protobuf/reflection_ops.h> // ReflectionOps::Merge
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "butil/status.h" #include "butil/status.h"
#include "butil/strings/string_util.h" // StringToLowerASCII
#include "brpc/redis.h" #include "brpc/redis.h"
#include "brpc/redis_command.h" #include "brpc/redis_command.h"
...@@ -239,12 +240,13 @@ std::ostream& operator<<(std::ostream& os, const RedisRequest& r) { ...@@ -239,12 +240,13 @@ std::ostream& operator<<(std::ostream& os, const RedisRequest& r) {
} }
RedisResponse::RedisResponse() RedisResponse::RedisResponse()
: ::google::protobuf::Message() { : ::google::protobuf::Message()
, _first_reply(&_arena) {
SharedCtor(); SharedCtor();
} }
RedisResponse::RedisResponse(const RedisResponse& from) RedisResponse::RedisResponse(const RedisResponse& from)
: ::google::protobuf::Message() { : ::google::protobuf::Message()
, _first_reply(&_arena) {
SharedCtor(); SharedCtor();
MergeFrom(from); MergeFrom(from);
} }
...@@ -315,7 +317,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) { ...@@ -315,7 +317,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
} }
_cached_size_ += from._cached_size_; _cached_size_ += from._cached_size_;
if (_nreply == 0) { if (_nreply == 0) {
_first_reply.CopyFromDifferentArena(from._first_reply, &_arena); _first_reply.CopyFromDifferentArena(from._first_reply);
} }
const int new_nreply = _nreply + from._nreply; const int new_nreply = _nreply + from._nreply;
if (new_nreply == 1) { if (new_nreply == 1) {
...@@ -325,7 +327,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) { ...@@ -325,7 +327,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
RedisReply* new_others = RedisReply* new_others =
(RedisReply*)_arena.allocate(sizeof(RedisReply) * (new_nreply - 1)); (RedisReply*)_arena.allocate(sizeof(RedisReply) * (new_nreply - 1));
for (int i = 0; i < new_nreply - 1; ++i) { for (int i = 0; i < new_nreply - 1; ++i) {
new (new_others + i) RedisReply; new (new_others + i) RedisReply(&_arena);
} }
int new_other_index = 0; int new_other_index = 0;
for (int i = 1; i < _nreply; ++i) { for (int i = 1; i < _nreply; ++i) {
...@@ -333,8 +335,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) { ...@@ -333,8 +335,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
_other_replies[i - 1]); _other_replies[i - 1]);
} }
for (int i = !_nreply; i < from._nreply; ++i) { for (int i = !_nreply; i < from._nreply; ++i) {
new_others[new_other_index++].CopyFromDifferentArena( new_others[new_other_index++].CopyFromDifferentArena(from.reply(i));
from.reply(i), &_arena);
} }
DCHECK_EQ(new_nreply - 1, new_other_index); DCHECK_EQ(new_nreply - 1, new_other_index);
_other_replies = new_others; _other_replies = new_others;
...@@ -383,7 +384,7 @@ const ::google::protobuf::Descriptor* RedisResponse::descriptor() { ...@@ -383,7 +384,7 @@ const ::google::protobuf::Descriptor* RedisResponse::descriptor() {
ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) { ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) {
size_t oldsize = buf.size(); size_t oldsize = buf.size();
if (reply_size() == 0) { if (reply_size() == 0) {
ParseError err = _first_reply.ConsumePartialIOBuf(buf, &_arena); ParseError err = _first_reply.ConsumePartialIOBuf(buf);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return err; return err;
} }
...@@ -401,11 +402,11 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count ...@@ -401,11 +402,11 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
for (int i = 0; i < reply_count - 1; ++i) { for (int i = 0; i < reply_count - 1; ++i) {
new (&_other_replies[i]) RedisReply; new (&_other_replies[i]) RedisReply(&_arena);
} }
} }
for (int i = reply_size(); i < reply_count; ++i) { for (int i = reply_size(); i < reply_count; ++i) {
ParseError err = _other_replies[i - 1].ConsumePartialIOBuf(buf, &_arena); ParseError err = _other_replies[i - 1].ConsumePartialIOBuf(buf);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return err; return err;
} }
...@@ -435,5 +436,30 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) { ...@@ -435,5 +436,30 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
} }
return os; return os;
} }
bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname = StringToLowerASCII(name);
auto it = _command_map.find(lcname);
if (it != _command_map.end()) {
LOG(ERROR) << "redis command name=" << name << " exist";
return false;
}
_command_map[lcname] = handler;
return true;
}
RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) {
std::string lcname = StringToLowerASCII(name);
auto it = _command_map.find(lcname);
if (it != _command_map.end()) {
return it->second;
}
return NULL;
}
RedisCommandHandler* RedisCommandHandler::NewTransactionHandler() {
LOG(ERROR) << "NewTransactionHandler is not implemented";
return NULL;
}
} // namespace brpc } // namespace brpc
...@@ -21,12 +21,16 @@ ...@@ -21,12 +21,16 @@
#define BRPC_REDIS_H #define BRPC_REDIS_H
#include <google/protobuf/message.h> #include <google/protobuf/message.h>
#include <unordered_map>
#include <memory>
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "butil/strings/string_piece.h" #include "butil/strings/string_piece.h"
#include "butil/arena.h" #include "butil/arena.h"
#include "brpc/proto_base.pb.h" #include "brpc/proto_base.pb.h"
#include "brpc/redis_reply.h" #include "brpc/redis_reply.h"
#include "brpc/parse_result.h" #include "brpc/parse_result.h"
#include "brpc/callback.h"
#include "brpc/socket.h"
namespace brpc { namespace brpc {
...@@ -161,7 +165,7 @@ public: ...@@ -161,7 +165,7 @@ public:
if (index < reply_size()) { if (index < reply_size()) {
return (index == 0 ? _first_reply : _other_replies[index - 1]); return (index == 0 ? _first_reply : _other_replies[index - 1]);
} }
static RedisReply redis_nil; static RedisReply redis_nil(NULL);
return redis_nil; return redis_nil;
} }
...@@ -209,7 +213,73 @@ private: ...@@ -209,7 +213,73 @@ private:
std::ostream& operator<<(std::ostream& os, const RedisRequest&); std::ostream& operator<<(std::ostream& os, const RedisRequest&);
std::ostream& operator<<(std::ostream& os, const RedisResponse&); std::ostream& operator<<(std::ostream& os, const RedisResponse&);
} // namespace brpc class RedisCommandHandler;
// Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support.
class RedisService {
public:
typedef std::unordered_map<std::string, RedisCommandHandler*> CommandMap;
virtual ~RedisService() {}
// Call this function to register `handler` that can handle command `name`.
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
// This function should not be touched by user and used by brpc deverloper only.
RedisCommandHandler* FindCommandHandler(const std::string& name);
private:
CommandMap _command_map;
};
// The Command handler for a redis request. User should impletement Run().
class RedisCommandHandler {
public:
enum Result {
OK = 0,
CONTINUE = 1,
BATCHED = 2,
};
~RedisCommandHandler() {}
// Once Server receives commands, it will first find the corresponding handlers and
// call them sequentially(one by one) according to the order that requests arrive,
// just like what redis-server does.
// `args' is the array of request command. For example, "set somekey somevalue"
// corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue".
// `output', which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_reply.h for more usage.
// `flush_batched' indicates whether the user should flush all the results of
// batched commands. If user want to do some batch processing, user should buffer
// the commands and return RedisCommandHandler::BATCHED. Once `flush_batched' is true,
// run all the commands, set `output' to be an array in which every element is the
// result of batched commands and return RedisCommandHandler::OK.
//
// The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE once server receives
// an start marker and brpc will call MultiTransactionHandler() to new a transaction
// handler that all the following commands are sent to this tranction handler until
// it returns RedisCommandHandler::OK. Read the comment below.
virtual RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) = 0;
// The Run() returns CONTINUE for "multi", which makes brpc call this method to
// create a transaction_handler to process following commands until transaction_handler
// returns OK. For example, for command "multi; set k1 v1; set k2 v2; set k3 v3;
// exec":
// 1) First command is "multi" and Run() should return RedisCommandHandler::CONTINUE,
// then brpc calls NewTransactionHandler() to new a transaction_handler.
// 2) brpc calls transaction_handler.Run() with command "set k1 v1",
// which should return CONTINUE.
// 3) brpc calls transaction_handler.Run() with command "set k2 v2",
// which should return CONTINUE.
// 4) brpc calls transaction_handler.Run() with command "set k3 v3",
// which should return CONTINUE.
// 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all
// the commands and return OK. This Transation is done.
virtual RedisCommandHandler* NewTransactionHandler();
};
} // namespace brpc
#endif // BRPC_REDIS_H #endif // BRPC_REDIS_H
...@@ -21,9 +21,6 @@ ...@@ -21,9 +21,6 @@
#include "brpc/log.h" #include "brpc/log.h"
#include "brpc/redis_command.h" #include "brpc/redis_command.h"
// Defined in src/butil/iobuf.cpp
void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n);
namespace brpc { namespace brpc {
const size_t CTX_WIDTH = 5; const size_t CTX_WIDTH = 5;
...@@ -360,4 +357,96 @@ butil::Status RedisCommandByComponents(butil::IOBuf* output, ...@@ -360,4 +357,96 @@ butil::Status RedisCommandByComponents(butil::IOBuf* output,
return butil::Status::OK(); return butil::Status::OK();
} }
RedisCommandParser::RedisCommandParser()
: _parsing_array(false)
, _length(0)
, _index(0) {}
ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
std::vector<const char*>* commands,
butil::Arena* arena) {
const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
// '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
if (!_parsing_array && *pfc != '*') {
return PARSE_ERROR_TRY_OTHERS;
}
// '$' stands for bulk string "$<length>\r\n<string>\r\n"
if (_parsing_array && *pfc != '$') {
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
char intbuf[32]; // enough for fc + 64-bit decimal + \r\n
const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
intbuf[ncopied] = '\0';
const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
char* endptr = NULL;
int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
if (endptr != intbuf + crlf_pos) {
LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (value <= 0) {
LOG(ERROR) << "Invalid len=" << value << " in redis command";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (!_parsing_array) {
buf.pop_front(crlf_pos + 2/*CRLF*/);
_parsing_array = true;
_length = value;
_index = 0;
_commands.resize(value);
return Consume(buf, commands, arena);
}
CHECK(_index < _length) << "a complete command has been parsed. "
"impl of RedisCommandParser::Parse is buggy";
const int64_t len = value; // `value' is length of the string
if (len < 0) {
LOG(ERROR) << "string in command is nil!";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (len > (int64_t)std::numeric_limits<uint32_t>::max()) {
LOG(ERROR) << "string in command is too long! max length=2^32-1,"
" actually=" << len;
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
char* d = (char*)arena->allocate((len/8 + 1) * 8);
buf.cutn(d, len);
d[len] = '\0';
_commands[_index] = d;
if (_index == 0) {
// convert it to lowercase when it is command name
for (int i = 0; i < len; ++i) {
d[i] = ::tolower(d[i]);
}
}
char crlf[2];
buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') {
LOG(ERROR) << "string in command is not ended with CRLF";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (++_index < _length) {
return Consume(buf, commands, arena);
}
commands->swap(_commands);
Reset();
return PARSE_OK;
}
void RedisCommandParser::Reset() {
_parsing_array = false;
_length = 0;
_index = 0;
_commands.clear();
}
} // namespace brpc } // namespace brpc
...@@ -20,9 +20,12 @@ ...@@ -20,9 +20,12 @@
#ifndef BRPC_REDIS_COMMAND_H #ifndef BRPC_REDIS_COMMAND_H
#define BRPC_REDIS_COMMAND_H #define BRPC_REDIS_COMMAND_H
#include <memory> // std::unique_ptr
#include <vector>
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "butil/status.h" #include "butil/status.h"
#include "butil/arena.h"
#include "brpc/parse_result.h"
namespace brpc { namespace brpc {
...@@ -40,6 +43,27 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf, ...@@ -40,6 +43,27 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf,
const butil::StringPiece* components, const butil::StringPiece* components,
size_t num_components); size_t num_components);
// A parser used to parse redis raw command.
class RedisCommandParser {
public:
RedisCommandParser();
// Parse raw message from `buf'. Return PARSE_OK and set the parsed command
// to `commands' and length to `len' if successful. Memory of commands are
// allocated in `arena'.
ParseError Consume(butil::IOBuf& buf, std::vector<const char*>* commands,
butil::Arena* arena);
private:
// Reset parser to the initial state.
void Reset();
bool _parsing_array; // if the parser has met array indicator '*'
int _length; // array length
int _index; // current parsing array index
std::vector<const char*> _commands; // parsed command string
};
} // namespace brpc } // namespace brpc
......
...@@ -19,11 +19,13 @@ ...@@ -19,11 +19,13 @@
#include <limits> #include <limits>
#include "butil/logging.h" #include "butil/logging.h"
#include "butil/string_printf.h"
#include "brpc/redis_reply.h" #include "brpc/redis_reply.h"
namespace brpc { namespace brpc {
//BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match); //BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match);
const int RedisReply::npos = -1;
const char* RedisReplyTypeToString(RedisReplyType type) { const char* RedisReplyTypeToString(RedisReplyType type) {
switch (type) { switch (type) {
...@@ -37,13 +39,64 @@ const char* RedisReplyTypeToString(RedisReplyType type) { ...@@ -37,13 +39,64 @@ const char* RedisReplyTypeToString(RedisReplyType type) {
} }
} }
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { bool RedisReply::SerializeTo(butil::IOBufAppender* appender) {
switch (_type) {
case REDIS_REPLY_ERROR:
// fall through
case REDIS_REPLY_STATUS:
appender->push_back((_type == REDIS_REPLY_ERROR)? '-' : '+');
if (_length < (int)sizeof(_data.short_str)) {
appender->append(_data.short_str, _length);
} else {
appender->append(_data.long_str, _length);
}
appender->append("\r\n", 2);
return true;
case REDIS_REPLY_INTEGER:
appender->push_back(':');
appender->append_decimal(_data.integer);
appender->append("\r\n", 2);
return true;
case REDIS_REPLY_STRING:
appender->push_back('$');
appender->append_decimal(_length);
appender->append("\r\n", 2);
if (_length != npos) {
if (_length < (int)sizeof(_data.short_str)) {
appender->append(_data.short_str, _length);
} else {
appender->append(_data.long_str, _length);
}
appender->append("\r\n", 2);
}
return true;
case REDIS_REPLY_ARRAY:
appender->push_back('*');
appender->append_decimal(_length);
appender->append("\r\n", 2);
if (_length != npos) {
for (int i = 0; i < _length; ++i) {
if (!_data.array.replies[i].SerializeTo(appender)) {
return false;
}
}
}
return true;
case REDIS_REPLY_NIL:
LOG(ERROR) << "Do you forget to call SetXXX()?";
return false;
}
CHECK(false) << "unknown redis type=" << _type;
return false;
}
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf) {
if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) { if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
// The parsing was suspended while parsing sub replies, // The parsing was suspended while parsing sub replies,
// continue the parsing. // continue the parsing.
RedisReply* subs = (RedisReply*)_data.array.replies; RedisReply* subs = (RedisReply*)_data.array.replies;
for (uint32_t i = _data.array.last_index; i < _length; ++i) { for (int i = _data.array.last_index; i < _length; ++i) {
ParseError err = subs[i].ConsumePartialIOBuf(buf, arena); ParseError err = subs[i].ConsumePartialIOBuf(buf);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return err; return err;
} }
...@@ -81,7 +134,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren ...@@ -81,7 +134,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/); str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/);
return PARSE_OK; return PARSE_OK;
} }
char* d = (char*)arena->allocate((len/8 + 1)*8); char* d = (char*)_arena->allocate((len/8 + 1)*8);
if (d == NULL) { if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << len << "]"; LOG(FATAL) << "Fail to allocate string[" << len << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
...@@ -141,7 +194,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren ...@@ -141,7 +194,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
buf.cutn(_data.short_str, len); buf.cutn(_data.short_str, len);
_data.short_str[len] = '\0'; _data.short_str[len] = '\0';
} else { } else {
char* d = (char*)arena->allocate((len/8 + 1)*8); char* d = (char*)_arena->allocate((len/8 + 1)*8);
if (d == NULL) { if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << len << "]"; LOG(FATAL) << "Fail to allocate string[" << len << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
...@@ -183,13 +236,13 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren ...@@ -183,13 +236,13 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
// FIXME(gejun): Call allocate_aligned instead. // FIXME(gejun): Call allocate_aligned instead.
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * count); RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * count);
if (subs == NULL) { if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]"; LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
for (int64_t i = 0; i < count; ++i) { for (int64_t i = 0; i < count; ++i) {
new (&subs[i]) RedisReply; new (&subs[i]) RedisReply(_arena);
} }
buf.pop_front(crlf_pos + 2/*CRLF*/); buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_ARRAY; _type = REDIS_REPLY_ARRAY;
...@@ -200,7 +253,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren ...@@ -200,7 +253,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
// be continued in next calls by tracking _data.array.last_index. // be continued in next calls by tracking _data.array.last_index.
_data.array.last_index = 0; _data.array.last_index = 0;
for (int64_t i = 0; i < count; ++i) { for (int64_t i = 0; i < count; ++i) {
ParseError err = subs[i].ConsumePartialIOBuf(buf, arena); ParseError err = subs[i].ConsumePartialIOBuf(buf);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return err; return err;
} }
...@@ -265,7 +318,7 @@ void RedisReply::Print(std::ostream& os) const { ...@@ -265,7 +318,7 @@ void RedisReply::Print(std::ostream& os) const {
switch (_type) { switch (_type) {
case REDIS_REPLY_STRING: case REDIS_REPLY_STRING:
os << '"'; os << '"';
if (_length < sizeof(_data.short_str)) { if (_length < (int)sizeof(_data.short_str)) {
os << RedisStringPrinter(_data.short_str, _length); os << RedisStringPrinter(_data.short_str, _length);
} else { } else {
os << RedisStringPrinter(_data.long_str, _length); os << RedisStringPrinter(_data.long_str, _length);
...@@ -274,7 +327,7 @@ void RedisReply::Print(std::ostream& os) const { ...@@ -274,7 +327,7 @@ void RedisReply::Print(std::ostream& os) const {
break; break;
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
os << '['; os << '[';
for (uint32_t i = 0; i < _length; ++i) { for (int i = 0; i < _length; ++i) {
if (i != 0) { if (i != 0) {
os << ", "; os << ", ";
} }
...@@ -292,7 +345,7 @@ void RedisReply::Print(std::ostream& os) const { ...@@ -292,7 +345,7 @@ void RedisReply::Print(std::ostream& os) const {
os << "(error) "; os << "(error) ";
// fall through // fall through
case REDIS_REPLY_STATUS: case REDIS_REPLY_STATUS:
if (_length < sizeof(_data.short_str)) { if (_length < (int)sizeof(_data.short_str)) {
os << RedisStringPrinter(_data.short_str, _length); os << RedisStringPrinter(_data.short_str, _length);
} else { } else {
os << RedisStringPrinter(_data.long_str, _length); os << RedisStringPrinter(_data.long_str, _length);
...@@ -304,24 +357,28 @@ void RedisReply::Print(std::ostream& os) const { ...@@ -304,24 +357,28 @@ void RedisReply::Print(std::ostream& os) const {
} }
} }
void RedisReply::CopyFromDifferentArena(const RedisReply& other, void RedisReply::CopyFromDifferentArena(const RedisReply& other) {
butil::Arena* arena) {
_type = other._type; _type = other._type;
_length = other._length; _length = other._length;
switch (_type) { switch (_type) {
case REDIS_REPLY_ARRAY: { case REDIS_REPLY_ARRAY: {
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * _length); RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * _length);
if (subs == NULL) { if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << _length << "]"; LOG(FATAL) << "Fail to allocate RedisReply[" << _length << "]";
return; return;
} }
for (uint32_t i = 0; i < _length; ++i) { for (int i = 0; i < _length; ++i) {
new (&subs[i]) RedisReply; new (&subs[i]) RedisReply(_arena);
} }
_data.array.last_index = other._data.array.last_index; _data.array.last_index = other._data.array.last_index;
if (_data.array.last_index > 0) { if (_data.array.last_index > 0) {
// incomplete state
for (int i = 0; i < _data.array.last_index; ++i) { for (int i = 0; i < _data.array.last_index; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena); subs[i].CopyFromDifferentArena(other._data.array.replies[i]);
}
} else {
for (int i = 0; i < _length; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i]);
} }
} }
_data.array.replies = subs; _data.array.replies = subs;
...@@ -337,10 +394,10 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other, ...@@ -337,10 +394,10 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
case REDIS_REPLY_ERROR: case REDIS_REPLY_ERROR:
// fall through // fall through
case REDIS_REPLY_STATUS: case REDIS_REPLY_STATUS:
if (_length < sizeof(_data.short_str)) { if (_length < (int)sizeof(_data.short_str)) {
memcpy(_data.short_str, other._data.short_str, _length + 1); memcpy(_data.short_str, other._data.short_str, _length + 1);
} else { } else {
char* d = (char*)arena->allocate((_length/8 + 1)*8); char* d = (char*)_arena->allocate((_length/8 + 1)*8);
if (d == NULL) { if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << _length << "]"; LOG(FATAL) << "Fail to allocate string[" << _length << "]";
return; return;
...@@ -352,4 +409,56 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other, ...@@ -352,4 +409,56 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
} }
} }
void RedisReply::SetArray(int size) {
if (!_arena) {
return;
}
if (_type != REDIS_REPLY_NIL) {
Reset();
}
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
LOG(ERROR) << "negative size=" << size << " when calling SetArray";
return;
} else if (size == 0) {
_length = 0;
return;
}
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply(_arena);
}
_length = size;
_data.array.replies = subs;
}
void RedisReply::SetStringImpl(const butil::StringPiece& str, RedisReplyType type) {
if (!_arena) {
return;
}
if (_type != REDIS_REPLY_NIL) {
Reset();
}
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.data(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return;
}
memcpy(d, str.data(), size);
d[size] = '\0';
_data.long_str = d;
}
_type = type;
_length = size;
}
} // namespace brpc } // namespace brpc
...@@ -44,8 +44,9 @@ const char* RedisReplyTypeToString(RedisReplyType); ...@@ -44,8 +44,9 @@ const char* RedisReplyTypeToString(RedisReplyType);
// A reply from redis-server. // A reply from redis-server.
class RedisReply { class RedisReply {
public: public:
// A default constructed reply is a nil. // The initial value for a reply is a nil.
RedisReply(); // All needed memory is allocated on `arena'.
RedisReply(butil::Arena* arena);
// Type of the reply. // Type of the reply.
RedisReplyType type() const { return _type; } RedisReplyType type() const { return _type; }
...@@ -56,6 +57,29 @@ public: ...@@ -56,6 +57,29 @@ public:
bool is_string() const; // True if the reply is a string. bool is_string() const; // True if the reply is a string.
bool is_array() const; // True if the reply is an array. bool is_array() const; // True if the reply is an array.
// Set the reply to the null string.
void SetNullString();
// Set the reply to the null array.
void SetNullArray();
// Set the reply to the array with `size' elements. After calling
// SetArray, use operator[] to visit sub replies and set their
// value.
void SetArray(int size);
// Set the reply to status message `str'.
void SetStatus(const butil::StringPiece& str);
// Set the reply to error message `str'.
void SetError(const butil::StringPiece& str);
// Set the reply to integer `value'.
void SetInteger(int64_t value);
// Set the reply to string `str'.
void SetString(const butil::StringPiece& str);
// Convert the reply into a signed 64-bit integer(according to // Convert the reply into a signed 64-bit integer(according to
// http://redis.io/topics/protocol). If the reply is not an integer, // http://redis.io/topics/protocol). If the reply is not an integer,
// call stacks are logged and 0 is returned. // call stacks are logged and 0 is returned.
...@@ -74,15 +98,16 @@ public: ...@@ -74,15 +98,16 @@ public:
// If you need a std::string, call .data().as_string() (which allocates mem) // If you need a std::string, call .data().as_string() (which allocates mem)
butil::StringPiece data() const; butil::StringPiece data() const;
// Return number of sub replies in the array. If this reply is not an array, // Return number of sub replies in the array if this reply is an array, or
// 0 is returned (call stacks are not logged). // return the length of string if this reply is a string, otherwise 0 is
// returned (call stacks are not logged).
size_t size() const; size_t size() const;
// Get the index-th sub reply. If this reply is not an array, a nil reply // Get the index-th sub reply. If this reply is not an array or index is out of
// is returned (call stacks are not logged) // range, a nil reply is returned (call stacks are not logged)
const RedisReply& operator[](size_t index) const; const RedisReply& operator[](size_t index) const;
RedisReply& operator[](size_t index);
// Parse from `buf' which may be incomplete and allocate needed memory // Parse from `buf' which may be incomplete.
// on `arena'.
// Returns PARSE_OK when an intact reply is parsed and cut off from `buf'. // Returns PARSE_OK when an intact reply is parsed and cut off from `buf'.
// Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse, // Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse,
// and `buf' is guaranteed to be UNCHANGED so that you can call this // and `buf' is guaranteed to be UNCHANGED so that you can call this
...@@ -92,7 +117,10 @@ public: ...@@ -92,7 +117,10 @@ public:
// reply. As a contrast, if the parsing needs `buf' to be intact, // reply. As a contrast, if the parsing needs `buf' to be intact,
// the complexity in worst case may be O(N^2). // the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed. // Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena); ParseError ConsumePartialIOBuf(butil::IOBuf& buf);
// Serialize to iobuf appender using redis protocol
bool SerializeTo(butil::IOBufAppender* appender);
// Swap internal fields with another reply. // Swap internal fields with another reply.
void Swap(RedisReply& other); void Swap(RedisReply& other);
...@@ -103,21 +131,24 @@ public: ...@@ -103,21 +131,24 @@ public:
// Print fields into ostream // Print fields into ostream
void Print(std::ostream& os) const; void Print(std::ostream& os) const;
// Copy from another reply allocating on a different Arena, and allocate // Copy from another reply allocating on `_arena', which is a deep copy.
// required memory with `self_arena'. void CopyFromDifferentArena(const RedisReply& other);
void CopyFromDifferentArena(const RedisReply& other,
butil::Arena* self_arena);
// Copy from another reply allocating on a same Arena. // Copy from another reply allocating on a same Arena, which is a shallow copy.
void CopyFromSameArena(const RedisReply& other); void CopyFromSameArena(const RedisReply& other);
private: private:
static const int npos;
// RedisReply does not own the memory of fields, copying must be done // RedisReply does not own the memory of fields, copying must be done
// by calling CopyFrom[Different|Same]Arena. // by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisReply); DISALLOW_COPY_AND_ASSIGN(RedisReply);
void SetStringImpl(const butil::StringPiece& str, RedisReplyType type);
void Reset();
RedisReplyType _type; RedisReplyType _type;
uint32_t _length; // length of short_str/long_str, count of replies int _length; // length of short_str/long_str, count of replies
union { union {
int64_t integer; int64_t integer;
char short_str[16]; char short_str[16];
...@@ -128,6 +159,7 @@ private: ...@@ -128,6 +159,7 @@ private:
} array; } array;
uint64_t padding[2]; // For swapping, must cover all bytes. uint64_t padding[2]; // For swapping, must cover all bytes.
} _data; } _data;
butil::Arena* _arena;
}; };
// =========== inline impl. ============== // =========== inline impl. ==============
...@@ -137,14 +169,22 @@ inline std::ostream& operator<<(std::ostream& os, const RedisReply& r) { ...@@ -137,14 +169,22 @@ inline std::ostream& operator<<(std::ostream& os, const RedisReply& r) {
return os; return os;
} }
inline RedisReply::RedisReply() inline void RedisReply::Reset() {
: _type(REDIS_REPLY_NIL) _type = REDIS_REPLY_NIL;
, _length(0) { _length = 0;
_data.array.last_index = -1; _data.array.last_index = -1;
_data.array.replies = NULL; _data.array.replies = NULL;
// _arena should not be reset because further memory allocation needs it.
}
inline RedisReply::RedisReply(butil::Arena* arena)
: _arena(arena) {
Reset();
} }
inline bool RedisReply::is_nil() const { return _type == REDIS_REPLY_NIL; } inline bool RedisReply::is_nil() const {
return (_type == REDIS_REPLY_NIL || _length == npos);
}
inline bool RedisReply::is_error() const { return _type == REDIS_REPLY_ERROR; } inline bool RedisReply::is_error() const { return _type == REDIS_REPLY_ERROR; }
inline bool RedisReply::is_integer() const { return _type == REDIS_REPLY_INTEGER; } inline bool RedisReply::is_integer() const { return _type == REDIS_REPLY_INTEGER; }
inline bool RedisReply::is_string() const inline bool RedisReply::is_string() const
...@@ -160,9 +200,46 @@ inline int64_t RedisReply::integer() const { ...@@ -160,9 +200,46 @@ inline int64_t RedisReply::integer() const {
return 0; return 0;
} }
inline void RedisReply::SetNullArray() {
if (_type != REDIS_REPLY_NIL) {
Reset();
}
_type = REDIS_REPLY_ARRAY;
_length = npos;
}
inline void RedisReply::SetNullString() {
if (_type != REDIS_REPLY_NIL) {
Reset();
}
_type = REDIS_REPLY_STRING;
_length = npos;
}
inline void RedisReply::SetStatus(const butil::StringPiece& str) {
return SetStringImpl(str, REDIS_REPLY_STATUS);
}
inline void RedisReply::SetError(const butil::StringPiece& str) {
return SetStringImpl(str, REDIS_REPLY_ERROR);
}
inline void RedisReply::SetInteger(int64_t value) {
if (_type != REDIS_REPLY_NIL) {
Reset();
}
_type = REDIS_REPLY_INTEGER;
_length = 0;
_data.integer = value;
}
inline void RedisReply::SetString(const butil::StringPiece& str) {
return SetStringImpl(str, REDIS_REPLY_STRING);
}
inline const char* RedisReply::c_str() const { inline const char* RedisReply::c_str() const {
if (is_string()) { if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO if (_length < (int)sizeof(_data.short_str)) { // SSO
return _data.short_str; return _data.short_str;
} else { } else {
return _data.long_str; return _data.long_str;
...@@ -175,7 +252,7 @@ inline const char* RedisReply::c_str() const { ...@@ -175,7 +252,7 @@ inline const char* RedisReply::c_str() const {
inline butil::StringPiece RedisReply::data() const { inline butil::StringPiece RedisReply::data() const {
if (is_string()) { if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO if (_length < (int)sizeof(_data.short_str)) { // SSO
return butil::StringPiece(_data.short_str, _length); return butil::StringPiece(_data.short_str, _length);
} else { } else {
return butil::StringPiece(_data.long_str, _length); return butil::StringPiece(_data.long_str, _length);
...@@ -188,7 +265,7 @@ inline butil::StringPiece RedisReply::data() const { ...@@ -188,7 +265,7 @@ inline butil::StringPiece RedisReply::data() const {
inline const char* RedisReply::error_message() const { inline const char* RedisReply::error_message() const {
if (is_error()) { if (is_error()) {
if (_length < sizeof(_data.short_str)) { // SSO if (_length < (int)sizeof(_data.short_str)) { // SSO
return _data.short_str; return _data.short_str;
} else { } else {
return _data.long_str; return _data.long_str;
...@@ -200,14 +277,19 @@ inline const char* RedisReply::error_message() const { ...@@ -200,14 +277,19 @@ inline const char* RedisReply::error_message() const {
} }
inline size_t RedisReply::size() const { inline size_t RedisReply::size() const {
return (is_array() ? _length : 0); return _length;
}
inline RedisReply& RedisReply::operator[](size_t index) {
return const_cast<RedisReply&>(
const_cast<const RedisReply*>(this)->operator[](index));
} }
inline const RedisReply& RedisReply::operator[](size_t index) const { inline const RedisReply& RedisReply::operator[](size_t index) const {
if (is_array() && index < _length) { if (is_array() && index < (size_t)_length) {
return _data.array.replies[index]; return _data.array.replies[index];
} }
static RedisReply redis_nil; static RedisReply redis_nil(NULL);
return redis_nil; return redis_nil;
} }
...@@ -216,6 +298,7 @@ inline void RedisReply::Swap(RedisReply& other) { ...@@ -216,6 +298,7 @@ inline void RedisReply::Swap(RedisReply& other) {
std::swap(_length, other._length); std::swap(_length, other._length);
std::swap(_data.padding[0], other._data.padding[0]); std::swap(_data.padding[0], other._data.padding[0]);
std::swap(_data.padding[1], other._data.padding[1]); std::swap(_data.padding[1], other._data.padding[1]);
std::swap(_arena, other._arena);
} }
inline void RedisReply::Clear() { inline void RedisReply::Clear() {
...@@ -223,6 +306,7 @@ inline void RedisReply::Clear() { ...@@ -223,6 +306,7 @@ inline void RedisReply::Clear() {
_length = 0; _length = 0;
_data.array.last_index = -1; _data.array.last_index = -1;
_data.array.replies = NULL; _data.array.replies = NULL;
// _arena should not be cleared because it may be shared between RedisReply;
} }
inline void RedisReply::CopyFromSameArena(const RedisReply& other) { inline void RedisReply::CopyFromSameArena(const RedisReply& other) {
...@@ -230,6 +314,7 @@ inline void RedisReply::CopyFromSameArena(const RedisReply& other) { ...@@ -230,6 +314,7 @@ inline void RedisReply::CopyFromSameArena(const RedisReply& other) {
_length = other._length; _length = other._length;
_data.padding[0] = other._data.padding[0]; _data.padding[0] = other._data.padding[0];
_data.padding[1] = other._data.padding[1]; _data.padding[1] = other._data.padding[1];
_arena = other._arena;
} }
} // namespace brpc } // namespace brpc
......
...@@ -142,7 +142,8 @@ ServerOptions::ServerOptions() ...@@ -142,7 +142,8 @@ ServerOptions::ServerOptions()
, has_builtin_services(true) , has_builtin_services(true)
, http_master_service(NULL) , http_master_service(NULL)
, health_reporter(NULL) , health_reporter(NULL)
, rtmp_service(NULL) { , rtmp_service(NULL)
, redis_service(NULL) {
if (s_ncore > 0) { if (s_ncore > 0) {
num_threads = s_ncore + 1; num_threads = s_ncore + 1;
} }
...@@ -1588,7 +1589,8 @@ void Server::GenerateVersionIfNeeded() { ...@@ -1588,7 +1589,8 @@ void Server::GenerateVersionIfNeeded() {
if (!_version.empty()) { if (!_version.empty()) {
return; return;
} }
int extra_count = !!_options.nshead_service + !!_options.rtmp_service + !!_options.thrift_service; int extra_count = !!_options.nshead_service + !!_options.rtmp_service +
!!_options.thrift_service + !!_options.redis_service;
_version.reserve((extra_count + service_count()) * 20); _version.reserve((extra_count + service_count()) * 20);
for (ServiceMap::const_iterator it = _fullname_service_map.begin(); for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) { it != _fullname_service_map.end(); ++it) {
...@@ -1621,6 +1623,13 @@ void Server::GenerateVersionIfNeeded() { ...@@ -1621,6 +1623,13 @@ void Server::GenerateVersionIfNeeded() {
} }
_version.append(butil::class_name_str(*_options.rtmp_service)); _version.append(butil::class_name_str(*_options.rtmp_service));
} }
if (_options.redis_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.redis_service));
}
} }
static std::string ExpandPath(const std::string &path) { static std::string ExpandPath(const std::string &path) {
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include "brpc/health_reporter.h" #include "brpc/health_reporter.h"
#include "brpc/adaptive_max_concurrency.h" #include "brpc/adaptive_max_concurrency.h"
#include "brpc/http2.h" #include "brpc/http2.h"
#include "brpc/redis.h"
namespace brpc { namespace brpc {
...@@ -53,6 +54,7 @@ class SimpleDataPool; ...@@ -53,6 +54,7 @@ class SimpleDataPool;
class MongoServiceAdaptor; class MongoServiceAdaptor;
class RestfulMap; class RestfulMap;
class RtmpService; class RtmpService;
class RedisService;
struct SocketSSLContext; struct SocketSSLContext;
struct ServerOptions { struct ServerOptions {
...@@ -235,6 +237,10 @@ struct ServerOptions { ...@@ -235,6 +237,10 @@ struct ServerOptions {
// Customize parameters of HTTP2, defined in http2.h // Customize parameters of HTTP2, defined in http2.h
H2Settings h2_settings; H2Settings h2_settings;
// For processing Redis connections. Read src/brpc/redis.h for details.
// Default: NULL (disabled)
RedisService* redis_service;
private: private:
// SSLOptions is large and not often used, allocate it on heap to // SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases. // prevent ServerOptions from being bloated in most cases.
......
...@@ -669,6 +669,11 @@ public: ...@@ -669,6 +669,11 @@ public:
// Returns 0 on success, -1 otherwise. // Returns 0 on success, -1 otherwise.
int append(const void* data, size_t n); int append(const void* data, size_t n);
int append(const butil::StringPiece& str); int append(const butil::StringPiece& str);
// Format integer |d| to back side of the internal buffer, which is much faster
// than snprintf(..., "%lu", d).
// Returns 0 on success, -1 otherwise.
int append_decimal(long d);
// Push the character to back side of the internal buffer. // Push the character to back side of the internal buffer.
// Costs ~3ns while IOBuf.push_back costs ~13ns on Intel(R) Xeon(R) CPU // Costs ~3ns while IOBuf.push_back costs ~13ns on Intel(R) Xeon(R) CPU
......
...@@ -285,6 +285,25 @@ inline int IOBufAppender::append(const StringPiece& str) { ...@@ -285,6 +285,25 @@ inline int IOBufAppender::append(const StringPiece& str) {
return append(str.data(), str.size()); return append(str.data(), str.size());
} }
inline int IOBufAppender::append_decimal(long d) {
char buf[24]; // enough for decimal 64-bit integers
size_t n = sizeof(buf);
bool negative = false;
if (d < 0) {
negative = true;
d = -d;
}
do {
const long q = d / 10;
buf[--n] = d - q * 10 + '0';
d = q;
} while (d);
if (negative) {
buf[--n] = '-';
}
return append(buf + n, sizeof(buf) - n);
}
inline int IOBufAppender::push_back(char c) { inline int IOBufAppender::push_back(char c) {
if (_data == _data_end) { if (_data == _data_end) {
if (add_block() != 0) { if (add_block() != 0) {
......
...@@ -137,5 +137,4 @@ int string_vprintf(std::string* output, const char* format, va_list args) { ...@@ -137,5 +137,4 @@ int string_vprintf(std::string* output, const char* format, va_list args) {
return rc; return rc;
}; };
} // namespace butil } // namespace butil
...@@ -45,7 +45,6 @@ int string_appendf(std::string* output, const char* format, ...) ...@@ -45,7 +45,6 @@ int string_appendf(std::string* output, const char* format, ...)
// Returns 0 on success, -1 otherwise. // Returns 0 on success, -1 otherwise.
int string_vappendf(std::string* output, const char* format, va_list args); int string_vappendf(std::string* output, const char* format, va_list args);
} // namespace butil } // namespace butil
#endif // BUTIL_STRING_PRINTF_H #endif // BUTIL_STRING_PRINTF_H
...@@ -53,7 +53,7 @@ endif() ...@@ -53,7 +53,7 @@ endif()
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}") set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DUNIT_TEST -Dprivate=public -Dprotected=public -DBVAR_NOT_LINK_DEFAULT_VARIABLES -D__STRICT_ANSI__ -include ${PROJECT_SOURCE_DIR}/test/sstream_workaround.h") set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DUNIT_TEST -Dprivate=public -Dprotected=public -DBVAR_NOT_LINK_DEFAULT_VARIABLES -D__STRICT_ANSI__ -include ${PROJECT_SOURCE_DIR}/test/sstream_workaround.h")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer") set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -g -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
use_cxx11() use_cxx11()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
......
...@@ -17,11 +17,14 @@ ...@@ -17,11 +17,14 @@
#include <iostream> #include <iostream>
#include "butil/time.h" #include <unordered_map>
#include "butil/logging.h" #include <butil/time.h>
#include <butil/logging.h>
#include <brpc/redis.h> #include <brpc/redis.h>
#include <brpc/channel.h> #include <brpc/channel.h>
#include <brpc/policy/redis_authenticator.h> #include <brpc/policy/redis_authenticator.h>
#include <brpc/server.h>
#include <brpc/redis_command.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
namespace brpc { namespace brpc {
...@@ -547,4 +550,648 @@ TEST_F(RedisTest, quote_and_escape) { ...@@ -547,4 +550,648 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear(); request.Clear();
} }
std::string GetCompleteCommand(const std::vector<const char*>& commands) {
std::string res;
for (int i = 0; i < (int)commands.size(); ++i) {
if (i != 0) {
res.push_back(' ');
}
res.append(commands[i]);
}
return res;
}
TEST_F(RedisTest, command_parser) {
brpc::RedisCommandParser parser;
butil::IOBuf buf;
std::vector<const char*> command_out;
butil::Arena arena;
{
// parse from whole command
std::string command = "set abc edc";
ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str());
}
{
// simulate parsing from network
int t = 100;
std::string raw_string("*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n");
int size = raw_string.size();
while (t--) {
for (int i = 0; i < size; ++i) {
buf.push_back(raw_string[i]);
if (i == size - 1) {
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
} else {
if (butil::fast_rand_less_than(2) == 0) {
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
parser.Consume(buf, &command_out, &arena));
}
}
}
ASSERT_TRUE(buf.empty());
ASSERT_STREQ(GetCompleteCommand(command_out).c_str(), "set abc def");
}
}
{
// there is a non-string message in command and parse should fail
buf.append("*3\r\n$3");
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &arena));
ASSERT_EQ((int)buf.size(), 2); // left "$3"
buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &arena));
parser.Reset();
}
{
// not array
buf.append(":123456\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset();
}
{
// not array
buf.append("+Error\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset();
}
{
// not array
buf.append("+OK\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset();
}
{
// not array
buf.append("$5\r\nhello\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset();
}
}
TEST_F(RedisTest, redis_reply_codec) {
butil::Arena arena;
// status
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetStatus("OK");
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_string());
ASSERT_STREQ("OK", r.c_str());
}
// error
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetError("not exist \'key\'");
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_error());
ASSERT_STREQ("not exist \'key\'", r.error_message());
}
// string
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetNullString();
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
r.Clear();
r.SetString("abcde'hello world");
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world");
r.Clear();
err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_string());
ASSERT_STREQ(r.c_str(), "abcde'hello world");
}
// integer
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
butil::IOBufAppender appender;
int t = 2;
int input[] = { -1, 1234567 };
const char* output[] = { ":-1\r\n", ":1234567\r\n" };
for (int i = 0; i < t; ++i) {
r.Clear();
r.SetInteger(input[i]);
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_integer());
ASSERT_EQ(r.integer(), input[i]);
}
}
// array
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetArray(3);
brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2);
sub_reply[0].SetString("hello, it's me");
sub_reply[1].SetInteger(422);
r[1].SetString("To go over everything");
r[2].SetInteger(1);
ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(),
"*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
"To go over everything\r\n:1\r\n");
r.Clear();
ASSERT_EQ(r.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
ASSERT_TRUE(r.is_array());
ASSERT_EQ(3ul, r.size());
ASSERT_TRUE(r[0].is_array());
ASSERT_EQ(2ul, r[0].size());
ASSERT_TRUE(r[0][0].is_string());
ASSERT_STREQ(r[0][0].c_str(), "hello, it's me");
ASSERT_TRUE(r[0][1].is_integer());
ASSERT_EQ(r[0][1].integer(), 422);
ASSERT_TRUE(r[1].is_string());
ASSERT_STREQ(r[1].c_str(), "To go over everything");
ASSERT_TRUE(r[2].is_integer());
ASSERT_EQ(1, r[2].integer());
r.Clear();
// null array
r.SetNullArray();
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
}
// CopyFromDifferentArena
{
brpc::RedisReply r(&arena);
r.SetArray(1);
brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2);
sub_reply[0].SetString("hello, it's me");
sub_reply[1].SetInteger(422);
brpc::RedisReply r2(&arena);
r2.CopyFromDifferentArena(r);
ASSERT_TRUE(r2.is_array());
ASSERT_EQ((int)r2[0].size(), 2);
ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str());
ASSERT_EQ(r2[0][1].integer(), sub_reply[1].integer());
}
// SetXXX can be called multiple times.
{
brpc::RedisReply r(&arena);
r.SetStatus("OK");
ASSERT_TRUE(r.is_string());
r.SetNullString();
ASSERT_TRUE(r.is_nil());
r.SetArray(2);
ASSERT_TRUE(r.is_array());
r.SetString("OK");
ASSERT_TRUE(r.is_string());
r.SetError("OK");
ASSERT_TRUE(r.is_error());
r.SetInteger(42);
ASSERT_TRUE(r.is_integer());
}
}
butil::Mutex s_mutex;
std::unordered_map<std::string, std::string> m;
std::unordered_map<std::string, int64_t> int_map;
class RedisServiceImpl : public brpc::RedisService {
public:
RedisServiceImpl()
: _batch_count(0) {}
brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args,
brpc::RedisReply* output, bool flush_batched) {
if (_batched_command.empty() && flush_batched) {
if (strcmp(args[0], "set") == 0) {
DoSet(args[1], args[2], output);
} else if (strcmp(args[0], "get") == 0) {
DoGet(args[1], output);
}
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm;
for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i]);
}
_batched_command.push_back(comm);
if (flush_batched) {
output->SetArray(_batched_command.size());
for (int i = 0; i < (int)_batched_command.size(); ++i) {
if (_batched_command[i][0] == "set") {
DoSet(_batched_command[i][1], _batched_command[i][2], &(*output)[i]);
} else if (_batched_command[i][0] == "get") {
DoGet(_batched_command[i][1], &(*output)[i]);
}
}
_batch_count++;
_batched_command.clear();
return brpc::RedisCommandHandler::OK;
} else {
return brpc::RedisCommandHandler::BATCHED;
}
}
void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
m[key] = value;
output->SetStatus("OK");
}
void DoGet(const std::string& key, brpc::RedisReply* output) {
auto it = m.find(key);
if (it != m.end()) {
output->SetString(it->second);
} else {
output->SetNullString();
}
}
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
};
class SetCommandHandler : public brpc::RedisCommandHandler {
public:
SetCommandHandler(bool batch_process = false)
: rs(NULL)
, _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (args.size() < 3) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
}
if (_batch_process) {
return rs->OnBatched(args, output, flush_batched);
} else {
DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK;
}
}
void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
m[key] = value;
output->SetStatus("OK");
}
RedisServiceImpl* rs;
private:
bool _batch_process;
};
class GetCommandHandler : public brpc::RedisCommandHandler {
public:
GetCommandHandler(bool batch_process = false)
: rs(NULL)
, _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (args.size() < 2) {
output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK;
}
if (_batch_process) {
return rs->OnBatched(args, output, flush_batched);
} else {
DoGet(args[1], output);
return brpc::RedisCommandHandler::OK;
}
}
void DoGet(const std::string& key, brpc::RedisReply* output) {
auto it = m.find(key);
if (it != m.end()) {
output->SetString(it->second);
} else {
output->SetNullString();
}
}
RedisServiceImpl* rs;
private:
bool _batch_process;
};
class IncrCommandHandler : public brpc::RedisCommandHandler {
public:
IncrCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (args.size() < 2) {
output->SetError("ERR wrong number of arguments for 'incr' command");
return brpc::RedisCommandHandler::OK;
}
const std::string& key = args[1];
int64_t value;
s_mutex.lock();
value = ++int_map[key];
s_mutex.unlock();
output->SetInteger(value);
return brpc::RedisCommandHandler::OK;
}
};
TEST_F(RedisTest, server_sanity) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler *gh = new GetCommandHandler;
SetCommandHandler *sh = new SetCommandHandler;
IncrCommandHandler *ih = new IncrCommandHandler;
rsimpl->AddCommandHandler("get", gh);
rsimpl->AddCommandHandler("set", sh);
rsimpl->AddCommandHandler("incr", ih);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello2"));
ASSERT_TRUE(request.AddCommand("set key1 value1"));
ASSERT_TRUE(request.AddCommand("get key1"));
ASSERT_TRUE(request.AddCommand("set key2 value2"));
ASSERT_TRUE(request.AddCommand("get key2"));
ASSERT_TRUE(request.AddCommand("xxxcommand key2"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(7, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_STREQ("OK", response.reply(2).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_STREQ("value1", response.reply(3).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(4).type());
ASSERT_STREQ("OK", response.reply(4).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
ASSERT_STREQ("value2", response.reply(5).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type());
ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
}
void* incr_thread(void* arg) {
brpc::Channel* c = static_cast<brpc::Channel*>(arg);
for (int i = 0; i < 5000; ++i) {
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
EXPECT_TRUE(request.AddCommand("incr count"));
c->CallMethod(NULL, &cntl, &request, &response, NULL);
EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
EXPECT_EQ(1, response.reply_size());
EXPECT_TRUE(response.reply(0).is_integer());
}
return NULL;
}
TEST_F(RedisTest, server_concurrency) {
int N = 10;
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
IncrCommandHandler *ih = new IncrCommandHandler;
rsimpl->AddCommandHandler("incr", ih);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.connection_type = "pooled";
std::vector<bthread_t> bths;
std::vector<brpc::Channel*> channels;
for (int i = 0; i < N; ++i) {
channels.push_back(new brpc::Channel);
ASSERT_EQ(0, channels.back()->Init("127.0.0.1", server.listen_address().port, &options));
bthread_t bth;
ASSERT_EQ(bthread_start_background(&bth, NULL, incr_thread, channels.back()), 0);
bths.push_back(bth);
}
for (int i = 0; i < N; ++i) {
bthread_join(bths[i], NULL);
delete channels[i];
}
ASSERT_EQ(int_map["count"], 10 * 5000LL);
}
class MultiCommandHandler : public brpc::RedisCommandHandler {
public:
MultiCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) {
output->SetStatus("OK");
return brpc::RedisCommandHandler::CONTINUE;
}
RedisCommandHandler* NewTransactionHandler() override {
return new MultiTransactionHandler;
}
class MultiTransactionHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (strcmp(args[0], "multi") == 0) {
output->SetError("ERR duplicate multi");
return brpc::RedisCommandHandler::CONTINUE;
}
if (strcmp(args[0], "exec") != 0) {
std::vector<std::string> comm;
for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i]);
}
_commands.push_back(comm);
output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::CONTINUE;
}
output->SetArray(_commands.size());
s_mutex.lock();
for (size_t i = 0; i < _commands.size(); ++i) {
if (_commands[i][0] == "incr") {
int64_t value;
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
} else {
(*output)[i].SetStatus("unknown command");
}
}
s_mutex.unlock();
return brpc::RedisCommandHandler::OK;
}
private:
std::vector<std::vector<std::string> > _commands;
};
};
TEST_F(RedisTest, server_command_continue) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
rsimpl->AddCommandHandler("get", new GetCommandHandler);
rsimpl->AddCommandHandler("set", new SetCommandHandler);
rsimpl->AddCommandHandler("incr", new IncrCommandHandler);
rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
{
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("set hello world"));
ASSERT_TRUE(request.AddCommand("get hello"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, response.reply_size());
ASSERT_STREQ("world", response.reply(1).c_str());
}
{
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("multi"));
ASSERT_TRUE(request.AddCommand("mUltI"));
int count = 10;
for (int i = 0; i < count; ++i) {
ASSERT_TRUE(request.AddCommand("incr hello 1"));
}
ASSERT_TRUE(request.AddCommand("exec"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_EQ(13, response.reply_size());
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_STREQ("OK", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(1).type());
for (int i = 2; i < count + 2; ++i) {
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(i).type());
ASSERT_STREQ("QUEUED", response.reply(i).c_str());
}
const brpc::RedisReply& m = response.reply(count + 2);
ASSERT_EQ(count, (int)m.size());
for (int i = 0; i < count; ++i) {
ASSERT_EQ(i+1, m[i].integer());
}
}
// After 'multi', normal requests should be successful
{
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello2"));
ASSERT_TRUE(request.AddCommand("set key1 value1"));
ASSERT_TRUE(request.AddCommand("get key1"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_STREQ("world", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_STREQ("OK", response.reply(2).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_STREQ("value1", response.reply(3).c_str());
}
}
TEST_F(RedisTest, server_handle_pipeline) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler* getch = new GetCommandHandler(true);
SetCommandHandler* setch = new SetCommandHandler(true);
getch->rs = rsimpl;
setch->rs = rsimpl;
rsimpl->AddCommandHandler("get", getch);
rsimpl->AddCommandHandler("set", setch);
rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("set key1 v1"));
ASSERT_TRUE(request.AddCommand("set key2 v2"));
ASSERT_TRUE(request.AddCommand("set key3 v3"));
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("set key1 world"));
ASSERT_TRUE(request.AddCommand("set key2 world"));
ASSERT_TRUE(request.AddCommand("get key2"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(8, response.reply_size());
ASSERT_EQ(1, rsimpl->_batch_count);
ASSERT_TRUE(response.reply(7).is_string());
ASSERT_STREQ(response.reply(7).c_str(), "world");
}
} //namespace } //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment