Commit a1b98a76 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: fix bug that arena is local

parent 627526f0
......@@ -74,15 +74,15 @@ public:
int batched_size;
RedisCommandParser parser;
butil::Arena arena;
};
int ConsumeCommand(RedisConnContext* ctx,
const std::vector<const char*>& commands,
const std::string& next_command,
butil::Arena* arena,
bool flush_back,
butil::IOBufAppender* appender) {
RedisReply output(arena);
RedisReply output(&ctx->arena);
RedisCommandHandler::Result result = RedisCommandHandler::OK;
if (ctx->transaction_handler) {
result = ctx->transaction_handler->Run(commands, &output, flush_back);
......@@ -169,31 +169,29 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
ctx->redis_service = rs;
socket->reset_parsing_context(ctx);
}
butil::Arena arena;
std::vector<const char*> current_commands;
butil::IOBufAppender appender;
ParseError err = PARSE_OK;
err = ctx->parser.Consume(*source, &current_commands, &arena);
err = ctx->parser.Consume(*source, &current_commands, &ctx->arena);
if (err != PARSE_OK) {
return MakeParseError(err);
}
while (true) {
std::vector<const char*> next_commands;
err = ctx->parser.Consume(*source, &next_commands, &arena);
err = ctx->parser.Consume(*source, &next_commands, &ctx->arena);
if (err != PARSE_OK) {
break;
}
// next_commands must have at least one element, otherwise parse.Consume()
// should return error.
if (ConsumeCommand(ctx, current_commands, next_commands[0], &arena,
false, &appender) != 0) {
if (ConsumeCommand(ctx, current_commands, next_commands[0], false, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
current_commands.swap(next_commands);
}
if (ConsumeCommand(ctx, current_commands, "", &arena,
true /* must be last message */, &appender) != 0) {
if (ConsumeCommand(ctx, current_commands, "",
true /*must be the last message*/, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
butil::IOBuf sendbuf;
......@@ -203,6 +201,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
wopt.ignore_eovercrowded = true;
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
ctx->arena.clear();
return MakeParseError(err);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......
......@@ -345,9 +345,10 @@ butil::Status RedisCommandByComponents(butil::IOBuf* output,
return butil::Status::OK();
}
RedisCommandParser::RedisCommandParser() {
Reset();
}
RedisCommandParser::RedisCommandParser()
: _parsing_array(false)
, _length(0)
, _index(0) {}
ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
std::vector<const char*>* commands,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment