Commit cab9db70 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: remove arg size

parent 6ce98ba1
...@@ -77,15 +77,14 @@ public: ...@@ -77,15 +77,14 @@ public:
}; };
int ConsumeCommand(RedisConnContext* ctx, int ConsumeCommand(RedisConnContext* ctx,
const std::unique_ptr<const char*[]>& commands, const std::vector<const char*>& commands,
int command_len, butil::Arena* arena, butil::Arena* arena,
bool is_last, bool is_last,
butil::IOBuf* sendbuf) { butil::IOBuf* sendbuf) {
RedisReply output(arena); RedisReply output(arena);
RedisCommandHandler::Result result = RedisCommandHandler::OK; RedisCommandHandler::Result result = RedisCommandHandler::OK;
if (ctx->transaction_handler) { if (ctx->transaction_handler) {
result = ctx->transaction_handler->Run( result = ctx->transaction_handler->Run(commands, &output, is_last);
command_len, commands.get(), &output, is_last);
if (result == RedisCommandHandler::OK) { if (result == RedisCommandHandler::OK) {
ctx->transaction_handler.reset(NULL); ctx->transaction_handler.reset(NULL);
} else if (result == RedisCommandHandler::BATCHED) { } else if (result == RedisCommandHandler::BATCHED) {
...@@ -99,7 +98,7 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -99,7 +98,7 @@ int ConsumeCommand(RedisConnContext* ctx,
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]); snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]);
output.SetError(buf); output.SetError(buf);
} else { } else {
result = ch->Run(command_len, commands.get(), &output, is_last); result = ch->Run(commands, &output, is_last);
if (result == RedisCommandHandler::CONTINUE) { if (result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size) { if (ctx->batched_size) {
LOG(ERROR) << "CONTINUE should not be returned in redis batched process."; LOG(ERROR) << "CONTINUE should not be returned in redis batched process.";
...@@ -140,8 +139,6 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -140,8 +139,6 @@ int ConsumeCommand(RedisConnContext* ctx,
RedisConnContext::~RedisConnContext() { } RedisConnContext::~RedisConnContext() { }
void RedisConnContext::Destroy() {
delete this;
} }
// ========== impl of RedisConnContext ========== // ========== impl of RedisConnContext ==========
...@@ -165,30 +162,26 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -165,30 +162,26 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
socket->reset_parsing_context(ctx); socket->reset_parsing_context(ctx);
} }
butil::Arena arena; butil::Arena arena;
std::unique_ptr<const char*[]> current_commands; std::vector<const char*> current_commands;
int current_len = 0;
butil::IOBuf sendbuf; butil::IOBuf sendbuf;
ParseError err = PARSE_OK; ParseError err = PARSE_OK;
err = ctx->parser.Consume(*source, &current_commands, &current_len, &arena); err = ctx->parser.Consume(*source, &current_commands, &arena);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return MakeParseError(err); return MakeParseError(err);
} }
while (true) { while (true) {
std::unique_ptr<const char*[]> next_commands; std::vector<const char*> next_commands;
int next_len = 0; err = ctx->parser.Consume(*source, &next_commands, &arena);
err = ctx->parser.Consume(*source, &next_commands, &next_len, &arena);
if (err != PARSE_OK) { if (err != PARSE_OK) {
break; break;
} }
if (ConsumeCommand(ctx, current_commands, current_len, &arena, if (ConsumeCommand(ctx, current_commands, &arena, false, &sendbuf) != 0) {
false, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
current_commands.swap(next_commands); current_commands.swap(next_commands);
current_len = next_len;
} }
if (ConsumeCommand(ctx, current_commands, current_len, &arena, if (ConsumeCommand(ctx, current_commands, &arena,
true /* must be last message */, &sendbuf) != 0) { true /* must be last message */, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
......
...@@ -196,7 +196,7 @@ public: ...@@ -196,7 +196,7 @@ public:
static const ::google::protobuf::Descriptor* descriptor(); static const ::google::protobuf::Descriptor* descriptor();
protected: protected:
::google::protobuf::Metadata GetMetadata() const; ::google::protobuf::Metadata GetMetadata() const override;
private: private:
void SharedCtor(); void SharedCtor();
...@@ -244,7 +244,6 @@ public: ...@@ -244,7 +244,6 @@ public:
// Once Server receives commands, it will first find the corresponding handlers and // Once Server receives commands, it will first find the corresponding handlers and
// call them sequentially(one by one) according to the order that requests arrive, // call them sequentially(one by one) according to the order that requests arrive,
// just like what redis-server does. // just like what redis-server does.
// `args_len` is the length of request command.
// `args' is the array of request command. For example, "set somekey somevalue" // `args' is the array of request command. For example, "set somekey somevalue"
// corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue". // corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue".
// `output', which should be filled by user, is the content that sent to client side. // `output', which should be filled by user, is the content that sent to client side.
...@@ -260,7 +259,7 @@ public: ...@@ -260,7 +259,7 @@ public:
// an start marker and brpc will call MultiTransactionHandler() to new a transaction // an start marker and brpc will call MultiTransactionHandler() to new a transaction
// handler that all the following commands are sent to this tranction handler until // handler that all the following commands are sent to this tranction handler until
// it returns Result::OK. Read the comment below. // it returns Result::OK. Read the comment below.
virtual RedisCommandHandler::Result Run(int args_len, const char* args[], virtual RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) = 0; bool is_last) = 0;
...@@ -279,6 +278,9 @@ public: ...@@ -279,6 +278,9 @@ public:
// 5) An ending marker(exec) is found in tran_handler.Run(), user exeuctes all // 5) An ending marker(exec) is found in tran_handler.Run(), user exeuctes all
// the command and return RedisCommandHandler::OK. This Transation is done. // the command and return RedisCommandHandler::OK. This Transation is done.
virtual RedisCommandHandler* NewTransactionHandler(); virtual RedisCommandHandler* NewTransactionHandler();
// return true if a transaction is started when met this command.
virtual bool TransactionMarker() { return false; }
}; };
} // namespace brpc } // namespace brpc
......
...@@ -350,8 +350,8 @@ RedisCommandParser::RedisCommandParser() { ...@@ -350,8 +350,8 @@ RedisCommandParser::RedisCommandParser() {
} }
ParseError RedisCommandParser::Consume(butil::IOBuf& buf, ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
std::unique_ptr<const char*[]>* commands, std::vector<const char*>* commands,
int* len_out, butil::Arena* arena) { butil::Arena* arena) {
const char* pfc = (const char*)buf.fetch1(); const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) { if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA; return PARSE_ERROR_NOT_ENOUGH_DATA;
...@@ -382,8 +382,8 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, ...@@ -382,8 +382,8 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
_parsing_array = true; _parsing_array = true;
_length = value; _length = value;
_index = 0; _index = 0;
_commands.reset(new const char*[value + 1/* for ending NULL */]); _commands.resize(value);
return Consume(buf, commands, len_out, arena); return Consume(buf, commands, arena);
} }
CHECK(_index < _length) << "a complete command has been parsed. " CHECK(_index < _length) << "a complete command has been parsed. "
"impl of RedisCommandParser::Parse is buggy"; "impl of RedisCommandParser::Parse is buggy";
...@@ -412,11 +412,9 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, ...@@ -412,11 +412,9 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
if (++_index < _length) { if (++_index < _length) {
return Consume(buf, commands, len_out, arena); return Consume(buf, commands, arena);
} }
_commands[_index] = NULL;
commands->swap(_commands); commands->swap(_commands);
*len_out = _index;
Reset(); Reset();
return PARSE_OK; return PARSE_OK;
} }
...@@ -425,7 +423,7 @@ void RedisCommandParser::Reset() { ...@@ -425,7 +423,7 @@ void RedisCommandParser::Reset() {
_parsing_array = false; _parsing_array = false;
_length = 0; _length = 0;
_index = 0; _index = 0;
_commands.reset(NULL); _commands.clear();
} }
} // namespace brpc } // namespace brpc
...@@ -51,8 +51,8 @@ public: ...@@ -51,8 +51,8 @@ public:
// Parse raw message from `buf'. Return PARSE_OK and set the parsed command // Parse raw message from `buf'. Return PARSE_OK and set the parsed command
// to `commands' and length to `len' if successful. Memory of commands are // to `commands' and length to `len' if successful. Memory of commands are
// allocated in `arena'. // allocated in `arena'.
ParseError Consume(butil::IOBuf& buf, std::unique_ptr<const char*[]>* commands, ParseError Consume(butil::IOBuf& buf, std::vector<const char*>* commands,
int* len, butil::Arena* arena); butil::Arena* arena);
private: private:
// Reset parser to the initial state. // Reset parser to the initial state.
...@@ -61,7 +61,7 @@ private: ...@@ -61,7 +61,7 @@ private:
bool _parsing_array; // if the parser has met array indicator '*' bool _parsing_array; // if the parser has met array indicator '*'
int _length; // array length int _length; // array length
int _index; // current parsing array index int _index; // current parsing array index
std::unique_ptr<const char*[]> _commands; // parsed command string std::vector<const char*> _commands; // parsed command string
}; };
} // namespace brpc } // namespace brpc
......
...@@ -550,9 +550,9 @@ TEST_F(RedisTest, quote_and_escape) { ...@@ -550,9 +550,9 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear(); request.Clear();
} }
std::string GetCompleteCommand(const std::unique_ptr<const char*[]>& commands) { std::string GetCompleteCommand(const std::vector<const char*>& commands) {
std::string res; std::string res;
for (int i = 0; commands[i]; ++i) { for (int i = 0; i < (int)commands.size(); ++i) {
if (i != 0) { if (i != 0) {
res.push_back(' '); res.push_back(' ');
} }
...@@ -565,14 +565,13 @@ std::string GetCompleteCommand(const std::unique_ptr<const char*[]>& commands) { ...@@ -565,14 +565,13 @@ std::string GetCompleteCommand(const std::unique_ptr<const char*[]>& commands) {
TEST_F(RedisTest, command_parser) { TEST_F(RedisTest, command_parser) {
brpc::RedisCommandParser parser; brpc::RedisCommandParser parser;
butil::IOBuf buf; butil::IOBuf buf;
std::unique_ptr<const char*[]> command_out; std::vector<const char*> command_out;
butil::Arena arena; butil::Arena arena;
int len = 0;
{ {
// parse from whole command // parse from whole command
std::string command = "set abc edc"; std::string command = "set abc edc";
ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok()); ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
ASSERT_TRUE(buf.empty()); ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str()); ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str());
} }
...@@ -585,11 +584,11 @@ TEST_F(RedisTest, command_parser) { ...@@ -585,11 +584,11 @@ TEST_F(RedisTest, command_parser) {
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
buf.push_back(raw_string[i]); buf.push_back(raw_string[i]);
if (i == size - 1) { if (i == size - 1) {
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out, &arena));
} else { } else {
if (butil::fast_rand_less_than(2) == 0) { if (butil::fast_rand_less_than(2) == 0) {
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA,
parser.Consume(buf, &command_out, &len, &arena)); parser.Consume(buf, &command_out, &arena));
} }
} }
} }
...@@ -600,34 +599,34 @@ TEST_F(RedisTest, command_parser) { ...@@ -600,34 +599,34 @@ TEST_F(RedisTest, command_parser) {
{ {
// there is a non-string message in command and parse should fail // there is a non-string message in command and parse should fail
buf.append("*3\r\n$3"); buf.append("*3\r\n$3");
ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_ERROR_NOT_ENOUGH_DATA, parser.Consume(buf, &command_out, &arena));
ASSERT_EQ((int)buf.size(), 2); // left "$3" ASSERT_EQ((int)buf.size(), 2); // left "$3"
buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n"); buf.append("\r\nset\r\n:123\r\n$3\r\ndef\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_ERROR_ABSOLUTELY_WRONG, parser.Consume(buf, &command_out, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append(":123456\r\n"); buf.append(":123456\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append("+Error\r\n"); buf.append("+Error\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append("+OK\r\n"); buf.append("+OK\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset(); parser.Reset();
} }
{ {
// not array // not array
buf.append("$5\r\nhello\r\n"); buf.append("$5\r\nhello\r\n");
ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &len, &arena)); ASSERT_EQ(brpc::PARSE_ERROR_TRY_OTHERS, parser.Consume(buf, &command_out, &arena));
parser.Reset(); parser.Reset();
} }
} }
...@@ -786,7 +785,7 @@ public: ...@@ -786,7 +785,7 @@ public:
RedisServiceImpl() RedisServiceImpl()
: _batch_count(0) {} : _batch_count(0) {}
brpc::RedisCommandHandler::Result OnBatched(int size, const char* args[], brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args,
brpc::RedisReply* output, bool is_last) { brpc::RedisReply* output, bool is_last) {
if (_batched_command.empty() && is_last) { if (_batched_command.empty() && is_last) {
if (strcasecmp(args[0], "set") == 0) { if (strcasecmp(args[0], "set") == 0) {
...@@ -843,15 +842,15 @@ public: ...@@ -843,15 +842,15 @@ public:
: rs(NULL) : rs(NULL)
, _batch_process(batch_process) {} , _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[], brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (size < 3) { if (args.size() < 3) {
output->SetError("ERR wrong number of arguments for 'set' command"); output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) { if (_batch_process) {
return rs->OnBatched(size, args, output, is_last); return rs->OnBatched(args, output, is_last);
} else { } else {
DoSet(args[1], args[2], output); DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -874,15 +873,15 @@ public: ...@@ -874,15 +873,15 @@ public:
: rs(NULL) : rs(NULL)
, _batch_process(batch_process) {} , _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[], brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (size < 2) { if (args.size() < 2) {
output->SetError("ERR wrong number of arguments for 'get' command"); output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) { if (_batch_process) {
return rs->OnBatched(size, args, output, is_last); return rs->OnBatched(args, output, is_last);
} else { } else {
DoGet(args[1], output); DoGet(args[1], output);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -907,10 +906,10 @@ class IncrCommandHandler : public brpc::RedisCommandHandler { ...@@ -907,10 +906,10 @@ class IncrCommandHandler : public brpc::RedisCommandHandler {
public: public:
IncrCommandHandler() {} IncrCommandHandler() {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[], brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (size < 2) { if (args.size() < 2) {
output->SetError("ERR wrong number of arguments for 'incr' command"); output->SetError("ERR wrong number of arguments for 'incr' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
...@@ -1021,7 +1020,7 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { ...@@ -1021,7 +1020,7 @@ class MultiCommandHandler : public brpc::RedisCommandHandler {
public: public:
MultiCommandHandler() {} MultiCommandHandler() {}
brpc::RedisCommandHandler::Result Run(int size, const char* args[], brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
output->SetStatus("OK"); output->SetStatus("OK");
...@@ -1034,9 +1033,9 @@ public: ...@@ -1034,9 +1033,9 @@ public:
class MultiTransactionHandler : public brpc::RedisCommandHandler { class MultiTransactionHandler : public brpc::RedisCommandHandler {
public: public:
brpc::RedisCommandHandler::Result Run(int size, const char* args[], brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) { bool is_last) {
if (strcasecmp(args[0], "multi") == 0) { if (strcasecmp(args[0], "multi") == 0) {
output->SetError("ERR duplicate multi"); output->SetError("ERR duplicate multi");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment