Commit e3117cfc authored by zhujiashun's avatar zhujiashun

redis_server_protocol: remove ExecQueue

parent 520858cf
...@@ -60,15 +60,14 @@ public: ...@@ -60,15 +60,14 @@ public:
GetCommandHandler(RedisServiceImpl* rsimpl) GetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {} : _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const char* args, brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& commands,
brpc::RedisReply* output) override { brpc::RedisReply* output,
std::vector<std::string> args_array; bool is_last) override {
butil::SplitString(args, ' ', &args_array); if ((int)commands.size() <= 1) {
if ((int)args_array.size() <= 1) {
output->SetError("ERR wrong number of arguments for 'get' command"); output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
const std::string& key = args_array[1]; const std::string& key = commands[1];
std::string value; std::string value;
if (_rsimpl->Get(key, &value)) { if (_rsimpl->Get(key, &value)) {
output->SetString(value); output->SetString(value);
...@@ -87,20 +86,15 @@ public: ...@@ -87,20 +86,15 @@ public:
SetCommandHandler(RedisServiceImpl* rsimpl) SetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {} : _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const char* args, brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& commands,
brpc::RedisReply* output) override { brpc::RedisReply* output,
std::vector<std::string> args_array; bool is_last) override {
butil::SplitString(args, ' ', &args_array); if ((int)commands.size() <= 2) {
if (args_array.size() <= 1) {
output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK;
}
if ((int)args_array.size() <= 2) {
output->SetError("ERR wrong number of arguments for 'set' command"); output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
const std::string& key = args_array[1]; const std::string& key = commands[1];
const std::string& value = args_array[2]; const std::string& value = commands[2];
_rsimpl->Set(key, value); _rsimpl->Set(key, value);
output->SetStatus("OK"); output->SetStatus("OK");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
......
...@@ -33,7 +33,6 @@ ...@@ -33,7 +33,6 @@
#include "brpc/redis.h" #include "brpc/redis.h"
#include "brpc/redis_command.h" #include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h" #include "brpc/policy/redis_protocol.h"
#include "bthread/execution_queue.h"
namespace brpc { namespace brpc {
...@@ -58,11 +57,6 @@ struct InputResponse : public InputMessageBase { ...@@ -58,11 +57,6 @@ struct InputResponse : public InputMessageBase {
} }
}; };
// This struct is pushed into ExecutionQueue of each connection.
struct CommandInfo {
std::string command;
};
// This class is as parsing_context in socket. // This class is as parsing_context in socket.
class RedisConnContext : public Destroyable { class RedisConnContext : public Destroyable {
public: public:
...@@ -73,85 +67,82 @@ public: ...@@ -73,85 +67,82 @@ public:
// @Destroyable // @Destroyable
void Destroy() override; void Destroy() override;
int Init();
SocketId socket_id; SocketId socket_id;
RedisService* redis_service; RedisService* redis_service;
// If user starts a transaction, handler_continue indicates the // If user starts a transaction, handler_continue indicates the
// first handler pointer that triggers the transaction. // first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue; RedisCommandHandler* handler_continue;
// The redis command are parsed and pushed into this queue
bthread::ExecutionQueueId<CommandInfo*> queue;
RedisCommandParser parser; RedisCommandParser parser;
std::string command; std::vector<std::string> command;
}; };
int ConsumeTask(RedisConnContext* ctx, const std::string& command, butil::IOBuf* sendbuf) { std::string ToLowercase(const std::string& command) {
std::string res;
res.resize(command.size());
std::transform(command.begin(), command.end(), res.begin(),
[](unsigned char c){ return std::tolower(c); });
return res;
}
int ConsumeTask(RedisConnContext* ctx,
const std::vector<std::vector<std::string> >& commands,
butil::IOBuf* sendbuf) {
butil::Arena arena; butil::Arena arena;
RedisReply output(&arena); int size = commands.size();
if (ctx->handler_continue) { std::string next_comm;
RedisCommandHandler::Result result = RedisReply reply(&arena);
ctx->handler_continue->Run(command.c_str(), &output); RedisReply* output = NULL;
if (result == RedisCommandHandler::OK) { if (size == 1) {
ctx->handler_continue = NULL; // Optimize for the most common case
} output = &reply;
} else { } else {
std::string comm; output = (RedisReply*)malloc(sizeof(RedisReply) * size);
comm.reserve(8); for (int i = 0; i < size; ++i) {
for (int i = 0; i < (int)command.size() && command[i] != ' '; ++i) { new (&output[i]) RedisReply(&arena);
comm.push_back(std::tolower(command[i]));
} }
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm); }
if (!ch) { for (int i = 0; i < size; ++i) {
char buf[64]; if (ctx->handler_continue) {
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str()); bool is_last = (i == size - 1);
output.SetError(buf); RedisCommandHandler::Result result =
ctx->handler_continue->Run(commands[i], &output[i], is_last);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL;
}
} else { } else {
RedisCommandHandler::Result result = ch->Run(command.c_str(), &output); bool is_last = true;
if (result == RedisCommandHandler::CONTINUE) { std::string comm;
ctx->handler_continue = ch; if (i == 0) {
comm = ToLowercase(commands[i][0]);
} else {
comm.swap(next_comm);
}
if ((i + 1) < size) {
next_comm = ToLowercase(commands[i + 1][0]);
if (comm == next_comm) {
is_last = false;
}
}
RedisCommandHandler* ch = ctx->redis_service->FindCommandHandler(comm);
if (!ch) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output[i].SetError(buf);
} else {
RedisCommandHandler::Result result =
ch->Run(commands[i], &output[i], is_last);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = ch;
}
} }
} }
} }
output.SerializeTo(sendbuf); for (int i = 0; i < size; ++i) {
return 0; output[i].SerializeTo(sendbuf);
}
int Consume(void* ctx, bthread::TaskIterator<CommandInfo*>& iter) {
RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
if (iter.is_queue_stopped()) {
delete qctx;
return 0;
} }
SocketUniquePtr s; if (size != 1) {
bool has_err = false; free(output);
if (Socket::Address(qctx->socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
has_err = true;
}
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
butil::IOBuf sendbuf;
for (; iter; ++iter) {
std::unique_ptr<CommandInfo> guard(*iter);
if (has_err) {
continue;
}
ConsumeTask(qctx, (*iter)->command, &sendbuf);
// If there are too many tasks to execute, latency of the front
// responses will be increased by waiting the following tasks to
// be completed. To prevent this, if the current buf size is greater
// than FLAGS_redis_batch_flush_max_size, we just write the current
// buf first.
if ((int)sendbuf.size() >= FLAGS_redis_batch_flush_data_size) {
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
}
}
if (!has_err && !sendbuf.empty()) {
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
} }
return 0; return 0;
} }
...@@ -161,18 +152,7 @@ int Consume(void* ctx, bthread::TaskIterator<CommandInfo*>& iter) { ...@@ -161,18 +152,7 @@ int Consume(void* ctx, bthread::TaskIterator<CommandInfo*>& iter) {
RedisConnContext::~RedisConnContext() { } RedisConnContext::~RedisConnContext() { }
void RedisConnContext::Destroy() { void RedisConnContext::Destroy() {
bthread::execution_queue_stop(queue); delete this;
}
int RedisConnContext::Init() {
bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
if (bthread::execution_queue_start(&queue, &q_opt, Consume, this) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return -1;
}
return 0;
} }
// ========== impl of RedisConnContext ========== // ========== impl of RedisConnContext ==========
...@@ -193,25 +173,30 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -193,25 +173,30 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
ctx = new RedisConnContext; ctx = new RedisConnContext;
ctx->socket_id = socket->id(); ctx->socket_id = socket->id();
ctx->redis_service = rs; ctx->redis_service = rs;
if (ctx->Init() != 0) {
delete ctx;
LOG(ERROR) << "Fail to init redis RedisConnContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
socket->reset_parsing_context(ctx); socket->reset_parsing_context(ctx);
} }
ParseError err = ctx->parser.Consume(*source, &ctx->command); std::vector<std::vector<std::string> > commands;
if (err != PARSE_OK) { ParseError err = PARSE_OK;
return MakeParseError(err); while (true) {
err = ctx->parser.Consume(*source, &ctx->command);
if (err != PARSE_OK) {
break;
}
commands.emplace_back(std::move(ctx->command));
CHECK(ctx->command.empty());
} }
std::unique_ptr<CommandInfo> info(new CommandInfo); if (!commands.empty()) {
info->command.swap(ctx->command); butil::IOBuf sendbuf;
if (bthread::execution_queue_execute(ctx->queue, info.get()) != 0) { if (ConsumeTask(ctx, commands, &sendbuf) != 0) {
LOG(ERROR) << "Fail to push execution queue"; return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
return MakeParseError(PARSE_ERROR_NO_RESOURCE); }
CHECK(sendbuf.size() > 0) << "invalid size=0 of sendbuf";
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
} }
info.release(); return MakeParseError(err);
return MakeMessage(NULL);
} else { } else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
// I thought before. The Socket._pipeline_q is a SPSC queue pushed before // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
......
...@@ -243,11 +243,13 @@ public: ...@@ -243,11 +243,13 @@ public:
// Once Server receives commands, it will first find the corresponding handlers and // Once Server receives commands, it will first find the corresponding handlers and
// call them sequentially(one by one) according to the order that requests arrive, // call them sequentially(one by one) according to the order that requests arrive,
// just like what redis-server does. // just like what redis-server does.
// `args` is the redis request command string typed by remote side, ending with nullptr. // `args' is the array redis of request command. For example, "set somekey somevalue"
// For example, possible args value may be "set foo bar" or "incr somekey". User can // corresponds to args[0]=="set", args[1]=="somekey" and args[2]=="somevalue".
// use butil::StringSplitter to split and parse it. // `output', which should be filled by user, is the content that sent to client side.
// `output`, which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_reply.h for more usage. // Read brpc/src/redis_reply.h for more usage.
// `is_last' indicates whether the commands is the last command of this batch. If user
// want to do some batch processing, user should buffer the command and output. Once
// `is_last' is true, then run all the command and set the output of each command.
// The return value should be RedisCommandHandler::OK for normal cases. If you want // The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE until server receives // to implement transaction, return RedisCommandHandler::CONTINUE until server receives
// an ending marker. The first handler that return RedisCommandHandler::CONTINUE will // an ending marker. The first handler that return RedisCommandHandler::CONTINUE will
...@@ -257,8 +259,9 @@ public: ...@@ -257,8 +259,9 @@ public:
// RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the // RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the
// marker that ends the transaction. User should queue the commands and execute them // marker that ends the transaction. User should queue the commands and execute them
// all once the ending marker is received. // all once the ending marker is received.
virtual RedisCommandHandler::Result Run(const char* args, virtual RedisCommandHandler::Result Run(const std::vector<std::string>& command,
RedisReply* output) = 0; brpc::RedisReply* output,
bool is_last) = 0;
}; };
} // namespace brpc } // namespace brpc
......
...@@ -364,7 +364,8 @@ RedisCommandParser::RedisCommandParser() { ...@@ -364,7 +364,8 @@ RedisCommandParser::RedisCommandParser() {
Reset(); Reset();
} }
ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command) { ParseError RedisCommandParser::Consume(
butil::IOBuf& buf, std::vector<std::string>* command) {
const char* pfc = (const char*)buf.fetch1(); const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) { if (pfc == NULL) {
return PARSE_ERROR_NOT_ENOUGH_DATA; return PARSE_ERROR_NOT_ENOUGH_DATA;
...@@ -395,7 +396,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command) ...@@ -395,7 +396,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command)
_parsing_array = true; _parsing_array = true;
_length = value; _length = value;
_index = 0; _index = 0;
_command.clear(); _commands.clear();
return Consume(buf, command); return Consume(buf, command);
} }
CHECK(_index < _length) << "a complete command has been parsed. " CHECK(_index < _length) << "a complete command has been parsed. "
...@@ -414,10 +415,8 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command) ...@@ -414,10 +415,8 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command)
return PARSE_ERROR_NOT_ENOUGH_DATA; return PARSE_ERROR_NOT_ENOUGH_DATA;
} }
buf.pop_front(crlf_pos + 2/*CRLF*/); buf.pop_front(crlf_pos + 2/*CRLF*/);
if (!_command.empty()) { _commands.emplace_back();
_command.push_back(' '); // command is separated by ' ' buf.cutn(&_commands.back(), len);
}
buf.cutn(&_command, len);
char crlf[2]; char crlf[2];
buf.cutn(crlf, sizeof(crlf)); buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') { if (crlf[0] != '\r' || crlf[1] != '\n') {
...@@ -428,7 +427,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command) ...@@ -428,7 +427,7 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf, std::string* command)
return Consume(buf, command); return Consume(buf, command);
} }
command->clear(); command->clear();
command->swap(_command); command->swap(_commands);
Reset(); Reset();
return PARSE_OK; return PARSE_OK;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#ifndef BRPC_REDIS_COMMAND_H #ifndef BRPC_REDIS_COMMAND_H
#define BRPC_REDIS_COMMAND_H #define BRPC_REDIS_COMMAND_H
#include <vector>
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "butil/status.h" #include "butil/status.h"
#include "brpc/parse_result.h" #include "brpc/parse_result.h"
...@@ -47,16 +48,16 @@ public: ...@@ -47,16 +48,16 @@ public:
// Parse raw message from `buf'. Return PARSE_OK and set the parsed command // Parse raw message from `buf'. Return PARSE_OK and set the parsed command
// to `command' if successful. // to `command' if successful.
ParseError Consume(butil::IOBuf& buf, std::string* command); ParseError Consume(butil::IOBuf& buf, std::vector<std::string>* command);
private: private:
// Reset parser to the initial state. // Reset parser to the initial state.
void Reset(); void Reset();
bool _parsing_array; // if the parser has met array indicator '*' bool _parsing_array; // if the parser has met array indicator '*'
int _length; // array length int _length; // array length
int _index; // current parsing array index int _index; // current parsing array index
std::string _command; // parsed command string std::vector<std::string> _commands; // parsed command string
}; };
} // namespace brpc } // namespace brpc
......
...@@ -550,17 +550,29 @@ TEST_F(RedisTest, quote_and_escape) { ...@@ -550,17 +550,29 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear(); request.Clear();
} }
std::string GetCompleteCommand(const std::vector<std::string>& commands) {
std::string res;
for (int i = 0; i < (int)commands.size(); ++i) {
if (i != 0) {
res.push_back(' ');
}
res.append(commands[i]);
}
return res;
}
TEST_F(RedisTest, command_parser) { TEST_F(RedisTest, command_parser) {
brpc::RedisCommandParser parser; brpc::RedisCommandParser parser;
butil::IOBuf buf; butil::IOBuf buf;
std::string command_out; std::vector<std::string> command_out;
{ {
// parse from whole command // parse from whole command
std::string command = "set abc edc"; std::string command = "set abc edc";
ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok()); ASSERT_TRUE(brpc::RedisCommandNoFormat(&buf, command.c_str()).ok());
ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out)); ASSERT_EQ(brpc::PARSE_OK, parser.Consume(buf, &command_out));
ASSERT_TRUE(buf.empty()); ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command.c_str(), command_out.c_str()); ASSERT_STREQ(command.c_str(), GetCompleteCommand(command_out).c_str());
} }
{ {
// simulate parsing from network // simulate parsing from network
...@@ -580,7 +592,7 @@ TEST_F(RedisTest, command_parser) { ...@@ -580,7 +592,7 @@ TEST_F(RedisTest, command_parser) {
} }
} }
ASSERT_TRUE(buf.empty()); ASSERT_TRUE(buf.empty());
ASSERT_STREQ(command_out.c_str(), "set abc def"); ASSERT_STREQ(GetCompleteCommand(command_out).c_str(), "set abc def");
} }
} }
{ {
...@@ -774,89 +786,98 @@ std::unordered_map<std::string, int64_t> int_map; ...@@ -774,89 +786,98 @@ std::unordered_map<std::string, int64_t> int_map;
class SetCommandHandler : public brpc::RedisCommandHandler { class SetCommandHandler : public brpc::RedisCommandHandler {
public: public:
SetCommandHandler() {} SetCommandHandler(bool batch_process = false)
: _batch_process(batch_process)
brpc::RedisCommandHandler::Result Run(const char* args, , _batch_count(0) {}
brpc::RedisReply* output) {
std::string key; brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& args,
std::string value; brpc::RedisReply* output,
bool parse_command = false; bool is_last) {
butil::StringSplitter sp(args, ' '); if (args.size() < 3) {
for (; sp; ++sp) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else if (value.empty()) {
value.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
}
}
if (key.empty() || value.empty()) {
output->SetError("ERR wrong number of arguments for 'set' command"); output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) {
_batched_command.emplace_back(args);
outputs.push_back(output);
if (is_last) {
for (int i = 0; i < (int)_batched_command.size(); ++i) {
DoSet(_batched_command[i][1], _batched_command[i][2], outputs[i]);
}
_batch_count++;
}
return brpc::RedisCommandHandler::OK;
}
DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK;
}
void DoSet(const std::string& key, const std::string& value, brpc::RedisReply* output) {
m[key] = value; m[key] = value;
output->SetStatus("OK"); output->SetStatus("OK");
return brpc::RedisCommandHandler::OK;
} }
private:
bool _batch_process;
std::vector<brpc::RedisReply*> outputs;
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
}; };
class GetCommandHandler : public brpc::RedisCommandHandler { class GetCommandHandler : public brpc::RedisCommandHandler {
public: public:
GetCommandHandler() {} GetCommandHandler(bool batch_process = false)
: _batch_process(batch_process)
brpc::RedisCommandHandler::Result Run(const char* args, , _batch_count(0) {}
brpc::RedisReply* output) {
std::string key; brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& args,
bool parse_command = false; brpc::RedisReply* output,
butil::StringSplitter sp(args, ' '); bool is_last) {
for (; sp; ++sp) { if (args.size() < 2) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
}
}
if (key.empty()) {
output->SetError("ERR wrong number of arguments for 'get' command"); output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) {
_batched_command.emplace_back(args);
outputs.push_back(output);
if (is_last) {
for (int i = 0; i < (int)_batched_command.size(); ++i) {
DoGet(_batched_command[i][1], outputs[i]);
}
_batch_count++;
}
return brpc::RedisCommandHandler::OK;
}
DoGet(args[1], output);
return brpc::RedisCommandHandler::OK;
}
void DoGet(const std::string& key, brpc::RedisReply* output) {
auto it = m.find(key); auto it = m.find(key);
if (it != m.end()) { if (it != m.end()) {
output->SetString(it->second); output->SetString(it->second);
} else { } else {
output->SetNilString(); output->SetNilString();
} }
return brpc::RedisCommandHandler::OK;
} }
private:
bool _batch_process;
std::vector<brpc::RedisReply*> outputs;
std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
}; };
class IncrCommandHandler : public brpc::RedisCommandHandler { class IncrCommandHandler : public brpc::RedisCommandHandler {
public: public:
IncrCommandHandler() {} IncrCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args, brpc::RedisCommandHandler::Result Run(const std::vector<std::string>& args,
brpc::RedisReply* output) { brpc::RedisReply* output,
std::string key; bool is_last) {
bool parse_command = false; if (args.size() < 2) {
butil::StringSplitter sp(args, ' ');
for (; sp; ++sp) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
}
}
if (key.empty()) {
output->SetError("ERR wrong number of arguments for 'incr' command"); output->SetError("ERR wrong number of arguments for 'incr' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
const std::string& key = args[1];
int64_t value; int64_t value;
s_mutex.lock(); s_mutex.lock();
value = ++int_map[key]; value = ++int_map[key];
...@@ -966,47 +987,35 @@ public: ...@@ -966,47 +987,35 @@ public:
MultiCommandHandler() MultiCommandHandler()
: _started(false) {} : _started(false) {}
RedisCommandHandler::Result Run(const char* args, RedisCommandHandler::Result Run(const std::vector<std::string>& args,
brpc::RedisReply* output) { brpc::RedisReply* output,
butil::StringSplitter sp(args, ' '); bool is_last) {
std::string command; if (strcasecmp(args[0].c_str(), "multi") == 0) {
std::vector<std::string> sargs; if (!_started) {
for (; sp; ++sp) { output->SetStatus("OK");
if (command.empty()) { _started = true;
command.assign(sp.field(), sp.length());
}
if (strcasecmp(command.c_str(), "multi") == 0) {
if (!_started) {
output->SetStatus("OK");
_started = true;
} else {
output->SetError("ERR duplicate multi");
}
return brpc::RedisCommandHandler::CONTINUE;
}
if (strcasecmp(command.c_str(), "exec") != 0) {
std::string sarg(sp.field(), sp.length());
sargs.push_back(sarg);
} else { } else {
output->SetArray(_commands.size()); output->SetError("ERR duplicate multi");
s_mutex.lock(); }
for (size_t i = 0; i < _commands.size(); ++i) { return brpc::RedisCommandHandler::CONTINUE;
if (_commands[i][0] == "incr") { }
int64_t value; if (strcasecmp(args[0].c_str(), "exec") != 0) {
value = ++int_map[_commands[i][1]]; _commands.push_back(args);
(*output)[i].SetInteger(value); output->SetStatus("QUEUED");
} else { return brpc::RedisCommandHandler::CONTINUE;
LOG(WARNING) << "unknown command: " << _commands[i][0]; }
} output->SetArray(_commands.size());
} s_mutex.lock();
s_mutex.unlock(); for (size_t i = 0; i < _commands.size(); ++i) {
_started = false; if (_commands[i][0] == "incr") {
return brpc::RedisCommandHandler::OK; int64_t value;
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
} }
} }
_commands.push_back(sargs); s_mutex.unlock();
output->SetStatus("QUEUED"); _started = false;
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::OK;
} }
RedisCommandHandler* New() { return new MultiCommandHandler; } RedisCommandHandler* New() { return new MultiCommandHandler; }
...@@ -1090,4 +1099,41 @@ TEST_F(RedisTest, server_command_continue) { ...@@ -1090,4 +1099,41 @@ TEST_F(RedisTest, server_command_continue) {
} }
} }
TEST_F(RedisTest, server_handle_pipeline) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler* getch = new GetCommandHandler(true);
SetCommandHandler* setch = new SetCommandHandler(true);
rsimpl->AddCommandHandler("get", getch);
rsimpl->AddCommandHandler("set", setch);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("set key1 v1"));
ASSERT_TRUE(request.AddCommand("set key2 v2"));
ASSERT_TRUE(request.AddCommand("set key3 v3"));
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("set key1 world"));
ASSERT_TRUE(request.AddCommand("set key2 world"));
ASSERT_TRUE(request.AddCommand("get key2"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(8, response.reply_size());
ASSERT_EQ(2, getch->_batch_count);
ASSERT_EQ(2, setch->_batch_count);
ASSERT_TRUE(response.reply(7).is_string());
ASSERT_STREQ(response.reply(7).c_str(), "world");
}
} //namespace } //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment