Commit bb09b153 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: combine ServerContext and RedisConnContext

parent 13ffbdf5
...@@ -77,11 +77,33 @@ public: ...@@ -77,11 +77,33 @@ public:
}; };
class RedisConnContext : public brpc::SharedObject { struct TaskContext {
RedisMessage message;
butil::Arena arena;
};
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter);
class RedisConnContext : public SharedObject
, public Destroyable {
public: public:
RedisConnContext() : handler_continue(NULL) {} RedisConnContext() : handler_continue(NULL) {}
~RedisConnContext() { ~RedisConnContext() {
ClearQueue(dones); CHECK(dones.empty());
}
// @Destroyable
void Destroy() {
bthread::execution_queue_stop(queue);
}
int Init() {
bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
if (bthread::execution_queue_start(&queue, &q_opt, Consume, this) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return -1;
}
return 0;
} }
void Push(ConsumeTaskDone* done) { void Push(ConsumeTaskDone* done) {
...@@ -129,7 +151,6 @@ public: ...@@ -129,7 +151,6 @@ public:
SocketId socket_id; SocketId socket_id;
RedisService::CommandMap command_map; RedisService::CommandMap command_map;
RedisCommandHandler* handler_continue; RedisCommandHandler* handler_continue;
std::queue<ConsumeTaskDone*> dones;
private: private:
void ClearQueue(std::queue<ConsumeTaskDone*>& queue) { void ClearQueue(std::queue<ConsumeTaskDone*>& queue) {
...@@ -140,14 +161,25 @@ private: ...@@ -140,14 +161,25 @@ private:
} }
} }
bthread::ExecutionQueueId<TaskContext*> queue;
bool _writing = false; bool _writing = false;
butil::Mutex _mutex; butil::Mutex _mutex;
std::queue<ConsumeTaskDone*> dones;
}; };
struct TaskContext { int ConsumeTask(RedisConnContext* meta, const RedisMessage& m);
RedisMessage message; int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
butil::Arena arena; RedisConnContext* qmeta = static_cast<RedisConnContext*>(meta);
}; if (iter.is_queue_stopped()) {
qmeta->RemoveRefManually();
return 0;
}
for (; iter; ++iter) {
std::unique_ptr<TaskContext> ctx(*iter);
ConsumeTask(qmeta, ctx->message);
}
return 0;
}
const char** ParseArgs(const RedisMessage& message) { const char** ParseArgs(const RedisMessage& message) {
const char** args = (const char**) const char** args = (const char**)
...@@ -212,42 +244,6 @@ int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) { ...@@ -212,42 +244,6 @@ int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) {
return 0; return 0;
} }
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
RedisConnContext* qmeta = static_cast<RedisConnContext*>(meta);
if (iter.is_queue_stopped()) {
qmeta->RemoveRefManually();
return 0;
}
for (; iter; ++iter) {
std::unique_ptr<TaskContext> ctx(*iter);
ConsumeTask(qmeta, ctx->message);
}
return 0;
}
class ServerContext : public Destroyable {
public:
~ServerContext() {
bthread::execution_queue_stop(queue);
}
// @Destroyable
void Destroy() { delete this; }
int init(RedisConnContext* meta) {
bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
if (bthread::execution_queue_start(&queue, &q_opt, Consume, meta) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return -1;
}
return 0;
}
bthread::ExecutionQueueId<TaskContext*> queue;
};
ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) { bool read_eof, const void* arg) {
if (read_eof || source->empty()) { if (read_eof || source->empty()) {
...@@ -259,17 +255,16 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -259,17 +255,16 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (!rs) { if (!rs) {
return MakeParseError(PARSE_ERROR_TRY_OTHERS); return MakeParseError(PARSE_ERROR_TRY_OTHERS);
} }
ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context()); RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context());
if (ctx == NULL) { if (ctx == NULL) {
RedisConnContext* meta = new RedisConnContext; ctx = new RedisConnContext;
meta->AddRefManually(); // add ref for Consume()
meta->socket_id = socket->id(); ctx->AddRefManually();
rs->CloneCommandMap(&meta->command_map); ctx->socket_id = socket->id();
ctx = new ServerContext; rs->CloneCommandMap(&ctx->command_map);
if (ctx->init(meta) != 0) { if (ctx->Init() != 0) {
delete ctx; ctx->RemoveRefManually();
meta->RemoveRefManually(); LOG(ERROR) << "Fail to init redis RedisConnContext";
LOG(ERROR) << "Fail to init redis ServerContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE); return MakeParseError(PARSE_ERROR_NO_RESOURCE);
} }
socket->reset_parsing_context(ctx); socket->reset_parsing_context(ctx);
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "brpc/redis_message.h" #include "brpc/redis_message.h"
#include "brpc/parse_result.h" #include "brpc/parse_result.h"
#include "brpc/callback.h" #include "brpc/callback.h"
#include "brpc/socket.h"
namespace brpc { namespace brpc {
...@@ -215,45 +216,56 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&); ...@@ -215,45 +216,56 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&);
class RedisCommandHandler; class RedisCommandHandler;
// Implement this class and assign an instance to ServerOption.redis_service // Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support. To support a particular command, you should implement // to enable redis support.
// the corresponding handler and call AddCommandHandler to install it.
class RedisService { class RedisService {
public: public:
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
virtual ~RedisService() {} virtual ~RedisService() {}
// Call this function to register `handler` that can handle command `name`.
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler); bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
void CloneCommandMap(CommandMap* map);
private: private:
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
friend ParseResult ParseRedisMessage(butil::IOBuf*, Socket*, bool, const void*);
void CloneCommandMap(CommandMap* map);
CommandMap _command_map; CommandMap _command_map;
}; };
// The handler for a redis command. Run() and New() should be implemented // The Command handler for a redis request. User should impletement Run() and New().
// by user.
//
// For Run(), `args` is the redis command argument. For example,
// "set foo bar" corresponds to args[0] == "set", args[1] == "foo" and
// args[2] == "bar". `output` is the content that sent to client side,
// which should be set by user. Read brpc/src/redis_message.h for more usage.
// User has to call `done->Run()` when everything is set up into `output`.
//
// For New(), whenever a tcp connection is established, a bunch of new handlers
// would be created using New() of the corresponding handler and brpc makes sure
// that all requests of the same command name from one connection would be redirected
// to the same New()-ed command handler. All requests in one connection are
// executed sequentially, just like what redis-server does.
class RedisCommandHandler { class RedisCommandHandler {
public: public:
enum Result { enum Result {
OK = 0, OK = 0,
CONTINUE = 1, CONTINUE = 1,
}; };
~RedisCommandHandler() {} ~RedisCommandHandler() {}
// Once Server receives commands, it will first find the corresponding handlers and
// call them sequentially(one by one) according to the order that requests arrive,
// just like what redis-server does.
// `args` is an array of redis command arguments, ending with nullptr. For example,
// command "set foo bar" corresponds to args[0] == "set", args[1] == "foo",
// args[2] == "bar" and args[3] == nullptr.
// `output`, which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_message.h for more usage.
// Remember to call `done->Run()` when everything is set up into `output`. The return
// value should be RedisCommandHandler::OK for normal cases. If you want to implement
// transaction, return RedisCommandHandler::CONTINUE until server receives an ending
// marker. The first handler that return RedisCommandHandler::CONTINUE will continue
// receiving the following commands until it receives a ending marker and return
// RedisCommandHandler::OK to end transaction. For example, the return value of
// commands "multi; set k1 v1; set k2 v2; set k3 v3; exec" should be four
// RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the
// marker that ends the transaction. User may queue the commands and execute them
// all once an ending marker is received.
virtual RedisCommandHandler::Result Run(const char* args[], virtual RedisCommandHandler::Result Run(const char* args[],
RedisMessage* output, RedisMessage* output,
google::protobuf::Closure* done) = 0; google::protobuf::Closure* done) = 0;
// Whenever a tcp connection is established, a bunch of new handlers would be created
// using New() of the corresponding handler and brpc makes sure that all requests from
// one connection with the same command name would be redirected to the same New()-ed
// command handler.
virtual RedisCommandHandler* New() = 0; virtual RedisCommandHandler* New() = 0;
}; };
......
...@@ -880,38 +880,49 @@ TEST_F(RedisTest, server_concurrency) { ...@@ -880,38 +880,49 @@ TEST_F(RedisTest, server_concurrency) {
class MultiCommandHandler : public brpc::RedisCommandHandler { class MultiCommandHandler : public brpc::RedisCommandHandler {
public: public:
MultiCommandHandler()
: _started(false) {}
RedisCommandHandler::Result Run(const char* args[], RedisCommandHandler::Result Run(const char* args[],
brpc::RedisMessage* output, brpc::RedisMessage* output,
google::protobuf::Closure* done) { google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
if (strcmp(args[0], "multi") == 0) { if (strcasecmp(args[0], "multi") == 0) {
if (!_started) {
output->SetStatus("OK"); output->SetStatus("OK");
_started = true;
} else {
output->SetError("ERR duplicate multi");
}
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
if (strcmp(args[0], "exec") != 0) { if (strcasecmp(args[0], "exec") != 0) {
std::vector<std::string> sargs; std::vector<std::string> sargs;
for (const char** c = args; *c; ++c) { for (const char** c = args; *c; ++c) {
sargs.push_back(*c); sargs.push_back(*c);
} }
commands.push_back(sargs); _commands.push_back(sargs);
output->SetStatus("QUEUED"); output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
output->SetArray(commands.size()); output->SetArray(_commands.size());
s_mutex.lock(); s_mutex.lock();
for (size_t i = 0; i < commands.size(); ++i) { for (size_t i = 0; i < _commands.size(); ++i) {
if (commands[i][0] == "incr") { if (_commands[i][0] == "incr") {
int64_t value; int64_t value;
value = ++int_map[commands[i][1]]; value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value); (*output)[i].SetInteger(value);
} }
} }
s_mutex.unlock(); s_mutex.unlock();
_started = false;
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
RedisCommandHandler* New() { return new MultiCommandHandler; } RedisCommandHandler* New() { return new MultiCommandHandler; }
std::vector<std::vector<std::string>> commands; private:
std::vector<std::vector<std::string>> _commands;
bool _started;
}; };
TEST_F(RedisTest, server_command_continue) { TEST_F(RedisTest, server_command_continue) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment