Commit b0fb8e62 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: impl redisCommandParser

parent aa2d9128
......@@ -58,42 +58,6 @@ struct InputResponse : public InputMessageBase {
}
};
static bool ParseArgs(const RedisReply& message, std::unique_ptr<char[]>* args) {
if (!message.is_array() || message.size() == 0) {
LOG(WARNING) << "request message is not array or size equals to zero";
return false;
}
int total_size = 0;
for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) {
LOG(WARNING) << "request message[" << i << "] is not array";
return false;
}
if (i != 0) {
total_size++; // add one byte for ' '
}
total_size += message[i].size();
}
args->reset(new char[total_size + 1 /* NULL */]);
int len = 0;
for (size_t i = 0; i < message.size(); ++i) {
if (i != 0) {
(*args)[len++] = ' ';
}
memcpy(args->get() + len, message[i].c_str(), message[i].size());
len += message[i].size();
}
(*args)[len] = '\0';
CHECK(len == total_size) << "implementation of ParseArgs is buggy, len="
<< len << " expected=" << total_size;
return true;
}
struct RedisTask {
RedisReply input_message;
butil::Arena arena;
};
// This class is as parsing_context in socket.
class RedisConnContext : public Destroyable {
public:
......@@ -112,38 +76,34 @@ public:
// first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue;
// The redis command are parsed and pushed into this queue
bthread::ExecutionQueueId<RedisTask*> queue;
bthread::ExecutionQueueId<std::string*> queue;
RedisReply parsing_message;
butil::Arena arena;
RedisCommandParser parser;
std::string command;
};
int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
RedisReply output(&task->arena);
std::unique_ptr<char[]> args;
if (!ParseArgs(task->input_message, &args)) {
LOG(ERROR) << "ERR command not string";
output.SetError("ERR command not string");
output.SerializeToIOBuf(sendbuf);
return -1;
}
int ConsumeTask(RedisConnContext* ctx, std::string* command, butil::IOBuf* sendbuf) {
butil::Arena arena;
RedisReply output(&arena);
if (ctx->handler_continue) {
RedisCommandHandler::Result result =
ctx->handler_continue->Run(args.get(), &output);
ctx->handler_continue->Run(command->c_str(), &output);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL;
}
} else {
std::string comm = task->input_message[0].c_str();
std::transform(comm.begin(), comm.end(), comm.begin(),
[](unsigned char c){ return std::tolower(c); });
std::string comm;
comm.reserve(8);
for (int i = 0; i < (int)command->size() && (*command)[i] != ' '; ++i) {
comm.push_back(std::tolower((*command)[i]));
}
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
if (!ch) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.SetError(buf);
} else {
RedisCommandHandler::Result result = ch->Run(args.get(), &output);
RedisCommandHandler::Result result = ch->Run(command->c_str(), &output);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = ch;
}
......@@ -153,7 +113,7 @@ int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
return 0;
}
int Consume(void* ctx, bthread::TaskIterator<RedisTask*>& iter) {
int Consume(void* ctx, bthread::TaskIterator<std::string*>& iter) {
RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
if (iter.is_queue_stopped()) {
delete qctx;
......@@ -169,7 +129,7 @@ int Consume(void* ctx, bthread::TaskIterator<RedisTask*>& iter) {
wopt.ignore_eovercrowded = true;
butil::IOBuf sendbuf;
for (; iter; ++iter) {
std::unique_ptr<RedisTask> guard(*iter);
std::unique_ptr<std::string> guard(*iter);
if (has_err) {
continue;
}
......@@ -235,19 +195,17 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
}
socket->reset_parsing_context(ctx);
}
ParseError err = ctx->parsing_message.ConsumePartialIOBuf(*source, &ctx->arena);
ParseError err = ctx->parser.ParseCommand(*source, &ctx->command);
if (err != PARSE_OK) {
return MakeParseError(err);
}
std::unique_ptr<RedisTask> task(new RedisTask);
task->input_message.CopyFromDifferentArena(ctx->parsing_message, &task->arena);
ctx->parsing_message.Clear();
ctx->arena.clear();
if (bthread::execution_queue_execute(ctx->queue, task.get()) != 0) {
std::unique_ptr<std::string> command(new std::string);
command->swap(ctx->command);
if (bthread::execution_queue_execute(ctx->queue, command.get()) != 0) {
LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
task.release();
command.release();
return MakeMessage(NULL);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......
......@@ -35,7 +35,9 @@ void ProcessRedisResponse(InputMessageBase* msg);
// Actions to a redis request, which is left unimplemented.
// All requests are processed in execution queue pushed in
// the parsing process.
// the parsing process. This function must be declared since
// server side will enable redis as a server side protocol
// when this function is declared.
void ProcessRedisRequest(InputMessageBase* msg);
// Serialize a redis request.
......
......@@ -436,6 +436,57 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
return os;
}
bool RedisReply::SetArray(int size) {
if (!_arena || _has_set) {
return false;
}
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
_length = npos;
return true;
} else if (size == 0) {
_length = 0;
return true;
}
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply(_arena);
}
_length = size;
_data.array.replies = subs;
_has_set = true;
return true;
}
bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
if (!_arena || _has_set) {
return false;
}
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
}
memcpy(d, str.c_str(), size);
d[size] = '\0';
_data.long_str = d;
}
_type = type;
_length = size;
_has_set = true;
return true;
}
bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname;
lcname.resize(name.size());
......
......@@ -360,4 +360,83 @@ butil::Status RedisCommandByComponents(butil::IOBuf* output,
return butil::Status::OK();
}
RedisCommandParser::RedisCommandParser() {
Reset();
}
ParseError RedisCommandParser::ParseCommand(butil::IOBuf& buf, std::string* out) {
const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
// '*' stands for array "*<size>\r\n<sub-reply1><sub-reply2>..."
if (!_parsing_array && *pfc != '*') {
return PARSE_ERROR_TRY_OTHERS;
}
// '$' stands for bulk string "$<length>\r\n<string>\r\n"
if (_parsing_array && *pfc != '$') {
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
char intbuf[32]; // enough for fc + 64-bit decimal + \r\n
const size_t ncopied = buf.copy_to(intbuf, sizeof(intbuf) - 1);
intbuf[ncopied] = '\0';
const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
char* endptr = NULL;
int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
if (endptr != intbuf + crlf_pos) {
LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (!_parsing_array) {
buf.pop_front(crlf_pos + 2/*CRLF*/);
_parsing_array = true;
_length = value;
_index = 0;
return ParseCommand(buf, out);
}
if (_index >= _length) {
LOG(WARNING) << "a complete command has been parsed. Do you forget "
"to call RedisCommandParser.Reset()?";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
const int64_t len = value; // `value' is length of the string
if (len < 0) {
LOG(ERROR) << "string in command is nil!";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (len > (int64_t)std::numeric_limits<uint32_t>::max()) {
LOG(ERROR) << "string in command is too long! max length=2^32-1,"
" actually=" << len;
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
return PARSE_ERROR_NOT_ENOUGH_DATA;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
if (!out->empty()) {
out->push_back(' '); // command is separated by ' '
}
buf.cutn(out, len);
char crlf[2];
buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') {
LOG(ERROR) << "string in command is not ended with CRLF";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
if (++_index < _length) {
return ParseCommand(buf, out);
}
Reset();
return PARSE_OK;
}
void RedisCommandParser::Reset() {
_parsing_array = false;
_length = 0;
_index = 0;
}
} // namespace brpc
......@@ -22,7 +22,7 @@
#include "butil/iobuf.h"
#include "butil/status.h"
#include "brpc/parse_result.h"
namespace brpc {
......@@ -40,6 +40,23 @@ butil::Status RedisCommandByComponents(butil::IOBuf* buf,
const butil::StringPiece* components,
size_t num_components);
// A parser used to parse redis raw command.
class RedisCommandParser {
public:
RedisCommandParser();
// Parse raw message from `buf' and write the result to `out'.
ParseError ParseCommand(butil::IOBuf& buf, std::string* out);
private:
// Reset parser to the initial state.
void Reset();
bool _parsing_array; // if the parser has met array indicator '*'
int _length; // array length
int _index; // current parsing array index
};
} // namespace brpc
......
......@@ -220,56 +220,6 @@ inline bool RedisReply::SetNilString() {
return true;
}
inline bool RedisReply::SetArray(int size) {
if (!_arena || _has_set) {
return false;
}
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
_length = npos;
return true;
} else if (size == 0) {
_length = 0;
return true;
}
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply(_arena);
}
_length = size;
_data.array.replies = subs;
_has_set = true;
return true;
}
inline bool RedisReply::SetBasicString(const std::string& str, RedisReplyType type) {
if (!_arena || _has_set) {
return false;
}
const size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)_arena->allocate((size/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
}
memcpy(d, str.c_str(), size);
d[size] = '\0';
_data.long_str = d;
}
_type = type;
_length = size;
_has_set = true;
return true;
}
inline bool RedisReply::SetStatus(const std::string& str) {
return SetBasicString(str, REDIS_REPLY_STATUS);
}
......
......@@ -24,6 +24,7 @@
#include <brpc/channel.h>
#include <brpc/policy/redis_authenticator.h>
#include <brpc/server.h>
#include <brpc/redis_command.h>
#include <gtest/gtest.h>
namespace brpc {
......@@ -549,6 +550,75 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear();
}
TEST_F(RedisTest, command_parser) {
brpc::RedisCommandParser parser;
butil::IOBuf buf;
{
// parse from whole command
std::string command = "set abc edc";
ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
std::string command_out;
ASSERT_EQ(brpc::PARSE_OK, parser.ParseCommand(buf, &command_out));
ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command.c_str(), command_out.c_str());
}
{
// parse from two consecutive buf
buf.append("*3\r\n$3");
std::string command_out;
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
parser.ParseCommand(buf, &command_out));
ASSERT_EQ((int)buf.size(), 2); // left "$3"
buf.append("\r\nset\r\n$3\r\nabc\r\n$3\r\ndef\r\n");
ASSERT_EQ(brpc::PARSE_OK, parser.ParseCommand(buf, &command_out));
ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command_out.c_str(), "set abc def");
}
{
// there is a non-string message and parse should fail
buf.append("*3\r\n$3");
std::string command_out;
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
parser.ParseCommand(buf, &command_out));
ASSERT_EQ((int)buf.size(), 2); // left "$3"
buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.ParseCommand(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append(":123456\r\n");
std::string command_out;
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS,
parser.ParseCommand(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append("+Error\r\n");
std::string command_out;
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS,
parser.ParseCommand(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append("+OK\r\n");
std::string command_out;
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS,
parser.ParseCommand(buf, &command_out));
parser.Reset();
}
{
// not array
buf.append("$5\r\nhello\r\n");
std::string command_out;
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS,
parser.ParseCommand(buf, &command_out));
parser.Reset();
}
}
TEST_F(RedisTest, redis_reply_codec) {
butil::Arena arena;
// status
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment