Commit 64d0bcea authored by zhujiashun's avatar zhujiashun

redis_server_protocol: combine buf and then write

parent bb09b153
......@@ -45,6 +45,8 @@ namespace policy {
DEFINE_bool(redis_verbose, false,
"[DEBUG] Print EVERY redis request/response");
DEFINE_int32(redis_batch_flush_max_size, 2048, "beyond which the server response"
" are forced to write to socket");
struct InputResponse : public InputMessageBase {
bthread_id_t id_wait;
......@@ -56,33 +58,27 @@ struct InputResponse : public InputMessageBase {
}
};
class RedisConnContext;
class ConsumeTaskDone : public google::protobuf::Closure {
public:
ConsumeTaskDone()
: _ready(false)
, output_message(&_arena) {}
void Run() override;
bool is_ready() { return _ready.load(butil::memory_order_acquire); }
private:
butil::atomic<bool> _ready;
butil::Arena _arena;
public:
RedisMessage output_message;
RedisConnContext* meta;
butil::IOBuf sendbuf;
};
const char** ParseArgs(const RedisMessage& message) {
const char** args = (const char**)
malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) {
free(args);
return NULL;
}
args[i] = message[i].c_str();
}
args[message.size()] = NULL;
return args;
}
struct TaskContext {
RedisMessage message;
butil::Arena arena;
};
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter);
// One redis command corresponding to one ConsumeTaskDone. Whenever user
// has completed the process of command and call done->Run()(read redis.h
// for more details), RedisConnContext::Flush() will be called and flush
// the response to client by the order that commands arrive.
class ConsumeTaskDone;
// This class plays role as parsing_context in socket.
class RedisConnContext : public SharedObject
, public Destroyable {
public:
......@@ -91,144 +87,73 @@ public:
CHECK(dones.empty());
}
// @Destroyable
void Destroy() {
bthread::execution_queue_stop(queue);
}
void Destroy();
int Init() {
bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
if (bthread::execution_queue_start(&queue, &q_opt, Consume, this) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return -1;
}
return 0;
}
void Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_mutex);
dones.push(done);
}
void Flush() {
std::queue<ConsumeTaskDone*> ready_to_write;
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
return;
}
{
std::unique_lock<butil::Mutex> m(_mutex);
if (_writing) return;
_writing = true;
}
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
std::queue<ConsumeTaskDone*> ready_to_delete;
while (true) {
std::unique_lock<butil::Mutex> m(_mutex);
while (!dones.empty() && dones.front()->is_ready()) {
ready_to_write.push(dones.front());
dones.pop();
}
if (ready_to_write.empty()) {
_writing = false;
break;
}
m.unlock();
while (!ready_to_write.empty()) {
ConsumeTaskDone* head = ready_to_write.front();
ready_to_write.pop();
LOG_IF(WARNING, s->Write(&head->sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
ready_to_delete.push(head);
}
}
ClearQueue(ready_to_delete);
}
int Init();
void Push(ConsumeTaskDone* done);
void Flush();
SocketId socket_id;
RedisService::CommandMap command_map;
// If user starts a transaction, handler_continue indicates the
// first handler pointer that triggers the transaction.
RedisCommandHandler* handler_continue;
// The redis command are parsed and pushed into this queue
bthread::ExecutionQueueId<ConsumeTaskDone*> queue;
private:
void ClearQueue(std::queue<ConsumeTaskDone*>& queue) {
while (!queue.empty()) {
ConsumeTaskDone* head = queue.front();
queue.pop();
delete head;
}
}
void ClearQueue(std::queue<ConsumeTaskDone*>& queue);
bthread::ExecutionQueueId<TaskContext*> queue;
bool _writing = false;
butil::Mutex _mutex;
std::queue<ConsumeTaskDone*> dones;
};
int ConsumeTask(RedisConnContext* meta, const RedisMessage& m);
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
RedisConnContext* qmeta = static_cast<RedisConnContext*>(meta);
if (iter.is_queue_stopped()) {
qmeta->RemoveRefManually();
return 0;
}
for (; iter; ++iter) {
std::unique_ptr<TaskContext> ctx(*iter);
ConsumeTask(qmeta, ctx->message);
}
return 0;
}
class ConsumeTaskDone : public google::protobuf::Closure {
public:
ConsumeTaskDone()
: _ready(false)
, output_message(&arena) {}
const char** ParseArgs(const RedisMessage& message) {
const char** args = (const char**)
malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) {
free(args);
return NULL;
}
args[i] = message[i].c_str();
}
args[message.size()] = NULL;
return args;
}
void Run() override;
bool IsReady() { return _ready.load(butil::memory_order_acquire); }
void ConsumeTaskDone::Run() {
butil::intrusive_ptr<RedisConnContext> delete_queue_meta(meta, false);
output_message.SerializeToIOBuf(&sendbuf);
_ready.store(true, butil::memory_order_release);
meta->Flush();
}
private:
butil::atomic<bool> _ready;
int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) {
ConsumeTaskDone* done = new ConsumeTaskDone;
public:
RedisMessage input_message;
RedisMessage output_message;
RedisConnContext* ctx;
butil::IOBuf sendbuf;
butil::Arena arena;
};
int ConsumeTask(RedisConnContext* ctx, ConsumeTaskDone* done) {
ClosureGuard done_guard(done);
meta->Push(done);
meta->AddRefManually();
done->meta = meta;
done->ctx = ctx;
ctx->Push(done);
RedisMessage& output = done->output_message;
const char** args = ParseArgs(m);
const char** args = ParseArgs(done->input_message);
if (!args) {
output.SetError("ERR command not string");
return -1;
}
if (meta->handler_continue) {
RedisCommandHandler::Result result = meta->handler_continue->Run(
if (ctx->handler_continue) {
RedisCommandHandler::Result result = ctx->handler_continue->Run(
args, &output, done_guard.release());
if (result == RedisCommandHandler::OK) {
meta->handler_continue = NULL;
ctx->handler_continue = NULL;
}
} else {
std::string comm;
comm.reserve(8);
for (const char* c = m[0].c_str(); *c; ++c) {
for (const char* c = done->input_message[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c));
}
auto it = meta->command_map.find(comm);
if (it == meta->command_map.end()) {
auto it = ctx->command_map.find(comm);
if (it == ctx->command_map.end()) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.SetError(buf);
......@@ -236,7 +161,7 @@ int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) {
RedisCommandHandler::Result result =
it->second->Run(args, &output, done_guard.release());
if (result == RedisCommandHandler::CONTINUE) {
meta->handler_continue = it->second.get();
ctx->handler_continue = it->second.get();
}
}
}
......@@ -244,6 +169,104 @@ int ConsumeTask(RedisConnContext* meta, const RedisMessage& m) {
return 0;
}
int Consume(void* ctx, bthread::TaskIterator<ConsumeTaskDone*>& iter) {
RedisConnContext* qctx = static_cast<RedisConnContext*>(ctx);
if (iter.is_queue_stopped()) {
qctx->RemoveRefManually();
return 0;
}
for (; iter; ++iter) {
ConsumeTask(qctx, *iter);
}
return 0;
}
// ========== impl of RedisConnContext ==========
void RedisConnContext::Destroy() {
bthread::execution_queue_stop(queue);
}
int RedisConnContext::Init() {
bthread::ExecutionQueueOptions q_opt;
q_opt.bthread_attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
if (bthread::execution_queue_start(&queue, &q_opt, Consume, this) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return -1;
}
return 0;
}
void RedisConnContext::Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(_mutex);
dones.push(done);
}
void RedisConnContext::Flush() {
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
return;
}
{
std::unique_lock<butil::Mutex> m(_mutex);
if (_writing) return;
_writing = true;
}
std::queue<ConsumeTaskDone*> ready_to_write;
std::queue<ConsumeTaskDone*> ready_to_delete;
butil::IOBuf buf;
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
while (true) {
std::unique_lock<butil::Mutex> m(_mutex);
while (!dones.empty() && dones.front()->IsReady()) {
ready_to_write.push(dones.front());
dones.pop();
}
if (ready_to_write.empty()) {
_writing = false;
if (!buf.empty()) {
LOG_IF(WARNING, s->Write(&buf, &wopt) != 0)
<< "Fail to send redis reply";
}
break;
}
m.unlock();
while (!ready_to_write.empty()) {
ConsumeTaskDone* head = ready_to_write.front();
ready_to_write.pop();
buf.append(head->sendbuf);
ready_to_delete.push(head);
}
if ((int)buf.size() > FLAGS_redis_batch_flush_max_size) {
LOG_IF(WARNING, s->Write(&buf, &wopt) != 0)
<< "Fail to send redis reply";
CHECK(buf.empty());
}
}
ClearQueue(ready_to_delete);
}
void RedisConnContext::ClearQueue(std::queue<ConsumeTaskDone*>& queue) {
while (!queue.empty()) {
ConsumeTaskDone* head = queue.front();
queue.pop();
delete head;
}
}
// ========== impl of RedisConnContext ==========
void ConsumeTaskDone::Run() {
butil::intrusive_ptr<RedisConnContext> delete_ctx(ctx, false);
output_message.SerializeToIOBuf(&sendbuf);
_ready.store(true, butil::memory_order_release);
ctx->Flush();
// After Flush(), this object may be deleted and should never be
// touched.
}
ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) {
if (read_eof || source->empty()) {
......@@ -258,7 +281,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context());
if (ctx == NULL) {
ctx = new RedisConnContext;
// add ref for Consume()
// add ref that removed in Consume()
ctx->AddRefManually();
ctx->socket_id = socket->id();
rs->CloneCommandMap(&ctx->command_map);
......@@ -269,16 +292,19 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
}
socket->reset_parsing_context(ctx);
}
std::unique_ptr<TaskContext> task_ctx(new TaskContext);
ParseError err = task_ctx->message.ConsumePartialIOBuf(*source, &task_ctx->arena);
std::unique_ptr<ConsumeTaskDone> done(new ConsumeTaskDone);
ParseError err = done->input_message.ConsumePartialIOBuf(*source, &done->arena);
if (err != PARSE_OK) {
return MakeParseError(err);
}
if (bthread::execution_queue_execute(ctx->queue, task_ctx.get()) != 0) {
// Add a ref that removed in ConsumeTaskDone::Run
ctx->AddRefManually();
if (bthread::execution_queue_execute(ctx->queue, done.get()) != 0) {
ctx->RemoveRefManually();
LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
task_ctx.release();
done.release();
return MakeMessage(NULL);
} else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment