Commit 3c4d745b authored by zhujiashun's avatar zhujiashun

redis_server_protocol: refine code

parent c7b63da2
...@@ -57,7 +57,7 @@ struct InputResponse : public InputMessageBase { ...@@ -57,7 +57,7 @@ struct InputResponse : public InputMessageBase {
} }
}; };
class QueueMeta; class RedisConnContext;
class ConsumeTaskDone : public google::protobuf::Closure { class ConsumeTaskDone : public google::protobuf::Closure {
public: public:
ConsumeTaskDone() ConsumeTaskDone()
...@@ -73,14 +73,14 @@ private: ...@@ -73,14 +73,14 @@ private:
public: public:
RedisMessage output_message; RedisMessage output_message;
QueueMeta* meta; RedisConnContext* meta;
butil::IOBuf sendbuf; butil::IOBuf sendbuf;
}; };
class QueueMeta : public brpc::SharedObject { class RedisConnContext : public brpc::SharedObject {
public: public:
QueueMeta() : handler_continue(NULL) {} RedisConnContext() : handler_continue(NULL) {}
void Push(ConsumeTaskDone* done) { void Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_mutex); std::unique_lock<butil::Mutex> m(_mutex);
...@@ -93,13 +93,13 @@ public: ...@@ -93,13 +93,13 @@ public:
LOG(WARNING) << "Fail to address redis socket"; LOG(WARNING) << "Fail to address redis socket";
return; return;
} }
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
{ {
std::unique_lock<butil::Mutex> m(_mutex); std::unique_lock<butil::Mutex> m(_mutex);
if (_writing) return; if (_writing) return;
_writing = true; _writing = true;
} }
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
std::queue<ConsumeTaskDone*> ready_to_delete; std::queue<ConsumeTaskDone*> ready_to_delete;
while (true) { while (true) {
std::unique_lock<butil::Mutex> m(_mutex); std::unique_lock<butil::Mutex> m(_mutex);
...@@ -158,13 +158,13 @@ const char** ParseArgs(const RedisMessage& message) { ...@@ -158,13 +158,13 @@ const char** ParseArgs(const RedisMessage& message) {
} }
void ConsumeTaskDone::Run() { void ConsumeTaskDone::Run() {
butil::intrusive_ptr<QueueMeta> delete_queue_meta(meta, false); butil::intrusive_ptr<RedisConnContext> delete_queue_meta(meta, false);
output_message.SerializeToIOBuf(&sendbuf); output_message.SerializeToIOBuf(&sendbuf);
_ready.store(true, butil::memory_order_release); _ready.store(true, butil::memory_order_release);
meta->Flush(); meta->Flush();
} }
int ConsumeTask(QueueMeta* meta, const RedisMessage& m) { int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) {
ConsumeTaskDone* done = new ConsumeTaskDone; ConsumeTaskDone* done = new ConsumeTaskDone;
ClosureGuard done_guard(done); ClosureGuard done_guard(done);
meta->Push(done); meta->Push(done);
...@@ -174,7 +174,7 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) { ...@@ -174,7 +174,7 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) {
const char** args = ParseArgs(m); const char** args = ParseArgs(m);
if (!args) { if (!args) {
output.set_error("ERR command not string"); output.SetError("ERR command not string");
return -1; return -1;
} }
if (meta->handler_continue) { if (meta->handler_continue) {
...@@ -195,7 +195,7 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) { ...@@ -195,7 +195,7 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) {
if (it == meta->command_map.end()) { if (it == meta->command_map.end()) {
char buf[64]; char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str()); snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.set_error(buf); output.SetError(buf);
} else { } else {
RedisCommandHandler::Result result = RedisCommandHandler::Result result =
it->second->Run(args, &output, done_guard.release()); it->second->Run(args, &output, done_guard.release());
...@@ -211,7 +211,7 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) { ...@@ -211,7 +211,7 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) {
} }
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) { int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
QueueMeta* qmeta = static_cast<QueueMeta*>(meta); RedisConnContext* qmeta = static_cast<RedisConnContext*>(meta);
if (iter.is_queue_stopped()) { if (iter.is_queue_stopped()) {
qmeta->RemoveRefManually(); qmeta->RemoveRefManually();
return 0; return 0;
...@@ -232,7 +232,7 @@ public: ...@@ -232,7 +232,7 @@ public:
// @Destroyable // @Destroyable
void Destroy() { delete this; } void Destroy() { delete this; }
int init(QueueMeta* meta) { int init(RedisConnContext* meta) {
bthread::ExecutionQueueOptions q_opt; bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr = q_opt.bthread_attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
...@@ -259,7 +259,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -259,7 +259,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
} }
ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context()); ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context());
if (ctx == NULL) { if (ctx == NULL) {
QueueMeta* meta = new QueueMeta; RedisConnContext* meta = new RedisConnContext;
meta->AddRefManually(); meta->AddRefManually();
meta->socket_id = socket->id(); meta->socket_id = socket->id();
rs->CloneCommandMap(&meta->command_map); rs->CloneCommandMap(&meta->command_map);
......
...@@ -442,6 +442,10 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle ...@@ -442,6 +442,10 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle
for (auto c : name) { for (auto c : name) {
lcname.push_back(std::tolower(c)); lcname.push_back(std::tolower(c));
} }
if (_command_map.count(lcname)) {
LOG(ERROR) << "redis command name=" << name << " exist";
return false;
}
_command_map[lcname].reset(handler); _command_map[lcname].reset(handler);
return true; return true;
} }
......
...@@ -212,15 +212,34 @@ private: ...@@ -212,15 +212,34 @@ private:
std::ostream& operator<<(std::ostream& os, const RedisRequest&); std::ostream& operator<<(std::ostream& os, const RedisRequest&);
std::ostream& operator<<(std::ostream& os, const RedisResponse&); std::ostream& operator<<(std::ostream& os, const RedisResponse&);
class RedisCommandHandler;
// Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support. To support a particular command, you should implement
// the corresponding handler and call AddCommandHandler to install it.
class RedisService {
public:
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
virtual ~RedisService() {}
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
void CloneCommandMap(CommandMap* map);
private:
CommandMap _command_map;
};
// The handler for a redis command. Run() and New() should be implemented // The handler for a redis command. Run() and New() should be implemented
// by user. For Run(), `args` is the redis command argument. For example, // by user.
//
// For Run(), `args` is the redis command argument. For example,
// "set foo bar" corresponds to args[0] == "set", args[1] == "foo" and // "set foo bar" corresponds to args[0] == "set", args[1] == "foo" and
// args[2] == "bar". `output` is the content that sent to client side, // args[2] == "bar". `output` is the content that sent to client side,
// which should be set by user. Read brpc/src/redis_message.h for more usage. // which should be set by user. Read brpc/src/redis_message.h for more usage.
// `arena` is the memory arena that `output` would use. // User has to call `done->Run()` when everything is set up into `output`.
//
// For New(), whenever a tcp connection is established, a bunch of new handlers // For New(), whenever a tcp connection is established, a bunch of new handlers
// would be created using New() of corresponding handler and brpc makes sure that // would be created using New() of the corresponding handler and brpc makes sure
// all requests of the same command name from one connection would be redirected // that all requests of the same command name from one connection would be redirected
// to the same New()-ed command handler. All requests in one connection are // to the same New()-ed command handler. All requests in one connection are
// executed sequentially, just like what redis-server does. // executed sequentially, just like what redis-server does.
class RedisCommandHandler { class RedisCommandHandler {
...@@ -238,20 +257,6 @@ public: ...@@ -238,20 +257,6 @@ public:
virtual RedisCommandHandler* New() = 0; virtual RedisCommandHandler* New() = 0;
}; };
// Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support. To support a particular command, you should implement
// the corresponding handler and call AddCommandHandler to install it.
class RedisService {
public:
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
virtual ~RedisService() {}
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
void CloneCommandMap(CommandMap* map);
private:
CommandMap _command_map;
};
} // namespace brpc } // namespace brpc
#endif // BRPC_REDIS_H #endif // BRPC_REDIS_H
...@@ -46,6 +46,8 @@ class RedisMessage { ...@@ -46,6 +46,8 @@ class RedisMessage {
public: public:
// A default reply is a nil. // A default reply is a nil.
RedisMessage(); RedisMessage();
// All SetXXX Method would allocate memory from *arena.
RedisMessage(butil::Arena* arena); RedisMessage(butil::Arena* arena);
// Type of the reply. // Type of the reply.
...@@ -57,12 +59,12 @@ public: ...@@ -57,12 +59,12 @@ public:
bool is_string() const; // True if the reply is a string. bool is_string() const; // True if the reply is a string.
bool is_array() const; // True if the reply is an array. bool is_array() const; // True if the reply is an array.
bool set_nil_string(); // "$-1\r\n" bool SetNilString(); // "$-1\r\n"
bool set_array(int size); // size == -1 means nil array("*-1\r\n") bool SetArray(int size); // size == -1 means nil array("*-1\r\n")
bool set_status(const std::string& str); bool SetStatus(const std::string& str);
bool set_error(const std::string& str); bool SetError(const std::string& str);
bool set_integer(int64_t value); bool SetInteger(int64_t value);
bool set_bulk_string(const std::string& str); bool SetBulkString(const std::string& str);
// Convert the reply into a signed 64-bit integer(according to // Convert the reply into a signed 64-bit integer(according to
// http://redis.io/topics/protocol). If the reply is not an integer, // http://redis.io/topics/protocol). If the reply is not an integer,
...@@ -130,7 +132,7 @@ private: ...@@ -130,7 +132,7 @@ private:
// by calling CopyFrom[Different|Same]Arena. // by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisMessage); DISALLOW_COPY_AND_ASSIGN(RedisMessage);
bool set_basic_string(const std::string& str, RedisMessageType type); bool SetBasicString(const std::string& str, RedisMessageType type);
RedisMessageType _type; RedisMessageType _type;
uint32_t _length; // length of short_str/long_str, count of replies uint32_t _length; // length of short_str/long_str, count of replies
...@@ -187,14 +189,14 @@ inline int64_t RedisMessage::integer() const { ...@@ -187,14 +189,14 @@ inline int64_t RedisMessage::integer() const {
return 0; return 0;
} }
inline bool RedisMessage::set_nil_string() { inline bool RedisMessage::SetNilString() {
if (!_arena) return false; if (!_arena) return false;
_type = REDIS_MESSAGE_STRING; _type = REDIS_MESSAGE_STRING;
_length = npos; _length = npos;
return true; return true;
} }
inline bool RedisMessage::set_array(int size) { inline bool RedisMessage::SetArray(int size) {
if (!_arena) { if (!_arena) {
return false; return false;
} }
...@@ -219,7 +221,7 @@ inline bool RedisMessage::set_array(int size) { ...@@ -219,7 +221,7 @@ inline bool RedisMessage::set_array(int size) {
return true; return true;
} }
inline bool RedisMessage::set_basic_string(const std::string& str, RedisMessageType type) { inline bool RedisMessage::SetBasicString(const std::string& str, RedisMessageType type) {
if (!_arena) { if (!_arena) {
return false; return false;
} }
...@@ -242,23 +244,23 @@ inline bool RedisMessage::set_basic_string(const std::string& str, RedisMessageT ...@@ -242,23 +244,23 @@ inline bool RedisMessage::set_basic_string(const std::string& str, RedisMessageT
return true; return true;
} }
inline bool RedisMessage::set_status(const std::string& str) { inline bool RedisMessage::SetStatus(const std::string& str) {
return set_basic_string(str, REDIS_MESSAGE_STATUS); return SetBasicString(str, REDIS_MESSAGE_STATUS);
} }
inline bool RedisMessage::set_error(const std::string& str) { inline bool RedisMessage::SetError(const std::string& str) {
return set_basic_string(str, REDIS_MESSAGE_ERROR); return SetBasicString(str, REDIS_MESSAGE_ERROR);
} }
inline bool RedisMessage::set_integer(int64_t value) { inline bool RedisMessage::SetInteger(int64_t value) {
_type = REDIS_MESSAGE_INTEGER; _type = REDIS_MESSAGE_INTEGER;
_length = 0; _length = 0;
_data.integer = value; _data.integer = value;
return true; return true;
} }
inline bool RedisMessage::set_bulk_string(const std::string& str) { inline bool RedisMessage::SetBulkString(const std::string& str) {
return set_basic_string(str, REDIS_MESSAGE_STRING); return SetBasicString(str, REDIS_MESSAGE_STRING);
} }
inline const char* RedisMessage::c_str() const { inline const char* RedisMessage::c_str() const {
......
...@@ -555,7 +555,7 @@ TEST_F(RedisTest, codec) { ...@@ -555,7 +555,7 @@ TEST_F(RedisTest, codec) {
{ {
brpc::RedisMessage r(&arena); brpc::RedisMessage r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
ASSERT_TRUE(r.set_status("OK")); ASSERT_TRUE(r.SetStatus("OK"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK"); ASSERT_STREQ(r.c_str(), "OK");
...@@ -569,7 +569,7 @@ TEST_F(RedisTest, codec) { ...@@ -569,7 +569,7 @@ TEST_F(RedisTest, codec) {
{ {
brpc::RedisMessage r(&arena); brpc::RedisMessage r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
ASSERT_TRUE(r.set_error("not exist \'key\'")); ASSERT_TRUE(r.SetError("not exist \'key\'"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear(); r.Clear();
...@@ -582,7 +582,7 @@ TEST_F(RedisTest, codec) { ...@@ -582,7 +582,7 @@ TEST_F(RedisTest, codec) {
{ {
brpc::RedisMessage r(&arena); brpc::RedisMessage r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
ASSERT_TRUE(r.set_nil_string()); ASSERT_TRUE(r.SetNilString());
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear(); r.Clear();
...@@ -591,7 +591,7 @@ TEST_F(RedisTest, codec) { ...@@ -591,7 +591,7 @@ TEST_F(RedisTest, codec) {
ASSERT_TRUE(r.is_nil()); ASSERT_TRUE(r.is_nil());
r.Clear(); r.Clear();
ASSERT_TRUE(r.set_bulk_string("abcde'hello world")); ASSERT_TRUE(r.SetBulkString("abcde'hello world"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world"); ASSERT_STREQ(r.c_str(), "abcde'hello world");
...@@ -610,7 +610,7 @@ TEST_F(RedisTest, codec) { ...@@ -610,7 +610,7 @@ TEST_F(RedisTest, codec) {
const char* output[] = { ":-1\r\n", ":1234567\r\n" }; const char* output[] = { ":-1\r\n", ":1234567\r\n" };
for (int i = 0; i < t; ++i) { for (int i = 0; i < t; ++i) {
r.Clear(); r.Clear();
ASSERT_TRUE(r.set_integer(input[i])); ASSERT_TRUE(r.SetInteger(input[i]));
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), output[i]); ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear(); r.Clear();
...@@ -624,13 +624,13 @@ TEST_F(RedisTest, codec) { ...@@ -624,13 +624,13 @@ TEST_F(RedisTest, codec) {
{ {
brpc::RedisMessage r(&arena); brpc::RedisMessage r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
ASSERT_TRUE(r.set_array(3)); ASSERT_TRUE(r.SetArray(3));
brpc::RedisMessage& sub_reply = r[0]; brpc::RedisMessage& sub_reply = r[0];
sub_reply.set_array(2); sub_reply.SetArray(2);
sub_reply[0].set_bulk_string("hello, it's me"); sub_reply[0].SetBulkString("hello, it's me");
sub_reply[1].set_integer(422); sub_reply[1].SetInteger(422);
r[1].set_bulk_string("To go over everything"); r[1].SetBulkString("To go over everything");
r[2].set_integer(1); r[2].SetInteger(1);
ASSERT_TRUE(r[3].is_nil()); ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), ASSERT_STREQ(buf.to_string().c_str(),
...@@ -653,7 +653,7 @@ TEST_F(RedisTest, codec) { ...@@ -653,7 +653,7 @@ TEST_F(RedisTest, codec) {
r.Clear(); r.Clear();
// nil array // nil array
ASSERT_TRUE(r.set_array(-1)); ASSERT_TRUE(r.SetArray(-1));
ASSERT_TRUE(r.SerializeToIOBuf(&buf)); ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
...@@ -686,7 +686,7 @@ public: ...@@ -686,7 +686,7 @@ public:
std::string key = args[1]; std::string key = args[1];
std::string value = args[2]; std::string value = args[2];
m[key] = value; m[key] = value;
output->set_status("OK"); output->SetStatus("OK");
if (_rand_sleep) { if (_rand_sleep) {
bthread_t bth; bthread_t bth;
bthread_start_background(&bth, NULL, random_sleep, done_guard.release()); bthread_start_background(&bth, NULL, random_sleep, done_guard.release());
...@@ -713,9 +713,9 @@ public: ...@@ -713,9 +713,9 @@ public:
std::string key = args[1]; std::string key = args[1];
auto it = m.find(key); auto it = m.find(key);
if (it != m.end()) { if (it != m.end()) {
output->set_bulk_string(it->second); output->SetBulkString(it->second);
} else { } else {
output->set_nil_string(); output->SetNilString();
} }
if (_rand_sleep) { if (_rand_sleep) {
bthread_t bth; bthread_t bth;
...@@ -744,7 +744,7 @@ public: ...@@ -744,7 +744,7 @@ public:
s_mutex.lock(); s_mutex.lock();
value = ++int_map[args[1]]; value = ++int_map[args[1]];
s_mutex.unlock(); s_mutex.unlock();
output->set_integer(value); output->SetInteger(value);
if (_rand_sleep) { if (_rand_sleep) {
bthread_t bth; bthread_t bth;
bthread_start_background(&bth, NULL, random_sleep, done_guard.release()); bthread_start_background(&bth, NULL, random_sleep, done_guard.release());
...@@ -866,7 +866,7 @@ public: ...@@ -866,7 +866,7 @@ public:
google::protobuf::Closure* done) { google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
if (strcmp(args[0], "multi") == 0) { if (strcmp(args[0], "multi") == 0) {
output->set_status("OK"); output->SetStatus("OK");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
if (strcmp(args[0], "exec") != 0) { if (strcmp(args[0], "exec") != 0) {
...@@ -875,16 +875,16 @@ public: ...@@ -875,16 +875,16 @@ public:
sargs.push_back(*c); sargs.push_back(*c);
} }
commands.push_back(sargs); commands.push_back(sargs);
output->set_status("QUEUED"); output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
output->set_array(commands.size()); output->SetArray(commands.size());
s_mutex.lock(); s_mutex.lock();
for (size_t i = 0; i < commands.size(); ++i) { for (size_t i = 0; i < commands.size(); ++i) {
if (commands[i][0] == "incr") { if (commands[i][0] == "incr") {
int64_t value; int64_t value;
value = ++int_map[commands[i][1]]; value = ++int_map[commands[i][1]];
(*output)[i].set_integer(value); (*output)[i].SetInteger(value);
} }
} }
s_mutex.unlock(); s_mutex.unlock();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment