Commit da671ff4 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: add BATCHED_DONE code

parent ad7081f8
...@@ -88,8 +88,10 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -88,8 +88,10 @@ int ConsumeCommand(RedisConnContext* ctx,
result = ctx->transaction_handler->Run(commands, &output, is_last); result = ctx->transaction_handler->Run(commands, &output, is_last);
if (result == RedisCommandHandler::OK) { if (result == RedisCommandHandler::OK) {
ctx->transaction_handler.reset(NULL); ctx->transaction_handler.reset(NULL);
} else if (result == RedisCommandHandler::BATCHED) { } else if (result == RedisCommandHandler::BATCHED ||
LOG(ERROR) << "BATCHED should not be returned by a transaction handler."; result == RedisCommandHandler::BATCHED_DONE) {
LOG(ERROR) << "BATCHED/BATCHED_DONE should not be returned by a "
" transaction handler.";
return -1; return -1;
} }
} else { } else {
...@@ -116,8 +118,11 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -116,8 +118,11 @@ int ConsumeCommand(RedisConnContext* ctx,
} }
} }
} }
if (result == RedisCommandHandler::OK) { if (result == RedisCommandHandler::OK || result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size) { output.SerializeTo(sendbuf);
} else if (result == RedisCommandHandler::BATCHED) {
// just do nothing and wait handler to return BATCHED_DONE.
} else if (result == RedisCommandHandler::BATCHED_DONE) {
if ((int)output.size() != (ctx->batched_size + 1)) { if ((int)output.size() != (ctx->batched_size + 1)) {
LOG(ERROR) << "reply array size can't be matched with batched size, " LOG(ERROR) << "reply array size can't be matched with batched size, "
<< " expected=" << ctx->batched_size + 1 << " actual=" << output.size(); << " expected=" << ctx->batched_size + 1 << " actual=" << output.size();
...@@ -127,13 +132,6 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -127,13 +132,6 @@ int ConsumeCommand(RedisConnContext* ctx,
output[i].SerializeTo(sendbuf); output[i].SerializeTo(sendbuf);
} }
ctx->batched_size = 0; ctx->batched_size = 0;
} else {
output.SerializeTo(sendbuf);
}
} else if (result == RedisCommandHandler::CONTINUE) {
output.SerializeTo(sendbuf);
} else if (result == RedisCommandHandler::BATCHED) {
// just do nothing and wait handler to return OK.
} else { } else {
LOG(ERROR) << "unknown status=" << result; LOG(ERROR) << "unknown status=" << result;
return -1; return -1;
......
...@@ -238,6 +238,7 @@ public: ...@@ -238,6 +238,7 @@ public:
OK = 0, OK = 0,
CONTINUE = 1, CONTINUE = 1,
BATCHED = 2, BATCHED = 2,
BATCHED_DONE = 3,
}; };
~RedisCommandHandler() {} ~RedisCommandHandler() {}
...@@ -250,9 +251,9 @@ public: ...@@ -250,9 +251,9 @@ public:
// Read brpc/src/redis_reply.h for more usage. // Read brpc/src/redis_reply.h for more usage.
// `is_last' indicates whether the commands is the last command of this batch. If user // `is_last' indicates whether the commands is the last command of this batch. If user
// want to do some batch processing, user should buffer the command and return // want to do some batch processing, user should buffer the command and return
// RedisCommandHandler::BATCHED. Once `is_last' is true, run all the commands and // RedisCommandHandler::BATCHED. Once `is_last' is true, run all the commands,
// set `output' to be an array and set all the results to the corresponding element // set `output' to be an array in which every element is the result of batched
// of array. // commands and return RedisCommandHandler::BATCHED_DONE.
// //
// The return value should be RedisCommandHandler::OK for normal cases. If you want // The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE once server receives // to implement transaction, return RedisCommandHandler::CONTINUE once server receives
......
...@@ -787,14 +787,6 @@ public: ...@@ -787,14 +787,6 @@ public:
brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args, brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args,
brpc::RedisReply* output, bool is_last) { brpc::RedisReply* output, bool is_last) {
if (_batched_command.empty() && is_last) {
if (strcmp(args[0], "set") == 0) {
DoSet(args[1], args[2], output);
} else if (strcmp(args[0], "get") == 0) {
DoGet(args[1], output);
}
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm; std::vector<std::string> comm;
for (int i = 0; i < (int)args.size(); ++i) { for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i]); comm.push_back(args[i]);
...@@ -811,7 +803,7 @@ public: ...@@ -811,7 +803,7 @@ public:
} }
_batch_count++; _batch_count++;
_batched_command.clear(); _batched_command.clear();
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::BATCHED_DONE;
} else { } else {
return brpc::RedisCommandHandler::BATCHED; return brpc::RedisCommandHandler::BATCHED;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment