Commit 26ff9d47 authored by zhujiashun

redis_server_protocol: add comments

parent f2444e57
@@ -58,7 +58,7 @@ struct InputResponse : public InputMessageBase {
     }
 };
 
-const char** ParseArgs(const RedisReply& message) {
+static const char** ParseArgs(const RedisReply& message) {
     const char** args = (const char**)
         malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
     for (size_t i = 0; i < message.size(); ++i) {
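The loop body of ParseArgs is collapsed in this hunk. As a rough illustration of the argv-building pattern the function follows (a sketch only, using std::vector<std::string> as a stand-in for RedisReply, whose element accessors are not shown here):

    #include <cstdlib>
    #include <string>
    #include <vector>

    // Build a NULL-terminated, argv-style array that borrows the buffers of
    // the parsed command elements. The caller is responsible for free(args).
    static const char** ParseArgsSketch(const std::vector<std::string>& message) {
        const char** args = (const char**)
            malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
        for (size_t i = 0; i < message.size(); ++i) {
            args[i] = message[i].c_str();
        }
        args[message.size()] = NULL;
        return args;
    }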
@@ -73,12 +73,12 @@ const char** ParseArgs(const RedisReply& message) {
 }
 
-// One redis command corresponding to one ConsumeTaskDone. Whenever user
-// has completed the process of command and call done->Run()(read redis.h
-// for more details), RedisConnContext::Flush() will be called and flush
-// the response to client by the order that commands arrive.
+// One redis command corresponds to one ConsumeTaskDone. Once the user has
+// finished handling the command and calls done->Run() (read redis.h for
+// more details), RedisConnContext::Flush() is called and flushes the
+// responses to the client in the order that the commands arrived.
 class ConsumeTaskDone;
 
-// This class plays role as parsing_context in socket.
+// This class acts as the parsing_context of the socket.
 class RedisConnContext : public SharedObject
                        , public Destroyable {
 public:
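To make the contract in the new comment concrete: every command handed to the user comes with its own done closure, and calling Run() on it is what eventually lets the connection flush that command's reply. A minimal sketch of that flow in plain C++ (DoneStandIn and HandleOneCommand are illustrative names, not brpc's API; the real closure type is google::protobuf::Closure and the real handler signature is documented in redis.h):

    #include <functional>
    #include <string>

    // Stand-in for the per-command done closure; only the Run() contract
    // described in the comment above is modeled here.
    struct DoneStandIn {
        std::string response;          // reply serialized by the handler
        std::function<void()> notify;  // wired to the context's Flush() path
        void Run() { notify(); }       // must be called exactly once per command
    };

    // Hypothetical user handler for a single command: build the reply, then
    // hand it back. Even if handlers finish out of order, Flush() writes the
    // replies in command-arrival order.
    void HandleOneCommand(DoneStandIn* done) {
        done->response = "+OK\r\n";
        done->Run();
    }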
@@ -90,6 +90,7 @@ public:
     void Destroy();
 
     int Init();
+    // Push `done` to a queue which is read by Flush().
     void Push(ConsumeTaskDone* done);
     void Flush();
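The added comment summarizes the ordering mechanism: Push() enqueues dones in command-arrival order, and Flush() drains the queue only from the head, so a reply is written only after every earlier reply is ready. A minimal sketch of that pattern with standard library types standing in for brpc's queue and IOBuf (not the actual implementation):

    #include <atomic>
    #include <mutex>
    #include <queue>
    #include <string>

    struct DoneSketch {
        std::atomic<bool> ready{false};  // set by Run() once the reply is serialized
        std::string response;            // serialized reply for one command
    };

    class ConnContextSketch {
    public:
        // Called as commands arrive, so the queue preserves arrival order.
        void Push(DoneSketch* done) {
            std::lock_guard<std::mutex> g(_mu);
            _queue.push(done);
        }
        // Called after some done->Run(); drains only from the head so replies
        // keep arrival order even when later commands finish first.
        void Flush(std::string* out) {
            std::lock_guard<std::mutex> g(_mu);
            while (!_queue.empty() &&
                   _queue.front()->ready.load(std::memory_order_acquire)) {
                out->append(_queue.front()->response);
                _queue.pop();
            }
        }
    private:
        std::mutex _mu;
        std::queue<DoneSketch*> _queue;
    };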
@@ -113,7 +114,8 @@ class ConsumeTaskDone : public google::protobuf::Closure {
 public:
     ConsumeTaskDone()
         : _ready(false)
-        , output_message(&arena) {}
+        , output_message(&arena)
+        , ctx(NULL) {}
 
     void Run() override;
     bool IsReady() { return _ready.load(butil::memory_order_acquire); }
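The new ctx(NULL) initializer simply gives the back-pointer a defined value before it is filled in. The _ready flag next to it is the other half of the ordering story: Run() is expected to publish the finished reply with a release store, and Flush() checks IsReady() with the matching acquire load, so the reply contents are visible by the time IsReady() returns true. A compilable sketch of that handshake using std::atomic in place of butil's atomics (what Run() otherwise does is defined elsewhere in this file and only hinted at here):

    #include <atomic>

    class ConsumeTaskDoneSketch {
    public:
        ConsumeTaskDoneSketch() : _ready(false), _ctx(nullptr) {}
        void Run() {
            // ... serialize the output message into the per-task buffer first ...
            _ready.store(true, std::memory_order_release);  // publish the reply
            // ... then ask the connection context to Flush() ...
        }
        bool IsReady() const {
            // Pairs with the release store above: once this returns true, the
            // reply written before the store is visible to the flushing thread.
            return _ready.load(std::memory_order_acquire);
        }
    private:
        std::atomic<bool> _ready;
        void* _ctx;  // stands in for the RedisConnContext* back-pointer
    };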
@@ -241,6 +243,10 @@ void RedisConnContext::Flush() {
         ready_to_delete.push(head);
     }
     if ((int)buf.size() > FLAGS_redis_batch_flush_max_size) {
+        // In extreme cases a task is ready on every iteration of the check
+        // loop and buf keeps growing, so we would never get a chance to
+        // write it out. To avoid that, cap buf at a maximum size and write
+        // it early once the limit is exceeded.
         LOG_IF(WARNING, s->Write(&buf, &wopt) != 0)
             << "Fail to send redis reply";
         CHECK(buf.empty());
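The added comment describes a size cap on the batching buffer. The shape of that logic, reduced to standard library types (kBatchFlushMaxSize stands in for the FLAGS_redis_batch_flush_max_size gflag and SendToClient for Socket::Write(); names and values here are illustrative only):

    #include <cstdio>
    #include <string>

    static const size_t kBatchFlushMaxSize = 2048;  // illustrative cap

    static void SendToClient(std::string* buf) {
        std::printf("writing %zu bytes\n", buf->size());  // placeholder for Socket::Write()
        buf->clear();
    }

    // Append one ready reply; flush early if the accumulated buffer exceeds
    // the cap, so a continuous stream of ready tasks cannot grow it forever.
    static void AppendAndMaybeFlush(std::string* buf, const std::string& reply) {
        buf->append(reply);
        if (buf->size() > kBatchFlushMaxSize) {
            SendToClient(buf);
        }
    }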