Commit d35efefd authored by zhujiashun's avatar zhujiashun

redis_server_protocol: separate delete process

parent a281b1bd
......@@ -93,9 +93,7 @@ public:
RedisConnContext()
: handler_continue(NULL)
, message_count(0) {}
~RedisConnContext() {
ClearQueue(dones);
}
~RedisConnContext();
// @Destroyable
void Destroy();
......@@ -103,6 +101,7 @@ public:
// Push `done` to a queue which is read by Flush().
void Push(ConsumeTaskDone* done);
void Flush();
void ClearSentDones();
SocketId socket_id;
RedisService::CommandMap command_map;
......@@ -116,11 +115,14 @@ public:
butil::Arena arena;
int64_t message_count;
private:
void ClearQueue(std::queue<ConsumeTaskDone*>& queue);
void AddSentDone(ConsumeTaskDone* done);
bool _writing = false;
butil::Mutex _mutex;
std::queue<ConsumeTaskDone*> dones;
std::queue<ConsumeTaskDone*> _dones;
butil::Mutex _dones_sent_mutex;
std::queue<ConsumeTaskDone*> _dones_sent;
};
class ConsumeTaskDone : public google::protobuf::Closure {
......@@ -198,6 +200,15 @@ int Consume(void* ctx, bthread::TaskIterator<ConsumeTaskDone*>& iter) {
// ========== impl of RedisConnContext ==========
RedisConnContext::~RedisConnContext() {
ClearSentDones();
while (!_dones.empty()) {
ConsumeTaskDone* head = _dones.front();
_dones.pop();
delete head;
}
}
void RedisConnContext::Destroy() {
bthread::execution_queue_stop(queue);
}
......@@ -215,7 +226,7 @@ int RedisConnContext::Init() {
void RedisConnContext::Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_mutex);
dones.push(done);
_dones.push(done);
}
void RedisConnContext::Flush() {
SocketUniquePtr s;
......@@ -229,15 +240,14 @@ void RedisConnContext::Flush() {
_writing = true;
}
std::queue<ConsumeTaskDone*> ready_to_write;
std::queue<ConsumeTaskDone*> ready_to_delete;
butil::IOBuf buf;
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
while (true) {
std::unique_lock<butil::Mutex> m(_mutex);
while (!dones.empty() && dones.front()->IsReady()) {
ready_to_write.push(dones.front());
dones.pop();
while (!_dones.empty() && _dones.front()->IsReady()) {
ready_to_write.push(_dones.front());
_dones.pop();
}
if (ready_to_write.empty()) {
_writing = false;
......@@ -253,7 +263,7 @@ void RedisConnContext::Flush() {
ConsumeTaskDone* head = ready_to_write.front();
ready_to_write.pop();
buf.append(head->sendbuf);
ready_to_delete.push(head);
AddSentDone(head);
}
if ((int)buf.size() > FLAGS_redis_batch_flush_max_size) {
// In extreme cases, there are always tasks that are ready in every check
......@@ -265,16 +275,26 @@ void RedisConnContext::Flush() {
CHECK(buf.empty());
}
}
ClearQueue(ready_to_delete);
}
void RedisConnContext::ClearQueue(std::queue<ConsumeTaskDone*>& ready_to_delete) {
while (!ready_to_delete.empty()) {
ConsumeTaskDone* head = ready_to_delete.front();
ready_to_delete.pop();
void RedisConnContext::AddSentDone(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_dones_sent_mutex);
_dones_sent.push(done);
}
void RedisConnContext::ClearSentDones() {
std::queue<ConsumeTaskDone*> dones_sent;
{
std::unique_lock<butil::Mutex> m(_dones_sent_mutex);
_dones_sent.swap(dones_sent);
}
while (!dones_sent.empty()) {
ConsumeTaskDone* head = dones_sent.front();
dones_sent.pop();
delete head;
}
}
// ========== impl of RedisConnContext ==========
void ConsumeTaskDone::Run() {
......@@ -328,6 +348,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
ctx->ClearSentDones();
done.release();
return MakeMessage(NULL);
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment