Commit c7b63da2 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: refine code

parent bace6cca
...@@ -62,24 +62,18 @@ class ConsumeTaskDone : public google::protobuf::Closure { ...@@ -62,24 +62,18 @@ class ConsumeTaskDone : public google::protobuf::Closure {
public: public:
ConsumeTaskDone() ConsumeTaskDone()
: _ready(false) : _ready(false)
, in_transaction(false) , output_message(&_arena) {}
, output_message(&arena) {}
void Run() override; void Run() override;
bool is_ready() { return _ready.load(butil::memory_order_acquire); }
bool is_ready() { return _ready; }
private: private:
bool _ready; butil::atomic<bool> _ready;
butil::Arena _arena;
public: public:
butil::Arena arena;
RedisCommandHandler::Result result;
bool in_transaction;
RedisMessage input_message;
RedisMessage output_message; RedisMessage output_message;
QueueMeta* meta; QueueMeta* meta;
std::string command_name;
butil::IOBuf sendbuf; butil::IOBuf sendbuf;
}; };
...@@ -89,7 +83,7 @@ public: ...@@ -89,7 +83,7 @@ public:
QueueMeta() : handler_continue(NULL) {} QueueMeta() : handler_continue(NULL) {}
void Push(ConsumeTaskDone* done) { void Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(mutex); std::unique_lock<butil::Mutex> m(_mutex);
dones.push(done); dones.push(done);
} }
void Flush() { void Flush() {
...@@ -102,19 +96,20 @@ public: ...@@ -102,19 +96,20 @@ public:
Socket::WriteOptions wopt; Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true; wopt.ignore_eovercrowded = true;
{ {
std::unique_lock<butil::Mutex> m(mutex); std::unique_lock<butil::Mutex> m(_mutex);
if (_writing) return; if (_writing) return;
_writing = true; _writing = true;
} }
std::queue<ConsumeTaskDone*> ready_to_delete;
while (true) { while (true) {
std::unique_lock<butil::Mutex> m(mutex); std::unique_lock<butil::Mutex> m(_mutex);
while (!dones.empty() && dones.front()->is_ready()) { while (!dones.empty() && dones.front()->is_ready()) {
ready_to_write.push(dones.front()); ready_to_write.push(dones.front());
dones.pop(); dones.pop();
} }
if (ready_to_write.empty()) { if (ready_to_write.empty()) {
_writing = false; _writing = false;
return; break;
} }
m.unlock(); m.unlock();
...@@ -123,25 +118,24 @@ public: ...@@ -123,25 +118,24 @@ public:
ready_to_write.pop(); ready_to_write.pop();
LOG_IF(WARNING, s->Write(&head->sendbuf, &wopt) != 0) LOG_IF(WARNING, s->Write(&head->sendbuf, &wopt) != 0)
<< "Fail to send redis reply"; << "Fail to send redis reply";
delete head; ready_to_delete.push(head);
} }
} }
while (!ready_to_delete.empty()) {
ConsumeTaskDone* head = ready_to_delete.front();
ready_to_delete.pop();
delete head;
}
} }
SocketId socket_id; SocketId socket_id;
RedisService::CommandMap command_map; RedisService::CommandMap command_map;
RedisCommandHandler* handler_continue; RedisCommandHandler* handler_continue;
// queue to buffer commands and execute these commands together and atomicly
// used to implement command like 'MULTI'.
std::vector<const char**> command_queue;
// used to allocate memory for queued command;
butil::Arena arena;
// command that trigger commands to execute atomicly.
std::string queue_command_name;
std::queue<ConsumeTaskDone*> dones; std::queue<ConsumeTaskDone*> dones;
private:
bool _writing = false; bool _writing = false;
butil::Mutex mutex; butil::Mutex _mutex;
}; };
struct TaskContext { struct TaskContext {
...@@ -165,54 +159,8 @@ const char** ParseArgs(const RedisMessage& message) { ...@@ -165,54 +159,8 @@ const char** ParseArgs(const RedisMessage& message) {
void ConsumeTaskDone::Run() { void ConsumeTaskDone::Run() {
butil::intrusive_ptr<QueueMeta> delete_queue_meta(meta, false); butil::intrusive_ptr<QueueMeta> delete_queue_meta(meta, false);
/*
char buf[64];
if (result.is_continue()) {
if (meta->handler_continue) {
// This command is not first and should be queued.
if (command_name == meta->queue_command_name) {
snprintf(buf, sizeof(buf),
"ERR %s calls can not be nested", command_name.c_str());
output_message.set_error(buf);
} else {
RedisMessage copyed;
copyed.CopyFromDifferentArena(input_message, &meta->arena);
const char** args = ParseArgs(copyed);
CHECK(args != NULL);
meta->command_queue.push_back(args);
output_message.set_status("QUEUED");
}
} else {
// First command that return RedisCommandHandler::REDIS_COMMAND_CONTINUE
// should not be pushed into queue, since it is always a marker.
meta->handler_continue = last_handler;
meta->queue_command_name = command_name;
output_message.set_status("OK");
}
} else if (result.is_ok()) {
if (in_transaction && meta->handler_continue) {
RedisCommandHandler* handler = meta->handler_continue;
meta->handler_continue = NULL;
meta->queue_command_name.clear();
delete_queue_meta.detach();
handler->RunTransaction(meta->command_queue, &output_message, &result, this);
meta->arena.clear();
for (size_t i = 0; i < meta->command_queue.size(); ++i) {
free(meta->command_queue[i]);
}
meta->command_queue.clear();
return;
}
} else {
meta->handler_continue = NULL;
meta->queue_command_name.clear();
LOG(ERROR) << "unknown redis command result";
output_message.set_error("internal server error");
}
*/
output_message.SerializeToIOBuf(&sendbuf); output_message.SerializeToIOBuf(&sendbuf);
//TODO: add fence _ready.store(true, butil::memory_order_release);
_ready = true;
meta->Flush(); meta->Flush();
} }
...@@ -223,29 +171,29 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) { ...@@ -223,29 +171,29 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) {
meta->AddRefManually(); meta->AddRefManually();
done->meta = meta; done->meta = meta;
RedisMessage& output = done->output_message; RedisMessage& output = done->output_message;
done->input_message.CopyFromDifferentArena(m, &done->arena);
char buf[64];
const char** args = ParseArgs(done->input_message); const char** args = ParseArgs(m);
if (!args) { if (!args) {
output.set_error("ERR command not string"); output.set_error("ERR command not string");
return -1; return -1;
} }
std::string comm;
comm.reserve(8);
for (const char* c = done->input_message[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c));
}
done->command_name = comm;
if (meta->handler_continue) { if (meta->handler_continue) {
RedisCommandHandler::Result result = meta->handler_continue->Run( RedisCommandHandler::Result result = meta->handler_continue->Run(
args, &output, done_guard.release()); args, &output, done_guard.release());
if (result == RedisCommandHandler::OK) { if (result == RedisCommandHandler::OK) {
meta->handler_continue = NULL; meta->handler_continue = NULL;
} else {
LOG(ERROR) << "Unknown handler result=" << (int)result;
} }
} else { } else {
std::string comm;
comm.reserve(8);
for (const char* c = m[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c));
}
auto it = meta->command_map.find(comm); auto it = meta->command_map.find(comm);
if (it == meta->command_map.end()) { if (it == meta->command_map.end()) {
char buf[64];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str()); snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.set_error(buf); output.set_error(buf);
} else { } else {
...@@ -253,6 +201,8 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) { ...@@ -253,6 +201,8 @@ int ConsumeTask(QueueMeta* meta, const RedisMessage& m) {
it->second->Run(args, &output, done_guard.release()); it->second->Run(args, &output, done_guard.release());
if (result == RedisCommandHandler::CONTINUE) { if (result == RedisCommandHandler::CONTINUE) {
meta->handler_continue = it->second.get(); meta->handler_continue = it->second.get();
} else {
LOG(ERROR) << "Unknown handler result=" << (int)result;
} }
} }
} }
......
...@@ -401,7 +401,7 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count ...@@ -401,7 +401,7 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count
return PARSE_ERROR_ABSOLUTELY_WRONG; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
for (int i = 0; i < reply_count - 1; ++i) { for (int i = 0; i < reply_count - 1; ++i) {
new (&_other_replies[i]) RedisMessage; new (&_other_replies[i]) RedisMessage(NULL);
} }
} }
for (int i = reply_size(); i < reply_count; ++i) { for (int i = reply_size(); i < reply_count; ++i) {
......
...@@ -51,9 +51,9 @@ else() ...@@ -51,9 +51,9 @@ else()
message(FATAL_ERROR "Googletest is not available") message(FATAL_ERROR "Googletest is not available")
endif() endif()
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}") set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_ENABLE_CPU_PROFILER -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DUNIT_TEST -Dprivate=public -Dprotected=public -DBVAR_NOT_LINK_DEFAULT_VARIABLES -D__STRICT_ANSI__ -include ${PROJECT_SOURCE_DIR}/test/sstream_workaround.h") set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DUNIT_TEST -Dprivate=public -Dprotected=public -DBVAR_NOT_LINK_DEFAULT_VARIABLES -D__STRICT_ANSI__ -include ${PROJECT_SOURCE_DIR}/test/sstream_workaround.h")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -g -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer") set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
use_cxx11() use_cxx11()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
......
...@@ -836,7 +836,7 @@ TEST_F(RedisTest, server_concurrency) { ...@@ -836,7 +836,7 @@ TEST_F(RedisTest, server_concurrency) {
rsimpl->AddCommandHandler("incr", ih); rsimpl->AddCommandHandler("incr", ih);
server_options.redis_service = rsimpl; server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900); brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options)); ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options));
brpc::ChannelOptions options; brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS; options.protocol = brpc::PROTOCOL_REDIS;
...@@ -879,15 +879,15 @@ public: ...@@ -879,15 +879,15 @@ public:
return brpc::RedisCommandHandler::CONTINUE; return brpc::RedisCommandHandler::CONTINUE;
} }
output->set_array(commands.size()); output->set_array(commands.size());
s_mutex.lock();
for (size_t i = 0; i < commands.size(); ++i) { for (size_t i = 0; i < commands.size(); ++i) {
if (commands[i][0] == "incr") { if (commands[i][0] == "incr") {
int64_t value; int64_t value;
s_mutex.lock();
value = ++int_map[commands[i][1]]; value = ++int_map[commands[i][1]];
s_mutex.unlock();
(*output)[i].set_integer(value); (*output)[i].set_integer(value);
} }
} }
s_mutex.unlock();
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
RedisCommandHandler* New() { return new MultiCommandHandler; } RedisCommandHandler* New() { return new MultiCommandHandler; }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment