Commit 5301f739 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: fix bug in long message

parent cb9e8c97
...@@ -47,6 +47,9 @@ DEFINE_bool(redis_verbose, false, ...@@ -47,6 +47,9 @@ DEFINE_bool(redis_verbose, false,
"[DEBUG] Print EVERY redis request/response"); "[DEBUG] Print EVERY redis request/response");
DEFINE_int32(redis_batch_flush_max_size, 2048, "beyond which the server response" DEFINE_int32(redis_batch_flush_max_size, 2048, "beyond which the server response"
" are forced to write to socket"); " are forced to write to socket");
DEFINE_int32(redis_max_request_count_before_clear_arena, 10000, "If the number of "
"incoming requests has reached multiple of this value, arena from which the "
"requests are allocated will be cleared.");
struct InputResponse : public InputMessageBase { struct InputResponse : public InputMessageBase {
bthread_id_t id_wait; bthread_id_t id_wait;
...@@ -59,10 +62,15 @@ struct InputResponse : public InputMessageBase { ...@@ -59,10 +62,15 @@ struct InputResponse : public InputMessageBase {
}; };
static const char** ParseArgs(const RedisReply& message) { static const char** ParseArgs(const RedisReply& message) {
if (!message.is_array() || message.size() == 0) {
LOG(WARNING) << "request message is not array or size equals to zero";
return NULL;
}
const char** args = (const char**) const char** args = (const char**)
malloc(sizeof(const char*) * (message.size() + 1 /* NULL */)); malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
for (size_t i = 0; i < message.size(); ++i) { for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) { if (!message[i].is_string()) {
LOG(WARNING) << "request message[" << i << "] is not array";
free(args); free(args);
return NULL; return NULL;
} }
...@@ -82,7 +90,9 @@ class ConsumeTaskDone; ...@@ -82,7 +90,9 @@ class ConsumeTaskDone;
class RedisConnContext : public SharedObject class RedisConnContext : public SharedObject
, public Destroyable { , public Destroyable {
public: public:
RedisConnContext() : handler_continue(NULL) {} RedisConnContext()
: handler_continue(NULL)
, message_count(0) {}
~RedisConnContext() { ~RedisConnContext() {
ClearQueue(dones); ClearQueue(dones);
} }
...@@ -102,6 +112,9 @@ public: ...@@ -102,6 +112,9 @@ public:
// The redis command are parsed and pushed into this queue // The redis command are parsed and pushed into this queue
bthread::ExecutionQueueId<ConsumeTaskDone*> queue; bthread::ExecutionQueueId<ConsumeTaskDone*> queue;
RedisReply parsing_message;
butil::Arena arena;
int64_t message_count;
private: private:
void ClearQueue(std::queue<ConsumeTaskDone*>& queue); void ClearQueue(std::queue<ConsumeTaskDone*>& queue);
...@@ -255,10 +268,10 @@ void RedisConnContext::Flush() { ...@@ -255,10 +268,10 @@ void RedisConnContext::Flush() {
ClearQueue(ready_to_delete); ClearQueue(ready_to_delete);
} }
void RedisConnContext::ClearQueue(std::queue<ConsumeTaskDone*>& queue) { void RedisConnContext::ClearQueue(std::queue<ConsumeTaskDone*>& ready_to_delete) {
while (!queue.empty()) { while (!ready_to_delete.empty()) {
ConsumeTaskDone* head = queue.front(); ConsumeTaskDone* head = ready_to_delete.front();
queue.pop(); ready_to_delete.pop();
delete head; delete head;
} }
} }
...@@ -298,11 +311,16 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -298,11 +311,16 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
} }
socket->reset_parsing_context(ctx); socket->reset_parsing_context(ctx);
} }
std::unique_ptr<ConsumeTaskDone> done(new ConsumeTaskDone); ParseError err = ctx->parsing_message.ConsumePartialIOBuf(*source, &ctx->arena);
ParseError err = done->input_message.ConsumePartialIOBuf(*source, &done->arena);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return MakeParseError(err); return MakeParseError(err);
} }
std::unique_ptr<ConsumeTaskDone> done(new ConsumeTaskDone);
done->input_message.CopyFromDifferentArena(ctx->parsing_message, &done->arena);
ctx->parsing_message.Clear();
if ((++ctx->message_count % FLAGS_redis_max_request_count_before_clear_arena) == 0) {
ctx->arena.clear();
}
// Add a ref that removed in ConsumeTaskDone::Run // Add a ref that removed in ConsumeTaskDone::Run
ctx->AddRefManually(); ctx->AddRefManually();
if (bthread::execution_queue_execute(ctx->queue, done.get()) != 0) { if (bthread::execution_queue_execute(ctx->queue, done.get()) != 0) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment