Commit b8c35d85 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: add TransactionHandler

parent e3117cfc
......@@ -61,8 +61,7 @@ struct InputResponse : public InputMessageBase {
class RedisConnContext : public Destroyable {
public:
RedisConnContext()
: redis_service(NULL)
, handler_continue(NULL) {}
: redis_service(NULL) {}
~RedisConnContext();
// @Destroyable
void Destroy() override;
......@@ -70,14 +69,14 @@ public:
SocketId socket_id;
RedisService* redis_service;
// If user starts a transaction, handler_continue indicates the
// first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue;
// handler pointer that runs the transaction handler.
std::unique_ptr<RedisCommandHandler> handler_continue;
RedisCommandParser parser;
std::vector<std::string> command;
};
std::string ToLowercase(const std::string& command) {
static std::string ToLowercase(const std::string& command) {
std::string res;
res.resize(command.size());
std::transform(command.begin(), command.end(), res.begin(),
......@@ -108,7 +107,7 @@ int ConsumeTask(RedisConnContext* ctx,
RedisCommandHandler::Result result =
ctx->handler_continue->Run(commands[i], &output[i], is_last);
if (result == RedisCommandHandler::OK) {
ctx->handler_continue = NULL;
ctx->handler_continue.reset(NULL);
}
} else {
bool is_last = true;
......@@ -133,7 +132,7 @@ int ConsumeTask(RedisConnContext* ctx,
RedisCommandHandler::Result result =
ch->Run(commands[i], &output[i], is_last);
if (result == RedisCommandHandler::CONTINUE) {
ctx->handler_continue = ch;
ctx->handler_continue.reset(ch->NewTransactionHandler());
}
}
}
......
......@@ -457,4 +457,9 @@ RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) {
return NULL;
}
RedisCommandHandler* RedisCommandHandler::NewTransactionHandler() {
LOG(ERROR) << "NewTransactionHandler is not implemented";
return NULL;
}
} // namespace brpc
......@@ -251,17 +251,29 @@ public:
// want to do some batch processing, user should buffer the command and output. Once
// `is_last' is true, then run all the command and set the output of each command.
// The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE until server receives
// an ending marker. The first handler that return RedisCommandHandler::CONTINUE will
// continue receiving the following commands until it receives an ending marker and
// return RedisCommandHandler::OK to end transaction. For example, the return value
// of commands "multi; set k1 v1; set k2 v2; set k3 v3; exec" should be four
// RedisCommandHandler::CONTINUE and one RedisCommandHandler::OK since exec is the
// marker that ends the transaction. User should queue the commands and execute them
// all once the ending marker is received.
// to implement transaction, return RedisCommandHandler::CONTINUE once server receives
// an start marker and brpc will call MultiTransactionHandler() to new a transaction
// handler that all the following commands are sent to this tranction handler until
// it returns Result::OK. Read the comment below.
virtual RedisCommandHandler::Result Run(const std::vector<std::string>& command,
brpc::RedisReply* output,
bool is_last) = 0;
// This function is called to new a transaction handler once Run() returns
// RedisCommandHandler::CONTINUE. All the following commands are sent to this
// handler until it return Result::OK. For example, for command "multi; set k1 v1;
// set k2 v2; set k3 v3; exec":
// 1) In Run(), command is "multi", so return RedisCommandHandler::CONTINUE, and
// brpc calls NewTransactionHandler() to new a handler tran_handler.
// 2) brpc calls tran_handler.Run() with command "set k1 v1", which should return
// RedisCommandHandler::CONTINUE and buffer the command and output.
// 3) brpc calls tran_handler.Run() with command "set k2 v2", which should return
// RedisCommandHandler::CONTINUE and buffer the command and output.
// 4) brpc calls tran_handler.Run() with command "set k3 v3", which should return
// RedisCommandHandler::CONTINUE and buffer the command and output.
// 5) An ending marker(multi) is found in tran_handler.Run(), user exeuctes all
// the command and return RedisCommandHandler::OK. This Transation is done.
virtual RedisCommandHandler* NewTransactionHandler();
};
} // namespace brpc
......
......@@ -984,44 +984,50 @@ TEST_F(RedisTest, server_concurrency) {
class MultiCommandHandler : public brpc::RedisCommandHandler {
public:
MultiCommandHandler()
: _started(false) {}
MultiCommandHandler() {}
RedisCommandHandler::Result Run(const std::vector<std::string>& args,
brpc::RedisReply* output,
bool is_last) {
if (strcasecmp(args[0].c_str(), "multi") == 0) {
if (!_started) {
output->SetStatus("OK");
_started = true;
} else {
output->SetStatus("OK");
return brpc::RedisCommandHandler::CONTINUE;
}
RedisCommandHandler* NewTransactionHandler() {
return new MultiTransactionHandler;
}
class MultiTransactionHandler : public brpc::RedisCommandHandler {
public:
RedisCommandHandler::Result Run(const std::vector<std::string>& args,
brpc::RedisReply* output,
bool is_last) {
if (strcasecmp(args[0].c_str(), "multi") == 0) {
output->SetError("ERR duplicate multi");
return brpc::RedisCommandHandler::CONTINUE;
}
return brpc::RedisCommandHandler::CONTINUE;
}
if (strcasecmp(args[0].c_str(), "exec") != 0) {
_commands.push_back(args);
output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::CONTINUE;
}
output->SetArray(_commands.size());
s_mutex.lock();
for (size_t i = 0; i < _commands.size(); ++i) {
if (_commands[i][0] == "incr") {
int64_t value;
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
if (strcasecmp(args[0].c_str(), "exec") != 0) {
_commands.push_back(args);
output->SetStatus("QUEUED");
return brpc::RedisCommandHandler::CONTINUE;
}
output->SetArray(_commands.size());
s_mutex.lock();
for (size_t i = 0; i < _commands.size(); ++i) {
if (_commands[i][0] == "incr") {
int64_t value;
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
} else {
(*output)[i].SetStatus("unknowne command");
}
}
s_mutex.unlock();
return brpc::RedisCommandHandler::OK;
}
s_mutex.unlock();
_started = false;
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { return new MultiCommandHandler; }
private:
std::vector<std::vector<std::string>> _commands;
bool _started;
private:
std::vector<std::vector<std::string>> _commands;
};
};
TEST_F(RedisTest, server_command_continue) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment