Commit 5c8794c4 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: offer user redis string instead of arrays

parent 2d0c8c3b
...@@ -57,21 +57,21 @@ struct InputResponse : public InputMessageBase { ...@@ -57,21 +57,21 @@ struct InputResponse : public InputMessageBase {
} }
}; };
static bool ParseArgs(const RedisReply& message, std::unique_ptr<const char*[]>* args_out) { static bool ParseArgs(const RedisReply& message, std::ostringstream& os) {
if (!message.is_array() || message.size() == 0) { if (!message.is_array() || message.size() == 0) {
LOG(WARNING) << "request message is not array or size equals to zero"; LOG(WARNING) << "request message is not array or size equals to zero";
return false; return false;
} }
args_out->reset(new const char*[message.size() + 1 /* NULL */]);
for (size_t i = 0; i < message.size(); ++i) { for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) { if (!message[i].is_string()) {
LOG(WARNING) << "request message[" << i << "] is not array"; LOG(WARNING) << "request message[" << i << "] is not array";
return false; return false;
} }
(*args_out)[i] = message[i].c_str(); if (i != 0) {
os << " ";
}
os << message[i].c_str();
} }
(*args_out)[message.size()] = NULL;
return true; return true;
} }
...@@ -106,22 +106,22 @@ public: ...@@ -106,22 +106,22 @@ public:
int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) { int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
RedisReply output(&task->arena); RedisReply output(&task->arena);
std::unique_ptr<const char*[]> args; std::ostringstream os;
if (!ParseArgs(task->input_message, &args)) { if (!ParseArgs(task->input_message, os)) {
LOG(ERROR) << "ERR command not string"; LOG(ERROR) << "ERR command not string";
output.SetError("ERR command not string"); output.SetError("ERR command not string");
return -1; return -1;
} }
if (ctx->handler_continue) { if (ctx->handler_continue) {
RedisCommandHandler::Result result = RedisCommandHandler::Result result =
ctx->handler_continue->Run(args.get(), &output); ctx->handler_continue->Run(os.str().c_str(), &output);
if (result == RedisCommandHandler::OK) { if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL; ctx->handler_continue = NULL;
} }
} else { } else {
std::string comm; std::string comm;
comm.reserve(8); comm.reserve(8);
for (const char* c = args[0]; *c; ++c) { for (const char* c = task->input_message[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c)); comm.push_back(std::tolower(*c));
} }
auto it = ctx->command_map.find(comm); auto it = ctx->command_map.find(comm);
...@@ -130,7 +130,7 @@ int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) { ...@@ -130,7 +130,7 @@ int ConsumeTask(RedisConnContext* ctx, RedisTask* task, butil::IOBuf* sendbuf) {
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str()); snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.SetError(buf); output.SetError(buf);
} else { } else {
RedisCommandHandler::Result result = it->second->Run(args.get(), &output); RedisCommandHandler::Result result = it->second->Run(os.str().c_str(), &output);
if (result == RedisCommandHandler::CONTINUE) { if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = it->second.get(); ctx->handler_continue = it->second.get();
} }
......
...@@ -243,9 +243,9 @@ public: ...@@ -243,9 +243,9 @@ public:
// Once Server receives commands, it will first find the corresponding handlers and // Once Server receives commands, it will first find the corresponding handlers and
// call them sequentially(one by one) according to the order that requests arrive, // call them sequentially(one by one) according to the order that requests arrive,
// just like what redis-server does. // just like what redis-server does.
// `args` is an array of redis command arguments, ending with nullptr. For example, // `args` is the redis request command string typed by remote side, ending with nullptr.
// command "set foo bar" corresponds to args[0] == "set", args[1] == "foo", // For example, possible args value may be "set foo bar" or "incr somekey". User can
// args[2] == "bar" and args[3] == nullptr. // use butil::StringSplitter to split and parse it.
// `output`, which should be filled by user, is the content that sent to client side. // `output`, which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_reply.h for more usage. // Read brpc/src/redis_reply.h for more usage.
// The return value should be RedisCommandHandler::OK for normal cases. If you want // The return value should be RedisCommandHandler::OK for normal cases. If you want
...@@ -257,7 +257,7 @@ public: ...@@ -257,7 +257,7 @@ public:
// RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the // RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the
// marker that ends the transaction. User should queue the commands and execute them // marker that ends the transaction. User should queue the commands and execute them
// all once the ending marker is received. // all once the ending marker is received.
virtual RedisCommandHandler::Result Run(const char* args[], virtual RedisCommandHandler::Result Run(const char* args,
RedisReply* output) = 0; RedisReply* output) = 0;
// Whenever a tcp connection is established, a bunch of new handlers would be created // Whenever a tcp connection is established, a bunch of new handlers would be created
......
...@@ -669,10 +669,27 @@ class SetCommandHandler : public brpc::RedisCommandHandler { ...@@ -669,10 +669,27 @@ class SetCommandHandler : public brpc::RedisCommandHandler {
public: public:
SetCommandHandler() {} SetCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args[], brpc::RedisCommandHandler::Result Run(const char* args,
brpc::RedisReply* output) { brpc::RedisReply* output) {
std::string key = args[1]; std::string key;
std::string value = args[2]; std::string value;
bool parse_command = false;
butil::StringSplitter sp(args, ' ');
for (; sp; ++sp) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else if (value.empty()) {
value.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
}
}
if (key.empty() || value.empty()) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK;
}
m[key] = value; m[key] = value;
output->SetStatus("OK"); output->SetStatus("OK");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -688,9 +705,24 @@ class GetCommandHandler : public brpc::RedisCommandHandler { ...@@ -688,9 +705,24 @@ class GetCommandHandler : public brpc::RedisCommandHandler {
public: public:
GetCommandHandler() {} GetCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args[], brpc::RedisCommandHandler::Result Run(const char* args,
brpc::RedisReply* output) { brpc::RedisReply* output) {
std::string key = args[1]; std::string key;
bool parse_command = false;
butil::StringSplitter sp(args, ' ');
for (; sp; ++sp) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
}
}
if (key.empty()) {
output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK;
}
auto it = m.find(key); auto it = m.find(key);
if (it != m.end()) { if (it != m.end()) {
output->SetBulkString(it->second); output->SetBulkString(it->second);
...@@ -710,11 +742,27 @@ class IncrCommandHandler : public brpc::RedisCommandHandler { ...@@ -710,11 +742,27 @@ class IncrCommandHandler : public brpc::RedisCommandHandler {
public: public:
IncrCommandHandler() {} IncrCommandHandler() {}
brpc::RedisCommandHandler::Result Run(const char* args[], brpc::RedisCommandHandler::Result Run(const char* args,
brpc::RedisReply* output) { brpc::RedisReply* output) {
std::string key;
bool parse_command = false;
butil::StringSplitter sp(args, ' ');
for (; sp; ++sp) {
if (!parse_command) {
parse_command = true;
} else if (key.empty()) {
key.assign(sp.field(), sp.length());
} else {
LOG(WARNING) << "unknown args: " << sp;
}
}
if (key.empty()) {
output->SetError("ERR wrong number of arguments for 'incr' command");
return brpc::RedisCommandHandler::OK;
}
int64_t value; int64_t value;
s_mutex.lock(); s_mutex.lock();
value = ++int_map[args[1]]; value = ++int_map[key];
s_mutex.unlock(); s_mutex.unlock();
output->SetInteger(value); output->SetInteger(value);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -831,38 +879,47 @@ public: ...@@ -831,38 +879,47 @@ public:
MultiCommandHandler() MultiCommandHandler()
: _started(false) {} : _started(false) {}
RedisCommandHandler::Result Run(const char* args[], RedisCommandHandler::Result Run(const char* args,
brpc::RedisReply* output) { brpc::RedisReply* output) {
if (strcasecmp(args[0], "multi") == 0) { butil::StringSplitter sp(args, ' ');
if (!_started) { std::string command;
output->SetStatus("OK"); std::vector<std::string> sargs;
_started = true; for (; sp; ++sp) {
} else { if (command.empty()) {
output->SetError("ERR duplicate multi"); command.assign(sp.field(), sp.length());
} }
return brpc::RedisCommandHandler::CONTINUE; if (strcasecmp(command.c_str(), "multi") == 0) {
} if (!_started) {
if (strcasecmp(args[0], "exec") != 0) { output->SetStatus("OK");
std::vector<std::string> sargs; _started = true;
for (const char** c = args; *c; ++c) { } else {
sargs.push_back(*c); output->SetError("ERR duplicate multi");
}
return brpc::RedisCommandHandler::CONTINUE;
} }
_commands.push_back(sargs); if (strcasecmp(command.c_str(), "exec") != 0) {
output->SetStatus("QUEUED"); std::string sarg(sp.field(), sp.length());
return brpc::RedisCommandHandler::CONTINUE; sargs.push_back(sarg);
} } else {
output->SetArray(_commands.size()); output->SetArray(_commands.size());
s_mutex.lock(); s_mutex.lock();
for (size_t i = 0; i < _commands.size(); ++i) { for (size_t i = 0; i < _commands.size(); ++i) {
if (_commands[i][0] == "incr") { if (_commands[i][0] == "incr") {
int64_t value; int64_t value;
value = ++int_map[_commands[i][1]]; value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value); (*output)[i].SetInteger(value);
} else {
LOG(WARNING) << "unknown command: " << _commands[i][0];
}
}
s_mutex.unlock();
_started = false;
return brpc::RedisCommandHandler::OK;
} }
} }
s_mutex.unlock(); _commands.push_back(sargs);
_started = false; output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::CONTINUE;
} }
RedisCommandHandler* New() { return new MultiCommandHandler; } RedisCommandHandler* New() { return new MultiCommandHandler; }
...@@ -904,21 +961,23 @@ TEST_F(RedisTest, server_command_continue) { ...@@ -904,21 +961,23 @@ TEST_F(RedisTest, server_command_continue) {
brpc::RedisResponse response; brpc::RedisResponse response;
brpc::Controller cntl; brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("multi")); ASSERT_TRUE(request.AddCommand("multi"));
ASSERT_TRUE(request.AddCommand("mUltI"));
int count = 10; int count = 10;
for (int i = 0; i < count; ++i) { for (int i = 0; i < count; ++i) {
ASSERT_TRUE(request.AddCommand("incr hello 1")); ASSERT_TRUE(request.AddCommand("incr hello 1"));
} }
ASSERT_TRUE(request.AddCommand("exec")); ASSERT_TRUE(request.AddCommand("exec"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL); channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_EQ(12, response.reply_size()); ASSERT_EQ(13, response.reply_size());
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type()); ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_STREQ("OK", response.reply(0).c_str()); ASSERT_STREQ("OK", response.reply(0).c_str());
for (int i = 1; i < count + 1; ++i) { ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(1).type());
for (int i = 2; i < count + 2; ++i) {
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(i).type()); ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(i).type());
ASSERT_STREQ("QUEUED", response.reply(i).c_str()); ASSERT_STREQ("QUEUED", response.reply(i).c_str());
} }
const brpc::RedisReply& m = response.reply(count+1); const brpc::RedisReply& m = response.reply(count + 2);
ASSERT_EQ(count, (int)m.size()); ASSERT_EQ(count, (int)m.size());
for (int i = 0; i < count; ++i) { for (int i = 0; i < count; ++i) {
ASSERT_EQ(i+1, m[i].integer()); ASSERT_EQ(i+1, m[i].integer());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment