Commit c0aed241 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: refine code

parent 01da505f
......@@ -67,59 +67,52 @@ public:
SocketId socket_id;
RedisService* redis_service;
// If user starts a transaction, handler_continue indicates the
// If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> handler_continue;
std::unique_ptr<RedisCommandHandler> transaction_handler;
// >0 if command handler is run in batched mode.
int batched_size;
RedisCommandParser parser;
};
static std::string ToLowercase(const std::string& command) {
std::string res;
res.resize(command.size());
std::transform(command.begin(), command.end(), res.begin(),
[](unsigned char c){ return std::tolower(c); });
return res;
}
int ConsumeCommand(RedisConnContext* ctx,
const std::unique_ptr<const char*[]>& commands,
int len, butil::Arena* arena,
int command_len, butil::Arena* arena,
bool is_last,
butil::IOBuf* sendbuf) {
RedisReply output(arena);
RedisCommandHandler::Result result = RedisCommandHandler::OK;
if (ctx->handler_continue) {
result = ctx->handler_continue->Run(len, commands.get(), &output, is_last);
if (ctx->transaction_handler) {
result = ctx->transaction_handler->Run(
command_len, commands.get(), &output, is_last);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue.reset(NULL);
ctx->transaction_handler.reset(NULL);
} else if (result == RedisCommandHandler::BATCHED) {
LOG(ERROR) << "BATCHED should not be returned in redis transaction process.";
LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
return -1;
}
} else {
std::string lcname = ToLowercase(commands[0]);
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(lcname);
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(commands[0]);
if (!ch) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", lcname.c_str());
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]);
output.SetError(buf);
} else {
result = ch->Run(len, commands.get(), &output, is_last);
result = ch->Run(command_len, commands.get(), &output, is_last);
if (result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size) {
LOG(ERROR) << "CONTINUE should not be returned in redis batched process.";
return -1;
}
ctx->handler_continue.reset(ch->NewTransactionHandler());
ctx->transaction_handler.reset(ch->NewTransactionHandler());
} else if (result == RedisCommandHandler::BATCHED) {
ctx->batched_size++;
}
}
}
if (result == RedisCommandHandler::OK && ctx->batched_size) {
if (result == RedisCommandHandler::OK) {
if (ctx->batched_size) {
if ((int)output.size() != (ctx->batched_size + 1)) {
LOG(ERROR) << "reply array size can't be matched with batched size, "
<< " expected=" << ctx->batched_size + 1 << " actual=" << output.size();
......@@ -129,9 +122,17 @@ int ConsumeCommand(RedisConnContext* ctx,
output[i].SerializeTo(sendbuf);
}
ctx->batched_size = 0;
} else if (result != RedisCommandHandler::BATCHED) {
} else {
output.SerializeTo(sendbuf);
} // else result == RedisCommandHandler::BATCHED, do not serialize to buf
}
} else if (result == RedisCommandHandler::CONTINUE) {
output.SerializeTo(sendbuf);
} else if (result == RedisCommandHandler::BATCHED) {
// just do nothing and wait handler to return OK.
} else {
LOG(ERROR) << "unknown status=" << result;
return -1;
}
return 0;
}
......@@ -180,16 +181,15 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (err != PARSE_OK) {
break;
}
// safe to read first element.
// current_commands and next_commands both have at least one element(NULL).
bool is_last = (strcasecmp(current_commands[0], next_commands[0]) != 0);
if (ConsumeCommand(ctx, current_commands, current_len, &arena, is_last, &sendbuf) != 0) {
if (ConsumeCommand(ctx, current_commands, current_len, &arena,
false, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
current_commands.swap(next_commands);
current_len = next_len;
}
if (ConsumeCommand(ctx, current_commands, current_len, &arena, true, &sendbuf) != 0) {
if (ConsumeCommand(ctx, current_commands, current_len, &arena,
true /* must be last message */, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
CHECK(!sendbuf.empty());
......
......@@ -20,6 +20,7 @@
#include <google/protobuf/reflection_ops.h> // ReflectionOps::Merge
#include <gflags/gflags.h>
#include "butil/status.h"
#include "butil/strings/string_util.h" // StringToLowerASCII
#include "brpc/redis.h"
#include "brpc/redis_command.h"
......@@ -437,11 +438,9 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
}
bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname;
lcname.resize(name.size());
std::transform(name.begin(), name.end(), lcname.begin(),
[](unsigned char c){ return std::tolower(c); });
if (_command_map.count(lcname)) {
std::string lcname = StringToLowerASCII(name);
auto it = _command_map.find(lcname);
if (it != _command_map.end()) {
LOG(ERROR) << "redis command name=" << name << " exist";
return false;
}
......@@ -450,7 +449,8 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle
}
RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) {
auto it = _command_map.find(name);
std::string lcname = StringToLowerASCII(name);
auto it = _command_map.find(lcname);
if (it != _command_map.end()) {
return it->second;
}
......
......@@ -20,6 +20,7 @@
#ifndef BRPC_REDIS_COMMAND_H
#define BRPC_REDIS_COMMAND_H
#include <memory> // std::unique_ptr
#include <vector>
#include "butil/iobuf.h"
#include "butil/status.h"
......
......@@ -413,34 +413,39 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
}
}
bool RedisReply::SetArray(int size) {
if (!_arena || _type != REDIS_REPLY_NIL) {
return false;
void RedisReply::SetArray(int size) {
if (!_arena) {
return;
}
if (_type != REDIS_REPLY_NIL) {
Reset();
}
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
LOG(ERROR) << "negative size=" << size << " when calling SetArray";
return false;
return;
} else if (size == 0) {
_length = 0;
return true;
return;
}
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return false;
return;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply(_arena);
}
_length = size;
_data.array.replies = subs;
return true;
}
bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
if (!_arena || _type != REDIS_REPLY_NIL) {
return false;
void RedisReply::SetStringImpl(const std::string& str, RedisReplyType type) {
if (!_arena) {
return;
}
if (_type != REDIS_REPLY_NIL) {
Reset();
}
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
......@@ -450,7 +455,7 @@ bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
return;
}
memcpy(d, str.c_str(), size);
d[size] = '\0';
......@@ -458,7 +463,6 @@ bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
}
_type = type;
_length = size;
return true;
}
} // namespace brpc
......@@ -59,35 +59,28 @@ public:
bool is_string() const; // True if the reply is a string.
bool is_array() const; // True if the reply is an array.
// Set the reply to the null string. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetNullString();
// Set the reply to the null string.
void SetNullString();
// Set the reply to the null array. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetNullArray();
// Set the reply to the null array.
void SetNullArray();
// Set the reply to the array with `size' elements. After calling
// SetArray, use operator[] to visit sub replies and set their
// value. Return True if it is set successfully. If the reply has
// already been set, return false.
bool SetArray(int size);
// value.
void SetArray(int size);
// Set the reply to status message `str'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetStatus(const std::string& str);
// Set the reply to status message `str'.
void SetStatus(const std::string& str);
// Set the reply to error message `str'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetError(const std::string& str);
// Set the reply to error message `str'.
void SetError(const std::string& str);
// Set the reply to integer `value'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetInteger(int64_t value);
// Set the reply to integer `value'.
void SetInteger(int64_t value);
// Set the reply to string `str'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetString(const std::string& str);
// Set the reply to string `str'.
void SetString(const std::string& str);
// Convert the reply into a signed 64-bit integer(according to
// http://redis.io/topics/protocol). If the reply is not an integer,
......@@ -156,7 +149,8 @@ private:
// by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisReply);
bool SetBasicString(const std::string& str, RedisReplyType type);
void SetStringImpl(const std::string& str, RedisReplyType type);
void Reset();
RedisReplyType _type;
uint32_t _length; // length of short_str/long_str, count of replies
......@@ -180,17 +174,21 @@ inline std::ostream& operator<<(std::ostream& os, const RedisReply& r) {
return os;
}
inline RedisReply::RedisReply(butil::Arena* arena)
: RedisReply() {
inline void RedisReply::Reset() {
_type = REDIS_REPLY_NIL;
_length = 0;
_data.array.last_index = -1;
_data.array.replies = NULL;
}
inline RedisReply::RedisReply(butil::Arena* arena) {
Reset();
_arena = arena;
}
inline RedisReply::RedisReply()
: _type(REDIS_REPLY_NIL)
, _length(0)
, _arena(NULL) {
_data.array.last_index = -1;
_data.array.replies = NULL;
inline RedisReply::RedisReply() {
Reset();
_arena = NULL;
}
inline bool RedisReply::is_nil() const {
......@@ -213,44 +211,41 @@ inline int64_t RedisReply::integer() const {
return 0;
}
inline bool RedisReply::SetNullArray() {
inline void RedisReply::SetNullArray() {
if (_type != REDIS_REPLY_NIL) {
return false;
Reset();
}
_type = REDIS_REPLY_ARRAY;
_length = npos;
return true;
}
inline bool RedisReply::SetNullString() {
inline void RedisReply::SetNullString() {
if (_type != REDIS_REPLY_NIL) {
return false;
Reset();
}
_type = REDIS_REPLY_STRING;
_length = npos;
return true;
}
inline bool RedisReply::SetStatus(const std::string& str) {
return SetBasicString(str, REDIS_REPLY_STATUS);
inline void RedisReply::SetStatus(const std::string& str) {
return SetStringImpl(str, REDIS_REPLY_STATUS);
}
inline bool RedisReply::SetError(const std::string& str) {
return SetBasicString(str, REDIS_REPLY_ERROR);
inline void RedisReply::SetError(const std::string& str) {
return SetStringImpl(str, REDIS_REPLY_ERROR);
}
inline bool RedisReply::SetInteger(int64_t value) {
inline void RedisReply::SetInteger(int64_t value) {
if (_type != REDIS_REPLY_NIL) {
return false;
Reset();
}
_type = REDIS_REPLY_INTEGER;
_length = 0;
_data.integer = value;
return true;
}
inline bool RedisReply::SetString(const std::string& str) {
return SetBasicString(str, REDIS_REPLY_STRING);
inline void RedisReply::SetString(const std::string& str) {
return SetStringImpl(str, REDIS_REPLY_STRING);
}
inline const char* RedisReply::c_str() const {
......
......@@ -638,7 +638,7 @@ TEST_F(RedisTest, redis_reply_codec) {
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetStatus("OK"));
r.SetStatus("OK");
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK");
......@@ -652,7 +652,7 @@ TEST_F(RedisTest, redis_reply_codec) {
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetError("not exist \'key\'"));
r.SetError("not exist \'key\'");
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear();
......@@ -665,7 +665,7 @@ TEST_F(RedisTest, redis_reply_codec) {
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetNullString());
r.SetNullString();
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear();
......@@ -674,7 +674,7 @@ TEST_F(RedisTest, redis_reply_codec) {
ASSERT_TRUE(r.is_nil());
r.Clear();
ASSERT_TRUE(r.SetString("abcde'hello world"));
r.SetString("abcde'hello world");
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world");
......@@ -693,7 +693,7 @@ TEST_F(RedisTest, redis_reply_codec) {
const char* output[] = { ":-1\r\n", ":1234567\r\n" };
for (int i = 0; i < t; ++i) {
r.Clear();
ASSERT_TRUE(r.SetInteger(input[i]));
r.SetInteger(input[i]);
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear();
......@@ -707,7 +707,7 @@ TEST_F(RedisTest, redis_reply_codec) {
{
brpc::RedisReply r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.SetArray(3));
r.SetArray(3);
brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2);
sub_reply[0].SetString("hello, it's me");
......@@ -736,7 +736,7 @@ TEST_F(RedisTest, redis_reply_codec) {
r.Clear();
// null array
ASSERT_TRUE(r.SetNullArray());
r.SetNullArray();
ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
......@@ -746,7 +746,7 @@ TEST_F(RedisTest, redis_reply_codec) {
// CopyFromDifferentArena
{
brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetArray(1));
r.SetArray(1);
brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2);
sub_reply[0].SetString("hello, it's me");
......@@ -759,26 +759,21 @@ TEST_F(RedisTest, redis_reply_codec) {
ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str());
ASSERT_EQ(r2[0][1].integer(), sub_reply[1].integer());
}
// SetXXX can only be called once
// SetXXX can be called multiple times.
{
brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetNullString());
ASSERT_FALSE(r.SetArray(2));
ASSERT_FALSE(r.SetString("OK"));
ASSERT_FALSE(r.SetError("OK"));
ASSERT_FALSE(r.SetInteger(42));
}
{
brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetInteger(42));
ASSERT_FALSE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetNullString());
ASSERT_FALSE(r.SetArray(2));
ASSERT_FALSE(r.SetString("OK"));
ASSERT_FALSE(r.SetError("OK"));
ASSERT_FALSE(r.SetStatus("OK"));
r.SetStatus("OK");
ASSERT_TRUE(r.is_string());
r.SetNullString();
ASSERT_TRUE(r.is_nil());
r.SetArray(2);
ASSERT_TRUE(r.is_array());
r.SetString("OK");
ASSERT_TRUE(r.is_string());
r.SetError("OK");
ASSERT_TRUE(r.is_error());
r.SetInteger(42);
ASSERT_TRUE(r.is_integer());
}
}
......@@ -786,22 +781,19 @@ butil::Mutex s_mutex;
std::unordered_map<std::string, std::string> m;
std::unordered_map<std::string, int64_t> int_map;
class SetCommandHandler : public brpc::RedisCommandHandler {
class RedisServiceImpl : public brpc::RedisService {
public:
SetCommandHandler(bool batch_process = false)
: _batch_process(batch_process)
, _batch_count(0) {}
RedisServiceImpl()
: _batch_count(0) {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output,
bool is_last) {
if (size < 3) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
}
if (_batch_process) {
brpc::RedisCommandHandler::Result OnBatched(int size, const char* args[],
brpc::RedisReply* output, bool is_last) {
if (_batched_command.empty() && is_last) {
if (strcasecmp(args[0], "set") == 0) {
DoSet(args[1], args[2], output);
} else if (strcasecmp(args[0], "get") == 0) {
DoGet(args[1], output);
}
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm;
......@@ -812,7 +804,11 @@ public:
if (is_last) {
output->SetArray(_batched_command.size());
for (int i = 0; i < (int)_batched_command.size(); ++i) {
if (_batched_command[i][0] == "set") {
DoSet(_batched_command[i][1], _batched_command[i][2], &(*output)[i]);
} else if (_batched_command[i][0] == "get") {
DoGet(_batched_command[i][1], &(*output)[i]);
}
}
_batch_count++;
_batched_command.clear();
......@@ -820,6 +816,42 @@ public:
} else {
return brpc::RedisCommandHandler::BATCHED;
}
}
void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
m[key] = value;
output->SetStatus("OK");
}
void DoGet(const std::string& key, brpc::RedisReply* output) {
auto it = m.find(key);
if (it != m.end()) {
output->SetString(it->second);
} else {
output->SetNullString();
}
}
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
};
class SetCommandHandler : public brpc::RedisCommandHandler {
public:
SetCommandHandler(bool batch_process = false)
: rs(NULL)
, _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output,
bool is_last) {
if (size < 3) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
}
if (_batch_process) {
return rs->OnBatched(size, args, output, is_last);
} else {
DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK;
......@@ -830,17 +862,17 @@ public:
m[key] = value;
output->SetStatus("OK");
}
RedisServiceImpl* rs;
private:
bool _batch_process;
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
};
class GetCommandHandler : public brpc::RedisCommandHandler {
public:
GetCommandHandler(bool batch_process = false)
: _batch_process(batch_process)
, _batch_count(0) {}
: rs(NULL)
, _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output,
......@@ -850,26 +882,7 @@ public:
return brpc::RedisCommandHandler::OK;
}
if (_batch_process) {
if (_batched_command.empty() && is_last) {
DoGet(args[1], output);
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm;
for (int i = 0; args[i]; ++i) {
comm.push_back(args[i]);
}
_batched_command.push_back(comm);
if (is_last) {
output->SetArray(_batched_command.size());
for (int i = 0; i < (int)_batched_command.size(); ++i) {
DoGet(_batched_command[i][1], &(*output)[i]);
}
_batch_count++;
_batched_command.clear();
return brpc::RedisCommandHandler::OK;
} else {
return brpc::RedisCommandHandler::BATCHED;
}
return rs->OnBatched(size, args, output, is_last);
} else {
DoGet(args[1], output);
return brpc::RedisCommandHandler::OK;
......@@ -884,10 +897,10 @@ public:
output->SetNullString();
}
}
RedisServiceImpl* rs;
private:
bool _batch_process;
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
};
class IncrCommandHandler : public brpc::RedisCommandHandler {
......@@ -911,8 +924,6 @@ public:
}
};
class RedisServiceImpl : public brpc::RedisService { };
TEST_F(RedisTest, server_sanity) {
brpc::Server server;
brpc::ServerOptions server_options;
......@@ -1139,6 +1150,8 @@ TEST_F(RedisTest, server_handle_pipeline) {
RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler* getch = new GetCommandHandler(true);
SetCommandHandler* setch = new SetCommandHandler(true);
getch->rs = rsimpl;
setch->rs = rsimpl;
rsimpl->AddCommandHandler("get", getch);
rsimpl->AddCommandHandler("set", setch);
server_options.redis_service = rsimpl;
......@@ -1164,8 +1177,7 @@ TEST_F(RedisTest, server_handle_pipeline) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(8, response.reply_size());
ASSERT_EQ(1, getch->_batch_count);
ASSERT_EQ(2, setch->_batch_count);
ASSERT_EQ(1, rsimpl->_batch_count);
ASSERT_TRUE(response.reply(7).is_string());
ASSERT_STREQ(response.reply(7).c_str(), "world");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment