Commit 520858cf authored by zhujiashun's avatar zhujiashun

redis_server_protocol: refine code

parent 60581f05
......@@ -29,7 +29,7 @@
class RedisServiceImpl : public brpc::RedisService {
public:
bool Set(const std::string& key, const std::string& value) {
int slot = butil::crc32c::Value(key.c_str(), key.size()) % HashSlotNum;
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
_mutex[slot].lock();
_map[slot][key] = value;
_mutex[slot].unlock();
......@@ -37,7 +37,7 @@ public:
}
bool Get(const std::string& key, std::string* value) {
int slot = butil::crc32c::Value(key.c_str(), key.size()) % HashSlotNum;
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
_mutex[slot].lock();
auto it = _map[slot].find(key);
if (it == _map[slot].end()) {
......@@ -50,9 +50,9 @@ public:
}
private:
const static int HashSlotNum = 32;
std::unordered_map<std::string, std::string> _map[HashSlotNum];
butil::Mutex _mutex[HashSlotNum];
const static int kHashSlotNum = 32;
std::unordered_map<std::string, std::string> _map[kHashSlotNum];
butil::Mutex _mutex[kHashSlotNum];
};
class GetCommandHandler : public brpc::RedisCommandHandler {
......
......@@ -58,6 +58,11 @@ struct InputResponse : public InputMessageBase {
}
};
// This struct is pushed into ExecutionQueue of each connection.
struct CommandInfo {
std::string command;
};
// This class is as parsing_context in socket.
class RedisConnContext : public Destroyable {
public:
......@@ -76,25 +81,26 @@ public:
// first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue;
// The redis command are parsed and pushed into this queue
bthread::ExecutionQueueId<std::string*> queue;
bthread::ExecutionQueueId<CommandInfo*> queue;
RedisCommandParser parser;
std::string command;
};
int ConsumeTask(RedisConnContext* ctx, std::string* command, butil::IOBuf* sendbuf) {
int ConsumeTask(RedisConnContext* ctx, const std::string& command, butil::IOBuf* sendbuf) {
butil::Arena arena;
RedisReply output(&arena);
if (ctx->handler_continue) {
RedisCommandHandler::Result result =
ctx->handler_continue->Run(command->c_str(), &output);
ctx->handler_continue->Run(command.c_str(), &output);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL;
}
} else {
std::string comm;
comm.reserve(8);
for (int i = 0; i < (int)command->size() && (*command)[i] != ' '; ++i) {
comm.push_back(std::tolower((*command)[i]));
for (int i = 0; i < (int)command.size() && command[i] != ' '; ++i) {
comm.push_back(std::tolower(command[i]));
}
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
if (!ch) {
......@@ -102,17 +108,17 @@ int ConsumeTask(RedisConnContext* ctx, std::string* command, butil::IOBuf* sendb
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.SetError(buf);
} else {
RedisCommandHandler::Result result = ch->Run(command->c_str(), &output);
RedisCommandHandler::Result result = ch->Run(command.c_str(), &output);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = ch;
}
}
}
output.SerializeToIOBuf(sendbuf);
output.SerializeTo(sendbuf);
return 0;
}
int Consume(void* ctx, bthread::TaskIterator<std::string*>& iter) {
int Consume(void* ctx, bthread::TaskIterator<CommandInfo*>& iter) {
RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
if (iter.is_queue_stopped()) {
delete qctx;
......@@ -128,11 +134,11 @@ int Consume(void* ctx, bthread::TaskIterator<std::string*>& iter) {
wopt.ignore_eovercrowded = true;
butil::IOBuf sendbuf;
for (; iter; ++iter) {
std::unique_ptr<std::string> guard(*iter);
std::unique_ptr<CommandInfo> guard(*iter);
if (has_err) {
continue;
}
ConsumeTask(qctx, *iter, &sendbuf);
ConsumeTask(qctx, (*iter)->command, &sendbuf);
// If there are too many tasks to execute, latency of the front
// responses will be increased by waiting the following tasks to
// be completed. To prevent this, if the current buf size is greater
......@@ -194,17 +200,17 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
}
socket->reset_parsing_context(ctx);
}
ParseError err = ctx->parser.Parse(*source);
ParseError err = ctx->parser.Consume(*source, &ctx->command);
if (err != PARSE_OK) {
return MakeParseError(err);
}
std::unique_ptr<std::string> command(new std::string);
ctx->parser.SwapCommandTo(command.get());
if (bthread::execution_queue_execute(ctx->queue, command.get()) != 0) {
std::unique_ptr<CommandInfo> info(new CommandInfo);
info->command.swap(ctx->command);
if (bthread::execution_queue_execute(ctx->queue, info.get()) != 0) {
LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
command.release();
info.release();
return MakeMessage(NULL);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......
......@@ -36,8 +36,8 @@ void ProcessRedisResponse(InputMessageBase* msg);
// Actions to a redis request, which is left unimplemented.
// All requests are processed in execution queue pushed in
// the parsing process. This function must be declared since
// server side will enable redis as a server side protocol
// when this function is declared.
// server only enables redis as a server-side protocol when
// this function is declared.
void ProcessRedisRequest(InputMessageBase* msg);
// Serialize a redis request.
......
......@@ -436,57 +436,6 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
return os;
}
bool RedisReply::SetArray(int size) {
if (!_arena || _has_set) {
return false;
}
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
_length = npos;
return true;
} else if (size == 0) {
_length = 0;
return true;
}
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply(_arena);
}
_length = size;
_data.array.replies = subs;
_has_set = true;
return true;
}
bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
if (!_arena || _has_set) {
return false;
}
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
}
memcpy(d, str.c_str(), size);
d[size] = '\0';
_data.long_str = d;
}
_type = type;
_length = size;
_has_set = true;
return true;
}
bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname;
lcname.resize(name.size());
......@@ -496,14 +445,14 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle
LOG(ERROR) << "redis command name=" << name << " exist";
return false;
}
_command_map[lcname].reset(handler);
_command_map[lcname] = handler;
return true;
}
RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) {
auto it = _command_map.find(name);
if (it != _command_map.end()) {
return it->second.get();
return it->second;
}
return NULL;
}
......
......@@ -219,13 +219,13 @@ class RedisCommandHandler;
// to enable redis support.
class RedisService {
public:
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
typedef std::unordered_map<std::string, RedisCommandHandler*> CommandMap;
virtual ~RedisService() {}
// Call this function to register `handler` that can handle command `name`.
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
// This function should be touched by user and used by brpc deverloper only.
// This function should not be touched by user and used by brpc deverloper only.
RedisCommandHandler* FindCommandHandler(const std::string& name);
private:
CommandMap _command_map;
......
......@@ -364,7 +364,7 @@ RedisCommandParser::RedisCommandParser() {
Reset();
}
ParseError RedisCommandParser::Parse(butil::IOBuf& buf) {
ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command) {
const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
......@@ -396,7 +396,7 @@ ParseError RedisCommandParser::Parse(butil::IOBuf& buf) {
_length = value;
_index = 0;
_command.clear();
return Parse(buf);
return Consume(buf, command);
}
CHECK(_index < _length) << "a complete command has been parsed. "
"impl of RedisCommandParser::Parse is buggy";
......@@ -425,17 +425,14 @@ ParseError RedisCommandParser::Parse(butil::IOBuf& buf) {
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (++_index < _length) {
return Parse(buf);
return Consume(buf, command);
}
command->clear();
command->swap(_command);
Reset();
return PARSE_OK;
}
void RedisCommandParser::SwapCommandTo(std::string* out) {
out->clear();
out->swap(_command);
}
void RedisCommandParser::Reset() {
_parsing_array = false;
_length = 0;
......
......@@ -45,12 +45,9 @@ class RedisCommandParser {
public:
RedisCommandParser();
// Parse raw message from `buf'. Return PARSE_OK if successful.
ParseError Parse(butil::IOBuf& buf);
// After Parse returns PARSE_OK, call this function to swap
// the parsed command string to `out'.
void SwapCommandTo(std::string* out);
// Parse raw message from `buf'. Return PARSE_OK and set the parsed command
// to `command' if successful.
ParseError Consume(butil::IOBuf& buf, std::string* command);
private:
// Reset parser to the initial state.
......
......@@ -38,7 +38,7 @@ const char* RedisReplyTypeToString(RedisReplyType type) {
}
}
bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
bool RedisReply::SerializeTo(butil::IOBuf* buf) {
butil::IOBufBuilder builder;
switch (_type) {
case REDIS_REPLY_ERROR:
......@@ -78,7 +78,7 @@ bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
break;
}
for (size_t i = 0; i < _length; ++i) {
if (!_data.array.replies[i].SerializeToIOBuf(buf)) {
if (!_data.array.replies[i].SerializeTo(buf)) {
return false;
}
}
......@@ -245,7 +245,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
for (int64_t i = 0; i < count; ++i) {
new (&subs[i]) RedisReply(NULL);
new (&subs[i]) RedisReply;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_ARRAY;
......@@ -413,4 +413,52 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
}
}
bool RedisReply::SetArray(int size) {
if (!_arena || _type != REDIS_REPLY_NIL) {
return false;
}
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
_length = npos;
return true;
} else if (size == 0) {
_length = 0;
return true;
}
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply(_arena);
}
_length = size;
_data.array.replies = subs;
return true;
}
bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
if (!_arena || _type != REDIS_REPLY_NIL) {
return false;
}
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
}
memcpy(d, str.c_str(), size);
d[size] = '\0';
_data.long_str = d;
}
_type = type;
_length = size;
return true;
}
} // namespace brpc
......@@ -127,7 +127,7 @@ public:
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena);
// Serialize to buf using redis protocol
bool SerializeToIOBuf(butil::IOBuf* buf);
bool SerializeTo(butil::IOBuf* buf);
// Swap internal fields with another reply.
void Swap(RedisReply& other);
......@@ -168,7 +168,6 @@ private:
uint64_t padding[2]; // For swapping, must cover all bytes.
} _data;
butil::Arena* _arena;
bool _has_set;
};
// =========== inline impl. ==============
......@@ -186,8 +185,7 @@ inline RedisReply::RedisReply(butil::Arena* arena)
inline RedisReply::RedisReply()
: _type(REDIS_REPLY_NIL)
, _length(0)
, _arena(NULL)
, _has_set(false) {
, _arena(NULL) {
_data.array.last_index = -1;
_data.array.replies = NULL;
}
......@@ -213,10 +211,11 @@ inline int64_t RedisReply::integer() const {
}
inline bool RedisReply::SetNilString() {
if (!_arena || _has_set) return false;
if (!_arena || _type != REDIS_REPLY_NIL) {
return false;
}
_type = REDIS_REPLY_STRING;
_length = npos;
_has_set = true;
return true;
}
......@@ -229,13 +228,12 @@ inline bool RedisReply::SetError(const std::string& str) {
}
inline bool RedisReply::SetInteger(int64_t value) {
if (!_arena || _has_set) {
if (!_arena || _type != REDIS_REPLY_NIL) {
return false;
}
_type = REDIS_REPLY_INTEGER;
_length = 0;
_data.integer = value;
_has_set = true;
return true;
}
......@@ -311,7 +309,6 @@ inline void RedisReply::Clear() {
_length = 0;
_data.array.last_index = -1;
_data.array.replies = NULL;
_has_set = false;
}
inline void RedisReply::CopyFromSameArena(const RedisReply& other) {
......
......@@ -53,7 +53,7 @@ endif()
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DUNIT_TEST -Dprivate=public -Dprotected=public -DBVAR_NOT_LINK_DEFAULT_VARIABLES -D__STRICT_ANSI__ -include ${PROJECT_SOURCE_DIR}/test/sstream_workaround.h")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -g -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
use_cxx11()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
......
......@@ -553,14 +553,13 @@ TEST_F(RedisTest, quote_and_escape) {
TEST_F(RedisTest, command_parser) {
brpc::RedisCommandParser parser;
butil::IOBuf buf;
std::string command_out;
{
// parse from whole command
std::string command = "set abc edc";
ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
ASSERT_EQ(brpc::PARSE_OK, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out));
ASSERT_TRUE(buf.empty());
std::string command_out;
parser.SwapCommandTo(&command_out);
ASSERT_STREQ(command.c_str(), command_out.c_str());
}
{
......@@ -572,51 +571,49 @@ TEST_F(RedisTest, command_parser) {
for (int i = 0; i < size; ++i) {
buf.push_back(raw_string[i]);
if (i == size - 1) {
ASSERT_EQ(brpc::PARSE_OK, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out));
} else {
if (butil::fast_rand_less_than(2) == 0) {
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
parser.Parse(buf));
parser.Consume(buf, &command_out));
}
}
}
ASSERT_TRUE(buf.empty());
std::string command_out;
parser.SwapCommandTo(&command_out);
ASSERT_STREQ(command_out.c_str(), "set abc def");
}
}
{
// there is a non-string message in command and parse should fail
buf.append("*3\r\n$3");
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out));
ASSERT_EQ((int)buf.size(), 2); // left "$3"
buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append(":123456\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append("+Error\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append("+OK\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append("$5\r\nhello\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Parse(buf));
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out));
parser.Reset();
}
}
......@@ -628,7 +625,7 @@ TEST_F(RedisTest, redis_reply_codec) {
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetStatus("OK"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK");
r.Clear();
......@@ -642,7 +639,7 @@ TEST_F(RedisTest, redis_reply_codec) {
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetError("not exist \'key\'"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
......@@ -655,7 +652,7 @@ TEST_F(RedisTest, redis_reply_codec) {
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetNilString());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
......@@ -664,7 +661,7 @@ TEST_F(RedisTest, redis_reply_codec) {
r.Clear();
ASSERT_TRUE(r.SetString("abcde'hello world"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world");
r.Clear();
......@@ -683,7 +680,7 @@ TEST_F(RedisTest, redis_reply_codec) {
for (int i = 0; i < t; ++i) {
r.Clear();
ASSERT_TRUE(r.SetInteger(input[i]));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
......@@ -704,7 +701,7 @@ TEST_F(RedisTest, redis_reply_codec) {
r[1].SetString("To go over everything");
r[2].SetInteger(1);
ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(),
"*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
"To go over everything\r\n:1\r\n");
......@@ -726,7 +723,7 @@ TEST_F(RedisTest, redis_reply_codec) {
r.Clear();
// nil array
ASSERT_TRUE(r.SetArray(-1));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment