Commit a378c9f1 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: remove vector<vector>

parent da3939ca
...@@ -61,7 +61,9 @@ struct InputResponse : public InputMessageBase { ...@@ -61,7 +61,9 @@ struct InputResponse : public InputMessageBase {
class RedisConnContext : public Destroyable { class RedisConnContext : public Destroyable {
public: public:
RedisConnContext() RedisConnContext()
: redis_service(NULL) {} : redis_service(NULL)
, batched_size(0) {}
~RedisConnContext(); ~RedisConnContext();
// @Destroyable // @Destroyable
void Destroy() override; void Destroy() override;
...@@ -71,9 +73,10 @@ public: ...@@ -71,9 +73,10 @@ public:
// If user starts a transaction, handler_continue indicates the // If user starts a transaction, handler_continue indicates the
// handler pointer that runs the transaction command. // handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> handler_continue; std::unique_ptr<RedisCommandHandler> handler_continue;
// >0 if command handler is run in batched mode.
int batched_size;
RedisCommandParser parser; RedisCommandParser parser;
std::vector<std::string> command;
}; };
static std::string ToLowercase(const std::string& command) { static std::string ToLowercase(const std::string& command) {
...@@ -84,65 +87,54 @@ static std::string ToLowercase(const std::string& command) { ...@@ -84,65 +87,54 @@ static std::string ToLowercase(const std::string& command) {
return res; return res;
} }
int ConsumeTask(RedisConnContext* ctx, int ConsumeCommand(RedisConnContext* ctx,
const std::vector<std::vector<std::string> >& commands, const std::unique_ptr<const char*[]>& commands,
butil::IOBuf* sendbuf) { int len, butil::Arena* arena,
butil::Arena arena; bool is_last,
int size = commands.size(); butil::IOBuf* sendbuf) {
std::string next_comm; RedisReply output(arena);
RedisReply reply(&arena); RedisCommandHandler::Result result = RedisCommandHandler::OK;
RedisReply* output = NULL; if (ctx->handler_continue) {
if (size == 1) { result = ctx->handler_continue->Run(len, commands.get(), &output, is_last);
// Optimize for the most common case if (result == RedisCommandHandler::OK) {
output = &reply; ctx->handler_continue.reset(NULL);
} else { } else if (result == RedisCommandHandler::BATCHED) {
output = (RedisReply*)malloc(sizeof(RedisReply) * size); LOG(ERROR) << "BATCHED should not be returned in redis transaction process.";
for (int i = 0; i < size; ++i) { return -1;
new (&output[i]) RedisReply(&arena);
} }
} } else {
for (int i = 0; i < size; ++i) { std::string lcname = ToLowercase(commands[0]);
if (ctx->handler_continue) { RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(lcname);
bool is_last = (i == size - 1); if (!ch) {
RedisCommandHandler::Result result = char buf[64];
ctx->handler_continue->Run(commands[i], &output[i], is_last); snprintf(buf, sizeof(buf), "ERR unknown command `%s`", lcname.c_str());
if (result == RedisCommandHandler::OK) { output.SetError(buf);
ctx->handler_continue.reset(NULL);
}
} else { } else {
bool is_last = true; result = ch->Run(len, commands.get(), &output, is_last);
std::string comm; if (result == RedisCommandHandler::CONTINUE) {
if (i == 0) { if (ctx->batched_size) {
comm = ToLowercase(commands[i][0]); LOG(ERROR) << "CONTINUE should not be returned in redis batched process.";
} else { return -1;
comm.swap(next_comm);
}
if ((i + 1) < size) {
next_comm = ToLowercase(commands[i + 1][0]);
if (comm == next_comm) {
is_last = false;
}
}
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
if (!ch) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output[i].SetError(buf);
} else {
RedisCommandHandler::Result result =
ch->Run(commands[i], &output[i], is_last);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue.reset(ch->NewTransactionHandler());
} }
ctx->handler_continue.reset(ch->NewTransactionHandler());
} else if (result == RedisCommandHandler::BATCHED) {
ctx->batched_size++;
} }
} }
} }
for (int i = 0; i < size; ++i) { if (result == RedisCommandHandler::OK && ctx->batched_size) {
output[i].SerializeTo(sendbuf); if ((int)output.size() != (ctx->batched_size + 1)) {
} LOG(ERROR) << "reply array size can't be matched with batched size, "
if (size != 1) { << " expected=" << ctx->batched_size + 1 << " actual=" << output.size();
free(output); return -1;
} }
for (int i = 0; i < (int)output.size(); ++i) {
output[i].SerializeTo(sendbuf);
}
ctx->batched_size = 0;
} else if (result != RedisCommandHandler::BATCHED) {
output.SerializeTo(sendbuf);
} // else result == RedisCommandHandler::BATCHED, do not serialize to buf
return 0; return 0;
} }
...@@ -174,27 +166,40 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -174,27 +166,40 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
ctx->redis_service = rs; ctx->redis_service = rs;
socket->reset_parsing_context(ctx); socket->reset_parsing_context(ctx);
} }
std::vector<std::vector<std::string> > commands; butil::Arena arena;
std::unique_ptr<const char*[]> current_commands;
int current_len = 0;
butil::IOBuf sendbuf;
ParseError err = PARSE_OK; ParseError err = PARSE_OK;
err = ctx->parser.Consume(*source, &current_commands, &current_len, &arena);
if (err != PARSE_OK) {
return MakeParseError(err);
}
while (true) { while (true) {
err = ctx->parser.Consume(*source, &ctx->command); std::unique_ptr<const char*[]> next_commands;
int next_len = 0;
err = ctx->parser.Consume(*source, &next_commands, &next_len, &arena);
if (err != PARSE_OK) { if (err != PARSE_OK) {
break; break;
} }
commands.emplace_back(std::move(ctx->command)); // safe to read first element.
CHECK(ctx->command.empty()); // current_commands and next_commands both have at least one element(NULL).
} bool is_last = (strcasecmp(current_commands[0], next_commands[0]) != 0);
if (!commands.empty()) { if (ConsumeCommand(ctx, current_commands, current_len, &arena, is_last, &sendbuf) != 0) {
butil::IOBuf sendbuf;
if (ConsumeTask(ctx, commands, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
CHECK(sendbuf.size() > 0) << "invalid size=0 of sendbuf"; current_commands.swap(next_commands);
Socket::WriteOptions wopt; current_len = next_len;
wopt.ignore_eovercrowded = true; }
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0) if (ConsumeCommand(ctx, current_commands, current_len, &arena, true, &sendbuf) != 0) {
<< "Fail to send redis reply"; return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
CHECK(!sendbuf.empty());
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
return MakeParseError(err); return MakeParseError(err);
} else { } else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......
...@@ -237,25 +237,30 @@ public: ...@@ -237,25 +237,30 @@ public:
enum Result { enum Result {
OK = 0, OK = 0,
CONTINUE = 1, CONTINUE = 1,
BATCHED = 2,
}; };
~RedisCommandHandler() {} ~RedisCommandHandler() {}
// Once Server receives commands, it will first find the corresponding handlers and // Once Server receives commands, it will first find the corresponding handlers and
// call them sequentially(one by one) according to the order that requests arrive, // call them sequentially(one by one) according to the order that requests arrive,
// just like what redis-server does. // just like what redis-server does.
// `args' is the array redis of request command. For example, "set somekey somevalue" // `args_len` is the length of request command.
// `args' is the array of request command. For example, "set somekey somevalue"
// corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue". // corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue".
// `output', which should be filled by user, is the content that sent to client side. // `output', which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_reply.h for more usage. // Read brpc/src/redis_reply.h for more usage.
// `is_last' indicates whether the commands is the last command of this batch. If user // `is_last' indicates whether the commands is the last command of this batch. If user
// want to do some batch processing, user should buffer the command and output. Once // want to do some batch processing, user should buffer the command and return
// `is_last' is true, then run all the command and set the output of each command. // RedisCommandHandler::BATCHED. Once `is_last' is true, run all the commands and
// set `output' to be an array and set all the results to the corresponding element
// of array.
//
// The return value should be RedisCommandHandler::OK for normal cases. If you want // The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE once server receives // to implement transaction, return RedisCommandHandler::CONTINUE once server receives
// an start marker and brpc will call MultiTransactionHandler() to new a transaction // an start marker and brpc will call MultiTransactionHandler() to new a transaction
// handler that all the following commands are sent to this tranction handler until // handler that all the following commands are sent to this tranction handler until
// it returns Result::OK. Read the comment below. // it returns Result::OK. Read the comment below.
virtual RedisCommandHandler::Result Run(const std::vector<std::string>& command, virtual RedisCommandHandler::Result Run(int args_len, const char* args[],
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) = 0; bool is_last) = 0;
......
...@@ -364,8 +364,9 @@ RedisCommandParser::RedisCommandParser() { ...@@ -364,8 +364,9 @@ RedisCommandParser::RedisCommandParser() {
Reset(); Reset();
} }
ParseError RedisCommandParser::Consume( ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
butil::IOBuf& buf, std::vector<std::string>* command) { std::unique_ptr<const char*[]>* commands,
int* len_out, butil::Arena* arena) {
const char* pfc = (const char*)buf.fetch1(); const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) { if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA; return PARSE_ERROR_NOT_ENOUGH_DATA;
...@@ -396,8 +397,8 @@ ParseError RedisCommandParser::Consume( ...@@ -396,8 +397,8 @@ ParseError RedisCommandParser::Consume(
_parsing_array = true; _parsing_array = true;
_length = value; _length = value;
_index = 0; _index = 0;
_commands.clear(); _commands.reset(new const char*[value + 1/* for ending NULL */]);
return Consume(buf, command); return Consume(buf, commands, len_out, arena);
} }
CHECK(_index < _length) << "a complete command has been parsed. " CHECK(_index < _length) << "a complete command has been parsed. "
"impl of RedisCommandParser::Parse is buggy"; "impl of RedisCommandParser::Parse is buggy";
...@@ -415,8 +416,10 @@ ParseError RedisCommandParser::Consume( ...@@ -415,8 +416,10 @@ ParseError RedisCommandParser::Consume(
return PARSE_ERROR_NOT_ENOUGH_DATA; return PARSE_ERROR_NOT_ENOUGH_DATA;
} }
buf.pop_front(crlf_pos + 2/*CRLF*/); buf.pop_front(crlf_pos + 2/*CRLF*/);
_commands.emplace_back(); char* d = (char*)arena->allocate((len/8 + 1) * 8);
buf.cutn(&_commands.back(), len); buf.cutn(d, len);
d[len] = '\0';
_commands[_index] = d;
char crlf[2]; char crlf[2];
buf.cutn(crlf, sizeof(crlf)); buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') { if (crlf[0] != '\r' || crlf[1] != '\n') {
...@@ -424,10 +427,11 @@ ParseError RedisCommandParser::Consume( ...@@ -424,10 +427,11 @@ ParseError RedisCommandParser::Consume(
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
if (++_index < _length) { if (++_index < _length) {
return Consume(buf, command); return Consume(buf, commands, len_out, arena);
} }
command->clear(); _commands[_index] = NULL;
command->swap(_commands); commands->swap(_commands);
*len_out = _index;
Reset(); Reset();
return PARSE_OK; return PARSE_OK;
} }
...@@ -436,6 +440,7 @@ void RedisCommandParser::Reset() { ...@@ -436,6 +440,7 @@ void RedisCommandParser::Reset() {
_parsing_array = false; _parsing_array = false;
_length = 0; _length = 0;
_index = 0; _index = 0;
_commands.reset(NULL);
} }
} // namespace brpc } // namespace brpc
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <vector> #include <vector>
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "butil/status.h" #include "butil/status.h"
#include "butil/arena.h"
#include "brpc/parse_result.h" #include "brpc/parse_result.h"
namespace brpc { namespace brpc {
...@@ -47,8 +48,10 @@ public: ...@@ -47,8 +48,10 @@ public:
RedisCommandParser(); RedisCommandParser();
// Parse raw message from `buf'. Return PARSE_OK and set the parsed command // Parse raw message from `buf'. Return PARSE_OK and set the parsed command
// to `command' if successful. // to `commands' and length to `len' if successful. Memory of commands are
ParseError Consume(butil::IOBuf& buf, std::vector<std::string>* command); // allocated in `arena'.
ParseError Consume(butil::IOBuf& buf, std::unique_ptr<const char*[]>* commands,
int*len, butil::Arena* arena);
private: private:
// Reset parser to the initial state. // Reset parser to the initial state.
...@@ -57,7 +60,7 @@ private: ...@@ -57,7 +60,7 @@ private:
bool _parsing_array; // if the parser has met array indicator '*' bool _parsing_array; // if the parser has met array indicator '*'
int _length; // array length int _length; // array length
int _index; // current parsing array index int _index; // current parsing array index
std::vector<std::string> _commands; // parsed command string std::unique_ptr<const char*[]> _commands; // parsed command string
}; };
} // namespace brpc } // namespace brpc
......
...@@ -419,8 +419,8 @@ bool RedisReply::SetArray(int size) { ...@@ -419,8 +419,8 @@ bool RedisReply::SetArray(int size) {
} }
_type = REDIS_REPLY_ARRAY; _type = REDIS_REPLY_ARRAY;
if (size < 0) { if (size < 0) {
_length = npos; LOG(ERROR) << "negative size=" << size << " when calling SetArray";
return true; return false;
} else if (size == 0) { } else if (size == 0) {
_length = 0; _length = 0;
return true; return true;
......
...@@ -59,15 +59,18 @@ public: ...@@ -59,15 +59,18 @@ public:
bool is_string() const; // True if the reply is a string. bool is_string() const; // True if the reply is a string.
bool is_array() const; // True if the reply is an array. bool is_array() const; // True if the reply is an array.
// Set the reply to the nil string. Return True if it is set // Set the reply to the null string. Return True if it is set
// successfully. If the reply has already been set, return false. // successfully. If the reply has already been set, return false.
bool SetNilString(); bool SetNullString();
// Set the reply to the array with `size' elements. If `size' // Set the reply to the null array. Return True if it is set
// is -1, then it is a nil array. After call SetArray, use // successfully. If the reply has already been set, return false.
// operator[] to visit sub replies and set their value. Return bool SetNullArray();
// True if it is set successfully. If the reply has already
// been set, return false. // Set the reply to the array with `size' elements. After calling
// SetArray, use operator[] to visit sub replies and set their
// value. Return True if it is set successfully. If the reply has
// already been set, return false.
bool SetArray(int size); bool SetArray(int size);
// Set the reply to status message `str'. Return True if it is set // Set the reply to status message `str'. Return True if it is set
...@@ -210,8 +213,17 @@ inline int64_t RedisReply::integer() const { ...@@ -210,8 +213,17 @@ inline int64_t RedisReply::integer() const {
return 0; return 0;
} }
inline bool RedisReply::SetNilString() { inline bool RedisReply::SetNullArray() {
if (!_arena || _type != REDIS_REPLY_NIL) { if (_type != REDIS_REPLY_NIL) {
return false;
}
_type = REDIS_REPLY_ARRAY;
_length = npos;
return true;
}
inline bool RedisReply::SetNullString() {
if (_type != REDIS_REPLY_NIL) {
return false; return false;
} }
_type = REDIS_REPLY_STRING; _type = REDIS_REPLY_STRING;
...@@ -228,7 +240,7 @@ inline bool RedisReply::SetError(const std::string& str) { ...@@ -228,7 +240,7 @@ inline bool RedisReply::SetError(const std::string& str) {
} }
inline bool RedisReply::SetInteger(int64_t value) { inline bool RedisReply::SetInteger(int64_t value) {
if (!_arena || _type != REDIS_REPLY_NIL) { if (_type != REDIS_REPLY_NIL) {
return false; return false;
} }
_type = REDIS_REPLY_INTEGER; _type = REDIS_REPLY_INTEGER;
......
...@@ -550,9 +550,9 @@ TEST_F(RedisTest, quote_and_escape) { ...@@ -550,9 +550,9 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear(); request.Clear();
} }
std::string GetCompleteCommand(const std::vector<std::string>& commands) { std::string GetCompleteCommand(const std::unique_ptr<const char*[]>& commands) {
std::string res; std::string res;
for (int i = 0; i < (int)commands.size(); ++i) { for (int i = 0; commands[i]; ++i) {
if (i != 0) { if (i != 0) {
res.push_back(' '); res.push_back(' ');
} }
...@@ -565,12 +565,14 @@ std::string GetCompleteCommand(const std::vector<std::string>& commands) { ...@@ -565,12 +565,14 @@ std::string GetCompleteCommand(const std::vector<std::string>& commands) {
TEST_F(RedisTest, command_parser) { TEST_F(RedisTest, command_parser) {
brpc::RedisCommandParser parser; brpc::RedisCommandParser parser;
butil::IOBuf buf; butil::IOBuf buf;
std::vector<std::string> command_out; std::unique_ptr<const char*[]> command_out;
butil::Arena arena;
int len = 0;
{ {
// parse from whole command // parse from whole command
std::string command = "set abc edc"; std::string command = "set abc edc";
ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok()); ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &len, &arena));
ASSERT_TRUE(buf.empty()); ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str()); ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str());
} }
...@@ -583,11 +585,11 @@ TEST_F(RedisTest, command_parser) { ...@@ -583,11 +585,11 @@ TEST_F(RedisTest, command_parser) {
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
buf.push_back(raw_string[i]); buf.push_back(raw_string[i]);
if (i == size - 1) { if (i == size - 1) {
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &len, &arena));
} else { } else {
if (butil::fast_rand_less_than(2) == 0) { if (butil::fast_rand_less_than(2) == 0) {
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
parser.Consume(buf, &command_out)); parser.Consume(buf, &command_out, &len, &arena));
} }
} }
} }
...@@ -598,34 +600,34 @@ TEST_F(RedisTest, command_parser) { ...@@ -598,34 +600,34 @@ TEST_F(RedisTest, command_parser) {
{ {
// there is a non-string message in command and parse should fail // there is a non-string message in command and parse should fail
buf.append("*3\r\n$3"); buf.append("*3\r\n$3");
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &len, &arena));
ASSERT_EQ((int)buf.size(), 2); // left "$3" ASSERT_EQ((int)buf.size(), 2); // left "$3"
buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n"); buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &len, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append(":123456\r\n"); buf.append(":123456\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append("+Error\r\n"); buf.append("+Error\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append("+OK\r\n"); buf.append("+OK\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append("$5\r\nhello\r\n"); buf.append("$5\r\nhello\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena));
parser.Reset(); parser.Reset();
} }
} }
...@@ -663,7 +665,7 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -663,7 +665,7 @@ TEST_F(RedisTest, redis_reply_codec) {
{ {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
ASSERT_TRUE(r.SetNilString()); ASSERT_TRUE(r.SetNullString());
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear(); r.Clear();
...@@ -733,8 +735,8 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -733,8 +735,8 @@ TEST_F(RedisTest, redis_reply_codec) {
ASSERT_EQ(1, r[2].integer()); ASSERT_EQ(1, r[2].integer());
r.Clear(); r.Clear();
// nil array // null array
ASSERT_TRUE(r.SetArray(-1)); ASSERT_TRUE(r.SetNullArray());
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
...@@ -762,7 +764,7 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -762,7 +764,7 @@ TEST_F(RedisTest, redis_reply_codec) {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetStatus("OK")); ASSERT_TRUE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetStatus("OK")); ASSERT_FALSE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetNilString()); ASSERT_FALSE(r.SetNullString());
ASSERT_FALSE(r.SetArray(2)); ASSERT_FALSE(r.SetArray(2));
ASSERT_FALSE(r.SetString("OK")); ASSERT_FALSE(r.SetString("OK"));
ASSERT_FALSE(r.SetError("OK")); ASSERT_FALSE(r.SetError("OK"));
...@@ -772,7 +774,7 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -772,7 +774,7 @@ TEST_F(RedisTest, redis_reply_codec) {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
ASSERT_TRUE(r.SetInteger(42)); ASSERT_TRUE(r.SetInteger(42));
ASSERT_FALSE(r.SetStatus("OK")); ASSERT_FALSE(r.SetStatus("OK"));
ASSERT_FALSE(r.SetNilString()); ASSERT_FALSE(r.SetNullString());
ASSERT_FALSE(r.SetArray(2)); ASSERT_FALSE(r.SetArray(2));
ASSERT_FALSE(r.SetString("OK")); ASSERT_FALSE(r.SetString("OK"));
ASSERT_FALSE(r.SetError("OK")); ASSERT_FALSE(r.SetError("OK"));
...@@ -790,26 +792,38 @@ public: ...@@ -790,26 +792,38 @@ public:
: _batch_process(batch_process) : _batch_process(batch_process)
, _batch_count(0) {} , _batch_count(0) {}
brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& args, brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (args.size() < 3) { if (size < 3) {
output->SetError("ERR wrong number of arguments for 'set' command"); output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) { if (_batch_process) {
_batched_command.emplace_back(args); if (_batched_command.empty() && is_last) {
outputs.push_back(output); DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm;
for (int i = 0; args[i]; ++i) {
comm.push_back(args[i]);
}
_batched_command.push_back(comm);
if (is_last) { if (is_last) {
output->SetArray(_batched_command.size());
for (int i = 0; i < (int)_batched_command.size(); ++i) { for (int i = 0; i < (int)_batched_command.size(); ++i) {
DoSet(_batched_command[i][1], _batched_command[i][2], outputs[i]); DoSet(_batched_command[i][1], _batched_command[i][2], &(*output)[i]);
} }
_batch_count++; _batch_count++;
_batched_command.clear();
return brpc::RedisCommandHandler::OK;
} else {
return brpc::RedisCommandHandler::BATCHED;
} }
} else {
DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK;
} }
void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) { void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
...@@ -818,7 +832,6 @@ public: ...@@ -818,7 +832,6 @@ public:
} }
private: private:
bool _batch_process; bool _batch_process;
std::vector<brpc::RedisReply*> outputs;
std::vector<std::vector<std::string> > _batched_command; std::vector<std::vector<std::string> > _batched_command;
int _batch_count; int _batch_count;
}; };
...@@ -829,26 +842,38 @@ public: ...@@ -829,26 +842,38 @@ public:
: _batch_process(batch_process) : _batch_process(batch_process)
, _batch_count(0) {} , _batch_count(0) {}
brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& args, brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (args.size() < 2) { if (size < 2) {
output->SetError("ERR wrong number of arguments for 'get' command"); output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) { if (_batch_process) {
_batched_command.emplace_back(args); if (_batched_command.empty() && is_last) {
outputs.push_back(output); DoGet(args[1], output);
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm;
for (int i = 0; args[i]; ++i) {
comm.push_back(args[i]);
}
_batched_command.push_back(comm);
if (is_last) { if (is_last) {
output->SetArray(_batched_command.size());
for (int i = 0; i < (int)_batched_command.size(); ++i) { for (int i = 0; i < (int)_batched_command.size(); ++i) {
DoGet(_batched_command[i][1], outputs[i]); DoGet(_batched_command[i][1], &(*output)[i]);
} }
_batch_count++; _batch_count++;
_batched_command.clear();
return brpc::RedisCommandHandler::OK;
} else {
return brpc::RedisCommandHandler::BATCHED;
} }
} else {
DoGet(args[1], output);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
DoGet(args[1], output);
return brpc::RedisCommandHandler::OK;
} }
void DoGet(const std::string& key, brpc::RedisReply* output) { void DoGet(const std::string& key, brpc::RedisReply* output) {
...@@ -856,12 +881,11 @@ public: ...@@ -856,12 +881,11 @@ public:
if (it != m.end()) { if (it != m.end()) {
output->SetString(it->second); output->SetString(it->second);
} else { } else {
output->SetNilString(); output->SetNullString();
} }
} }
private: private:
bool _batch_process; bool _batch_process;
std::vector<brpc::RedisReply*> outputs;
std::vector<std::vector<std::string> > _batched_command; std::vector<std::vector<std::string> > _batched_command;
int _batch_count; int _batch_count;
}; };
...@@ -870,10 +894,10 @@ class IncrCommandHandler : public brpc::RedisCommandHandler { ...@@ -870,10 +894,10 @@ class IncrCommandHandler : public brpc::RedisCommandHandler {
public: public:
IncrCommandHandler() {} IncrCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& args, brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (args.size() < 2) { if (size < 2) {
output->SetError("ERR wrong number of arguments for 'incr' command"); output->SetError("ERR wrong number of arguments for 'incr' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
...@@ -986,9 +1010,9 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { ...@@ -986,9 +1010,9 @@ class MultiCommandHandler : public brpc::RedisCommandHandler {
public: public:
MultiCommandHandler() {} MultiCommandHandler() {}
RedisCommandHandler::Result Run(const std::vector<std::string>& args, brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
output->SetStatus("OK"); output->SetStatus("OK");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
...@@ -999,15 +1023,19 @@ public: ...@@ -999,15 +1023,19 @@ public:
class MultiTransactionHandler : public brpc::RedisCommandHandler { class MultiTransactionHandler : public brpc::RedisCommandHandler {
public: public:
RedisCommandHandler::Result Run(const std::vector<std::string>& args, brpc::RedisCommandHandler::Result Run(int size, const char* args[],
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (strcasecmp(args[0].c_str(), "multi") == 0) { if (strcasecmp(args[0], "multi") == 0) {
output->SetError("ERR duplicate multi"); output->SetError("ERR duplicate multi");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
if (strcasecmp(args[0].c_str(), "exec") != 0) { if (strcasecmp(args[0], "exec") != 0) {
_commands.push_back(args); std::vector<std::string> comm;
for (int i = 0; args[i]; ++i) {
comm.push_back(args[i]);
}
_commands.push_back(comm);
output->SetStatus("QUEUED"); output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
...@@ -1026,7 +1054,7 @@ public: ...@@ -1026,7 +1054,7 @@ public:
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
private: private:
std::vector<std::vector<std::string>> _commands; std::vector<std::vector<std::string> > _commands;
}; };
}; };
...@@ -1136,7 +1164,7 @@ TEST_F(RedisTest, server_handle_pipeline) { ...@@ -1136,7 +1164,7 @@ TEST_F(RedisTest, server_handle_pipeline) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL); channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(8, response.reply_size()); ASSERT_EQ(8, response.reply_size());
ASSERT_EQ(2, getch->_batch_count); ASSERT_EQ(1, getch->_batch_count);
ASSERT_EQ(2, setch->_batch_count); ASSERT_EQ(2, setch->_batch_count);
ASSERT_TRUE(response.reply(7).is_string()); ASSERT_TRUE(response.reply(7).is_string());
ASSERT_STREQ(response.reply(7).c_str(), "world"); ASSERT_STREQ(response.reply(7).c_str(), "world");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment