Commit 22ad6a8d authored by zhujiashun's avatar zhujiashun

redis_server_protocol: disable transaction in batched mode

parent a1b98a76
...@@ -79,7 +79,6 @@ public: ...@@ -79,7 +79,6 @@ public:
int ConsumeCommand(RedisConnContext* ctx, int ConsumeCommand(RedisConnContext* ctx,
const std::vector<const char*>& commands, const std::vector<const char*>& commands,
const std::string& next_command,
bool flush_back, bool flush_back,
butil::IOBufAppender* appender) { butil::IOBufAppender* appender) {
RedisReply output(&ctx->arena); RedisReply output(&ctx->arena);
...@@ -99,15 +98,10 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -99,15 +98,10 @@ int ConsumeCommand(RedisConnContext* ctx,
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]); snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]);
output.SetError(buf); output.SetError(buf);
} else { } else {
RedisCommandHandler* next_ch =
ctx->redis_service->FindCommandHandler(next_command);
if (next_ch && next_ch->TransactionMarker()) {
flush_back = true;
}
result = ch->Run(commands, &output, flush_back); result = ch->Run(commands, &output, flush_back);
if (result == RedisCommandHandler::CONTINUE) { if (result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size != 0) { if (ctx->batched_size != 0) {
LOG(ERROR) << "Do you forget to return OK when flush_back is true?"; LOG(ERROR) << "CONTINUE should not be returned in a batched process.";
return -1; return -1;
} }
ctx->transaction_handler.reset(ch->NewTransactionHandler()); ctx->transaction_handler.reset(ch->NewTransactionHandler());
...@@ -183,14 +177,12 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -183,14 +177,12 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (err != PARSE_OK) { if (err != PARSE_OK) {
break; break;
} }
// next_commands must have at least one element, otherwise parse.Consume() if (ConsumeCommand(ctx, current_commands, false, &appender) != 0) {
// should return error.
if (ConsumeCommand(ctx, current_commands, next_commands[0], false, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
current_commands.swap(next_commands); current_commands.swap(next_commands);
} }
if (ConsumeCommand(ctx, current_commands, "", if (ConsumeCommand(ctx, current_commands,
true /*must be the last message*/, &appender) != 0) { true /*must be the last message*/, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
......
...@@ -278,9 +278,6 @@ public: ...@@ -278,9 +278,6 @@ public:
// 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all // 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all
// the commands and return OK. This Transation is done. // the commands and return OK. This Transation is done.
virtual RedisCommandHandler* NewTransactionHandler(); virtual RedisCommandHandler* NewTransactionHandler();
// return true if a transaction is started when met this command.
virtual bool TransactionMarker() { return false; }
}; };
} // namespace brpc } // namespace brpc
......
...@@ -1042,9 +1042,6 @@ public: ...@@ -1042,9 +1042,6 @@ public:
RedisCommandHandler* NewTransactionHandler() override { RedisCommandHandler* NewTransactionHandler() override {
return new MultiTransactionHandler; return new MultiTransactionHandler;
} }
bool TransactionMarker() override {
return true;
}
class MultiTransactionHandler : public brpc::RedisCommandHandler { class MultiTransactionHandler : public brpc::RedisCommandHandler {
public: public:
...@@ -1189,20 +1186,12 @@ TEST_F(RedisTest, server_handle_pipeline) { ...@@ -1189,20 +1186,12 @@ TEST_F(RedisTest, server_handle_pipeline) {
ASSERT_TRUE(request.AddCommand("set key1 world")); ASSERT_TRUE(request.AddCommand("set key1 world"));
ASSERT_TRUE(request.AddCommand("set key2 world")); ASSERT_TRUE(request.AddCommand("set key2 world"));
ASSERT_TRUE(request.AddCommand("get key2")); ASSERT_TRUE(request.AddCommand("get key2"));
ASSERT_TRUE(request.AddCommand("multi"));
ASSERT_TRUE(request.AddCommand("incr key4"));
ASSERT_TRUE(request.AddCommand("exec"));
ASSERT_TRUE(request.AddCommand("set key1 v1"));
ASSERT_TRUE(request.AddCommand("set key2 v2"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL); channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(13, response.reply_size()); ASSERT_EQ(8, response.reply_size());
ASSERT_EQ(2, rsimpl->_batch_count); ASSERT_EQ(1, rsimpl->_batch_count);
ASSERT_TRUE(response.reply(7).is_string()); ASSERT_TRUE(response.reply(7).is_string());
ASSERT_STREQ(response.reply(7).c_str(), "world"); ASSERT_STREQ(response.reply(7).c_str(), "world");
ASSERT_TRUE(response.reply(10).is_array());
ASSERT_TRUE(response.reply(10)[0].is_integer());
ASSERT_EQ(response.reply(10)[0].integer(), 1);
} }
} //namespace } //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment