Commit 684f4ed1 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: refine code, add comment and more ut

parent ba7f3d5f
......@@ -64,23 +64,26 @@ public:
brpc::RedisReply* output) {
std::string key;
bool parse_command = false;
butil::StringSplitter sp(args, ' ');
for (; sp; ++sp) {
const char* start = args;
const char* end = NULL;
while ((end = strchr(start, ' '))!= NULL) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
key.assign(start, end);
}
start = end + 1;
}
if (key.empty()) {
if (!parse_command) {
output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK;
}
if (key.empty()) {
key.assign(start);
}
std::string value;
if (_rsimpl->Get(key, &value)) {
output->SetBulkString(value);
output->SetString(value);
} else {
output->SetNilString();
}
......@@ -102,22 +105,25 @@ public:
std::string key;
std::string value;
bool parse_command = false;
butil::StringSplitter sp(args, ' ');
for (; sp; ++sp) {
const char* start = args;
const char* end = NULL;
while ((end = strchr(start, ' '))!= NULL) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
key.assign(start, end);
} else if (value.empty()) {
value.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
value.assign(start, end);
}
start = end + 1;
}
if (key.empty() || value.empty()) {
if (!parse_command || key.empty()) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
}
if (value.empty()) {
value.assign(start);
}
_rsimpl->Set(key, value);
output->SetStatus("OK");
return brpc::RedisCommandHandler::OK;
......
......@@ -44,8 +44,9 @@ namespace policy {
DEFINE_bool(redis_verbose, false,
"[DEBUG] Print EVERY redis request/response");
DEFINE_int32(redis_batch_flush_max_size, 4096, "beyond which the server response"
" are forced to write to socket");
DEFINE_int32(redis_batch_flush_data_size, 4096, "If the total data size of buffered "
"responses is beyond this value, then data is forced to write to socket"
"to avoid latency of the front responses being too big");
struct InputResponse : public InputMessageBase {
bthread_id_t id_wait;
......@@ -57,21 +58,34 @@ struct InputResponse : public InputMessageBase {
}
};
static bool ParseArgs(const RedisReply& message, std::ostringstream& os) {
static bool ParseArgs(const RedisReply& message, std::unique_ptr<char[]>* args) {
if (!message.is_array() || message.size() == 0) {
LOG(WARNING) << "request message is not array or size equals to zero";
return false;
}
int total_size = 0;
for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) {
LOG(WARNING) << "request message[" << i << "] is not array";
return false;
}
if (i != 0) {
os << " ";
total_size++; // add one byte for ' '
}
os << message[i].c_str();
total_size += message[i].size();
}
args->reset(new char[total_size + 1 /* NULL */]);
int len = 0;
for (size_t i = 0; i < message.size(); ++i) {
if (i != 0) {
(*args)[len++] = ' ';
}
memcpy(args->get() + len, message[i].c_str(), message[i].size());
len += message[i].size();
}
(*args)[len] = '\0';
CHECK(len == total_size) << "implementation of ParseArgs is buggy, len="
<< len << " expected=" << total_size;
return true;
}
......@@ -81,19 +95,19 @@ struct RedisTask {
};
// This class is as parsing_context in socket.
class RedisConnContext : public SharedObject
, public Destroyable {
class RedisConnContext : public Destroyable {
public:
RedisConnContext()
: handler_continue(NULL) {}
: redis_service(NULL)
, handler_continue(NULL) {}
~RedisConnContext();
// @Destroyable
void Destroy();
void Destroy() override;
int Init();
SocketId socket_id;
RedisService::CommandMap command_map;
RedisService* redis_service;
// If user starts a transaction, handler_continue indicates the
// first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue;
......@@ -106,33 +120,31 @@ public:
int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
RedisReply output(&task->arena);
std::ostringstream os;
if (!ParseArgs(task->input_message, os)) {
std::unique_ptr<char[]> args;
if (!ParseArgs(task->input_message, &args)) {
LOG(ERROR) << "ERR command not string";
output.SetError("ERR command not string");
return -1;
}
if (ctx->handler_continue) {
RedisCommandHandler::Result result =
ctx->handler_continue->Run(os.str().c_str(), &output);
ctx->handler_continue->Run(args.get(), &output);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL;
}
} else {
std::string comm;
comm.reserve(8);
for (const char* c = task->input_message[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c));
}
auto it = ctx->command_map.find(comm);
if (it == ctx->command_map.end()) {
std::string comm = task->input_message[0].c_str();
std::transform(comm.begin(), comm.end(), comm.begin(),
[](unsigned char c){ return std::tolower(c); });
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
if (!ch) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.SetError(buf);
} else {
RedisCommandHandler::Result result = it->second->Run(os.str().c_str(), &output);
RedisCommandHandler::Result result = ch->Run(args.get(), &output);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = it->second.get();
ctx->handler_continue = ch;
}
}
}
......@@ -143,7 +155,7 @@ int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
int Consume(void* ctx, bthread::TaskIterator<RedisTask*>& iter) {
RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
if (iter.is_queue_stopped()) {
qctx->RemoveRefManually();
delete qctx;
return 0;
}
SocketUniquePtr s;
......@@ -164,7 +176,12 @@ int Consume(void* ctx, bthread::TaskIterator<RedisTask*>& iter) {
has_err = true;
continue;
}
if ((int)sendbuf.size() >= FLAGS_redis_batch_flush_max_size) {
// If there are too many tasks to execute, latency of the front
// responses will be increased by waiting the following tasks to
// be completed. To prevent this, if the current buf size is greater
// than FLAGS_redis_batch_flush_max_size, we just write the current
// buf first.
if ((int)sendbuf.size() >= FLAGS_redis_batch_flush_data_size) {
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
}
......@@ -211,12 +228,10 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context());
if (ctx == NULL) {
ctx = new RedisConnContext;
// add ref that removed in Consume()
ctx->AddRefManually();
ctx->socket_id = socket->id();
rs->CloneCommandMap(&ctx->command_map);
ctx->redis_service = rs;
if (ctx->Init() != 0) {
ctx->RemoveRefManually();
delete ctx;
LOG(ERROR) << "Fail to init redis RedisConnContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
......
......@@ -438,10 +438,9 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname;
lcname.reserve(name.size());
for (auto c : name) {
lcname.push_back(std::tolower(c));
}
lcname.resize(name.size());
std::transform(name.begin(), name.end(), lcname.begin(),
[](unsigned char c){ return std::tolower(c); });
if (_command_map.count(lcname)) {
LOG(ERROR) << "redis command name=" << name << " exist";
return false;
......@@ -450,10 +449,12 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle
return true;
}
void RedisService::CloneCommandMap(CommandMap* map) {
for (auto it : _command_map) {
(*map)[it.first].reset(it.second->New());
RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) {
auto it = _command_map.find(name);
if (it != _command_map.end()) {
return it->second.get();
}
return NULL;
}
} // namespace brpc
......@@ -226,12 +226,12 @@ public:
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
// This function should be touched by user and used by brpc deverloper only.
void CloneCommandMap(CommandMap* map);
RedisCommandHandler* FindCommandHandler(const std::string& name);
private:
CommandMap _command_map;
};
// The Command handler for a redis request. User should impletement Run() and New().
// The Command handler for a redis request. User should impletement Run().
class RedisCommandHandler {
public:
enum Result {
......@@ -259,12 +259,6 @@ public:
// all once the ending marker is received.
virtual RedisCommandHandler::Result Run(const char* args,
RedisReply* output) = 0;
// Whenever a tcp connection is established, a bunch of new handlers would be created
// using New() of the corresponding handler and brpc makes sure that all requests from
// one connection with the same command name would be redirected to the same New()-ed
// command handler.
virtual RedisCommandHandler* New() = 0;
};
} // namespace brpc
......
......@@ -59,12 +59,32 @@ public:
bool is_string() const; // True if the reply is a string.
bool is_array() const; // True if the reply is an array.
bool SetNilString(); // "$-1\r\n"
bool SetArray(int size); // size == -1 means nil array("*-1\r\n")
// Set the reply to the nil string. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetNilString();
// Set the reply to the array with `size' elements. If `size'
// is -1, then it is a nil array. After call SetArray, use
// operator[] to visit sub replies and set their value. Return
// True if it is set successfully. If the reply has already
// been set, return false.
bool SetArray(int size);
// Set the reply to status message `str'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetStatus(const std::string& str);
// Set the reply to error message `str'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetError(const std::string& str);
// Set the reply to integer `value'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetInteger(int64_t value);
bool SetBulkString(const std::string& str);
// Set the reply to string `str'. Return True if it is set
// successfully. If the reply has already been set, return false.
bool SetString(const std::string& str);
// Convert the reply into a signed 64-bit integer(according to
// http://redis.io/topics/protocol). If the reply is not an integer,
......@@ -84,8 +104,9 @@ public:
// If you need a std::string, call .data().as_string() (which allocates mem)
butil::StringPiece data() const;
// Return number of sub replies in the array. If this reply is not an array,
// 0 is returned (call stacks are not logged).
// Return number of sub replies in the array if this reply is an array, or
// return the length of string if this reply is a string, otherwise 0 is
// returned (call stacks are not logged).
size_t size() const;
// Get the index-th sub reply. If this reply is not an array, a nil reply
// is returned (call stacks are not logged)
......@@ -147,6 +168,7 @@ private:
uint64_t padding[2]; // For swapping, must cover all bytes.
} _data;
butil::Arena* _arena;
bool _has_set;
};
// =========== inline impl. ==============
......@@ -164,7 +186,8 @@ inline RedisReply::RedisReply(butil::Arena* arena)
inline RedisReply::RedisReply()
: _type(REDIS_REPLY_NIL)
, _length(0)
, _arena(NULL) {
, _arena(NULL)
, _has_set(false) {
_data.array.last_index = -1;
_data.array.replies = NULL;
}
......@@ -190,14 +213,15 @@ inline int64_t RedisReply::integer() const {
}
inline bool RedisReply::SetNilString() {
if (!_arena) return false;
if (!_arena || _has_set) return false;
_type = REDIS_REPLY_STRING;
_length = npos;
_has_set = true;
return true;
}
inline bool RedisReply::SetArray(int size) {
if (!_arena) {
if (!_arena || _has_set) {
return false;
}
_type = REDIS_REPLY_ARRAY;
......@@ -218,19 +242,20 @@ inline bool RedisReply::SetArray(int size) {
}
_length = size;
_data.array.replies = subs;
_has_set = true;
return true;
}
inline bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
if (!_arena) {
if (!_arena || _has_set) {
return false;
}
size_t size = str.size();
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)_arena->allocate((_length/8 + 1) * 8);
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
......@@ -241,6 +266,7 @@ inline bool RedisReply::SetBasicString(const std::string& str, RedisReplyType ty
}
_type = type;
_length = size;
_has_set = true;
return true;
}
......@@ -253,13 +279,17 @@ inline bool RedisReply::SetError(const std::string& str) {
}
inline bool RedisReply::SetInteger(int64_t value) {
if (!_arena || _has_set) {
return false;
}
_type = REDIS_REPLY_INTEGER;
_length = 0;
_data.integer = value;
_has_set = true;
return true;
}
inline bool RedisReply::SetBulkString(const std::string& str) {
inline bool RedisReply::SetString(const std::string& str) {
return SetBasicString(str, REDIS_REPLY_STRING);
}
......@@ -303,7 +333,7 @@ inline const char* RedisReply::error_message() const {
}
inline size_t RedisReply::size() const {
return (is_array() ? _length : 0);
return ((is_array() || is_string()) ? _length : 0);
}
inline RedisReply& RedisReply::operator[](size_t index) {
......@@ -331,6 +361,7 @@ inline void RedisReply::Clear() {
_length = 0;
_data.array.last_index = -1;
_data.array.replies = NULL;
_has_set = false;
}
inline void RedisReply::CopyFromSameArena(const RedisReply& other) {
......
......@@ -549,7 +549,7 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear();
}
TEST_F(RedisTest, codec) {
TEST_F(RedisTest, redis_reply_codec) {
butil::Arena arena;
// status
{
......@@ -591,7 +591,7 @@ TEST_F(RedisTest, codec) {
ASSERT_TRUE(r.is_nil());
r.Clear();
ASSERT_TRUE(r.SetBulkString("abcde'hello world"));
ASSERT_TRUE(r.SetString("abcde'hello world"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world");
......@@ -627,9 +627,9 @@ TEST_F(RedisTest, codec) {
ASSERT_TRUE(r.SetArray(3));
brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2);
sub_reply[0].SetBulkString("hello, it's me");
sub_reply[0].SetString("hello, it's me");
sub_reply[1].SetInteger(422);
r[1].SetBulkString("To go over everything");
r[1].SetString("To go over everything");
r[2].SetInteger(1);
ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
......@@ -659,6 +659,44 @@ TEST_F(RedisTest, codec) {
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
}
// CopyFromDifferentArena
{
brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetArray(1));
brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2);
sub_reply[0].SetString("hello, it's me");
sub_reply[1].SetInteger(422);
brpc::RedisReply r2(NULL);
r2.CopyFromDifferentArena(r, &arena);
ASSERT_TRUE(r2.is_array());
ASSERT_EQ((int)r2[0].size(), 2);
ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str());
ASSERT_EQ(r2[0][1].integer(), sub_reply[1].integer());
}
// SetXXX can only be called once
{
brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetNilString());
ASSERT_FALSE(r.SetArray(2));
ASSERT_FALSE(r.SetString("OK"));
ASSERT_FALSE(r.SetError("OK"));
ASSERT_FALSE(r.SetInteger(42));
}
{
brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetInteger(42));
ASSERT_FALSE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetNilString());
ASSERT_FALSE(r.SetArray(2));
ASSERT_FALSE(r.SetString("OK"));
ASSERT_FALSE(r.SetError("OK"));
ASSERT_FALSE(r.SetStatus("OK"));
}
}
butil::Mutex s_mutex;
......@@ -694,11 +732,6 @@ public:
output->SetStatus("OK");
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { _new_count++; return new SetCommandHandler(); }
int new_count() { return _new_count; }
private:
int _new_count = 0;
};
class GetCommandHandler : public brpc::RedisCommandHandler {
......@@ -725,17 +758,12 @@ public:
}
auto it = m.find(key);
if (it != m.end()) {
output->SetBulkString(it->second);
output->SetString(it->second);
} else {
output->SetNilString();
}
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { _new_count++; return new GetCommandHandler(); }
int new_count() { return _new_count; }
private:
int _new_count = 0;
};
class IncrCommandHandler : public brpc::RedisCommandHandler {
......@@ -767,11 +795,6 @@ public:
output->SetInteger(value);
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { _new_count++; return new IncrCommandHandler(); }
int new_count() { return _new_count; }
private:
int _new_count = 0;
};
class RedisServiceImpl : public brpc::RedisService { };
......@@ -820,10 +843,6 @@ TEST_F(RedisTest, server_sanity) {
ASSERT_STREQ("value2", response.reply(5).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type());
ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
ASSERT_EQ(gh->new_count(), 1);
ASSERT_EQ(sh->new_count(), 1);
ASSERT_EQ(ih->new_count(), 1);
}
void* incr_thread(void* arg) {
......@@ -871,7 +890,6 @@ TEST_F(RedisTest, server_concurrency) {
delete channels[i];
}
ASSERT_EQ(int_map["count"], 10 * 5000LL);
ASSERT_EQ(ih->new_count(), N);
}
class MultiCommandHandler : public brpc::RedisCommandHandler {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment