Commit eee11d90 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: revert BATCH_DONE

parent 8c13ae4c
...@@ -88,10 +88,8 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -88,10 +88,8 @@ int ConsumeCommand(RedisConnContext* ctx,
result = ctx->transaction_handler->Run(commands, &output, is_last); result = ctx->transaction_handler->Run(commands, &output, is_last);
if (result == RedisCommandHandler::OK) { if (result == RedisCommandHandler::OK) {
ctx->transaction_handler.reset(NULL); ctx->transaction_handler.reset(NULL);
} else if (result == RedisCommandHandler::BATCHED || } else if (result == RedisCommandHandler::BATCHED) {
result == RedisCommandHandler::BATCHED_DONE) { LOG(ERROR) << "BATCHED should not be returned by a transaction handler.";
LOG(ERROR) << "BATCHED/BATCHED_DONE should not be returned by a "
" transaction handler.";
return -1; return -1;
} }
} else { } else {
...@@ -109,7 +107,7 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -109,7 +107,7 @@ int ConsumeCommand(RedisConnContext* ctx,
result = ch->Run(commands, &output, is_last); result = ch->Run(commands, &output, is_last);
if (result == RedisCommandHandler::CONTINUE) { if (result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size != 0) { if (ctx->batched_size != 0) {
LOG(ERROR) << "Do you forget to return BATCHED_DONE when is_last is true?"; LOG(ERROR) << "Do you forget to return OK when is_last is true?";
return -1; return -1;
} }
ctx->transaction_handler.reset(ch->NewTransactionHandler()); ctx->transaction_handler.reset(ch->NewTransactionHandler());
...@@ -118,20 +116,24 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -118,20 +116,24 @@ int ConsumeCommand(RedisConnContext* ctx,
} }
} }
} }
if (result == RedisCommandHandler::OK || result == RedisCommandHandler::CONTINUE) { if (result == RedisCommandHandler::OK) {
if (ctx->batched_size) {
if ((int)output.size() != (ctx->batched_size + 1)) {
LOG(ERROR) << "reply array size can't be matched with batched size, "
<< " expected=" << ctx->batched_size + 1 << " actual=" << output.size();
return -1;
}
for (int i = 0; i < (int)output.size(); ++i) {
output[i].SerializeTo(sendbuf);
}
ctx->batched_size = 0;
} else {
output.SerializeTo(sendbuf);
}
} else if (result == RedisCommandHandler::CONTINUE) {
output.SerializeTo(sendbuf); output.SerializeTo(sendbuf);
} else if (result == RedisCommandHandler::BATCHED) { } else if (result == RedisCommandHandler::BATCHED) {
// just do nothing and wait handler to return BATCHED_DONE. // just do nothing and wait handler to return OK.
} else if (result == RedisCommandHandler::BATCHED_DONE) {
if ((int)output.size() != (ctx->batched_size + 1)) {
LOG(ERROR) << "reply array size can't be matched with batched size, "
<< " expected=" << ctx->batched_size + 1 << " actual=" << output.size();
return -1;
}
for (int i = 0; i < (int)output.size(); ++i) {
output[i].SerializeTo(sendbuf);
}
ctx->batched_size = 0;
} else { } else {
LOG(ERROR) << "unknown status=" << result; LOG(ERROR) << "unknown status=" << result;
return -1; return -1;
......
...@@ -238,7 +238,6 @@ public: ...@@ -238,7 +238,6 @@ public:
OK = 0, OK = 0,
CONTINUE = 1, CONTINUE = 1,
BATCHED = 2, BATCHED = 2,
BATCHED_DONE = 3,
}; };
~RedisCommandHandler() {} ~RedisCommandHandler() {}
...@@ -253,13 +252,13 @@ public: ...@@ -253,13 +252,13 @@ public:
// want to do some batch processing, user should buffer the command and return // want to do some batch processing, user should buffer the command and return
// RedisCommandHandler::BATCHED. Once `is_last' is true, run all the commands, // RedisCommandHandler::BATCHED. Once `is_last' is true, run all the commands,
// set `output' to be an array in which every element is the result of batched // set `output' to be an array in which every element is the result of batched
// commands and return RedisCommandHandler::BATCHED_DONE. // commands and return RedisCommandHandler::OK.
// //
// The return value should be RedisCommandHandler::OK for normal cases. If you want // The return value should be RedisCommandHandler::OK for normal cases. If you want
// to implement transaction, return RedisCommandHandler::CONTINUE once server receives // to implement transaction, return RedisCommandHandler::CONTINUE once server receives
// an start marker and brpc will call MultiTransactionHandler() to new a transaction // an start marker and brpc will call MultiTransactionHandler() to new a transaction
// handler that all the following commands are sent to this tranction handler until // handler that all the following commands are sent to this tranction handler until
// it returns Result::OK. Read the comment below. // it returns RedisCommandHandler::OK. Read the comment below.
virtual RedisCommandHandler::Result Run(const std::vector<const char*>& args, virtual RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool is_last) = 0; bool is_last) = 0;
...@@ -270,12 +269,12 @@ public: ...@@ -270,12 +269,12 @@ public:
// exec": // exec":
// 1) First command is "multi" and Run() should return RedisCommandHandler::CONTINUE, // 1) First command is "multi" and Run() should return RedisCommandHandler::CONTINUE,
// then brpc calls NewTransactionHandler() to new a transaction_handler. // then brpc calls NewTransactionHandler() to new a transaction_handler.
// 2) brpc calls transaction_handler.Run() with command "set k1 v1", which should return // 2) brpc calls transaction_handler.Run() with command "set k1 v1",
// CONTINUE. // which should return CONTINUE.
// 3) brpc calls transaction_handler.Run() with command "set k2 v2", which should return // 3) brpc calls transaction_handler.Run() with command "set k2 v2",
// CONTINUE. // which should return CONTINUE.
// 4) brpc calls transaction_handler.Run() with command "set k3 v3", which should return // 4) brpc calls transaction_handler.Run() with command "set k3 v3",
// CONTINUE. // which should return CONTINUE.
// 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all // 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all
// the commands and return OK. This Transation is done. // the commands and return OK. This Transation is done.
virtual RedisCommandHandler* NewTransactionHandler(); virtual RedisCommandHandler* NewTransactionHandler();
......
...@@ -787,6 +787,14 @@ public: ...@@ -787,6 +787,14 @@ public:
brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args, brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args,
brpc::RedisReply* output, bool is_last) { brpc::RedisReply* output, bool is_last) {
if (_batched_command.empty() && is_last) {
if (strcmp(args[0], "set") == 0) {
DoSet(args[1], args[2], output);
} else if (strcmp(args[0], "get") == 0) {
DoGet(args[1], output);
}
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm; std::vector<std::string> comm;
for (int i = 0; i < (int)args.size(); ++i) { for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i]); comm.push_back(args[i]);
...@@ -803,7 +811,7 @@ public: ...@@ -803,7 +811,7 @@ public:
} }
_batch_count++; _batch_count++;
_batched_command.clear(); _batched_command.clear();
return brpc::RedisCommandHandler::BATCHED_DONE; return brpc::RedisCommandHandler::OK;
} else { } else {
return brpc::RedisCommandHandler::BATCHED; return brpc::RedisCommandHandler::BATCHED;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment