Commit bace6cca authored by zhujiashun's avatar zhujiashun

redis_server_protocol: add async interface

parent 8c1f531f
......@@ -35,6 +35,7 @@
#include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h"
#include "bthread/execution_queue.h"
#include "bthread/countdown_event.h"
namespace brpc {
......@@ -56,19 +57,91 @@ struct InputResponse : public InputMessageBase {
}
};
struct QueueMeta {
class QueueMeta;
class ConsumeTaskDone : public google::protobuf::Closure {
public:
ConsumeTaskDone()
: _ready(false)
, in_transaction(false)
, output_message(&arena) {}
void Run() override;
bool is_ready() { return _ready; }
private:
bool _ready;
public:
butil::Arena arena;
RedisCommandHandler::Result result;
bool in_transaction;
RedisMessage input_message;
RedisMessage output_message;
QueueMeta* meta;
std::string command_name;
butil::IOBuf sendbuf;
};
class QueueMeta : public brpc::SharedObject {
public:
QueueMeta() : handler_continue(NULL) {}
void Push(ConsumeTaskDone* done) {
std::unique_lock<butil::Mutex> m(mutex);
dones.push(done);
}
void Flush() {
std::queue<ConsumeTaskDone*> ready_to_write;
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
return;
}
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
{
std::unique_lock<butil::Mutex> m(mutex);
if (_writing) return;
_writing = true;
}
while (true) {
std::unique_lock<butil::Mutex> m(mutex);
while (!dones.empty() && dones.front()->is_ready()) {
ready_to_write.push(dones.front());
dones.pop();
}
if (ready_to_write.empty()) {
_writing = false;
return;
}
m.unlock();
while (!ready_to_write.empty()) {
ConsumeTaskDone* head = ready_to_write.front();
ready_to_write.pop();
LOG_IF(WARNING, s->Write(&head->sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
delete head;
}
}
}
SocketId socket_id;
RedisService::CommandMap command_map;
RedisCommandHandler* handler_continue;
// queue to buffer commands and execute these commands together and atomicly
// used to implement command like 'MULTI'.
std::queue<RedisMessage> command_queue;
std::vector<const char**> command_queue;
// used to allocate memory for queued command;
butil::Arena arena;
// command that trigger commands to execute atomicly.
std::string queue_command_name;
QueueMeta() : handler_continue(NULL) {}
std::queue<ConsumeTaskDone*> dones;
bool _writing = false;
butil::Mutex mutex;
};
struct TaskContext {
......@@ -76,115 +149,126 @@ struct TaskContext {
butil::Arena arena;
};
void ConsumeTask(QueueMeta* meta, const RedisMessage& m, butil::Arena* arena, butil::IOBuf* sendbuf) {
RedisMessage output;
char buf[64];
do {
std::vector<const char*> args;
args.reserve(8);
bool args_parsed = true;
for (size_t i = 0; i < m.size(); ++i) {
if (!m[i].is_string()) {
output.set_error("ERR command not string", arena);
args_parsed = false;
break;
const char** ParseArgs(const RedisMessage& message) {
const char** args = (const char**)
malloc(sizeof(const char*) * (message.size() + 1 /* NULL */));
for (size_t i = 0; i < message.size(); ++i) {
if (!message[i].is_string()) {
free(args);
return NULL;
}
args.push_back(m[i].c_str());
}
if (!args_parsed) {
break;
}
std::string comm;
comm.reserve(8);
for (const char* c = m[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c));
args[i] = message[i].c_str();
}
args[message.size()] = NULL;
return args;
}
void ConsumeTaskDone::Run() {
butil::intrusive_ptr<QueueMeta> delete_queue_meta(meta, false);
/*
char buf[64];
if (result.is_continue()) {
if (meta->handler_continue) {
RedisCommandResult result = meta->handler_continue->Run(args, &output, arena);
if (result == REDIS_COMMAND_CONTINUE) {
if (comm == meta->queue_command_name) {
snprintf(buf, sizeof(buf), "ERR %s calls can not be nested", comm.c_str());
output.set_error(buf, arena);
break;
}
meta->command_queue.emplace();
RedisMessage& last = meta->command_queue.back();
last.CopyFromDifferentArena(m, &meta->arena);
output.set_status("QUEUED", arena);
} else if (result == REDIS_COMMAND_OK) {
// This command is not first and should be queued.
if (command_name == meta->queue_command_name) {
snprintf(buf, sizeof(buf),
"ERR %s calls can not be nested", command_name.c_str());
output_message.set_error(buf);
} else {
RedisMessage copyed;
copyed.CopyFromDifferentArena(input_message, &meta->arena);
const char** args = ParseArgs(copyed);
CHECK(args != NULL);
meta->command_queue.push_back(args);
output_message.set_status("QUEUED");
}
} else {
// First command that return RedisCommandHandler::REDIS_COMMAND_CONTINUE
// should not be pushed into queue, since it is always a marker.
meta->handler_continue = last_handler;
meta->queue_command_name = command_name;
output_message.set_status("OK");
}
} else if (result.is_ok()) {
if (in_transaction && meta->handler_continue) {
RedisCommandHandler* handler = meta->handler_continue;
meta->handler_continue = NULL;
meta->queue_command_name.clear();
butil::IOBuf nocountbuf;
int array_count = meta->command_queue.size();
while (!meta->command_queue.empty()) {
RedisMessage& front = meta->command_queue.front();
meta->command_queue.pop();
ConsumeTask(meta, front, arena, &nocountbuf);
}
AppendHeader(*sendbuf, '*', array_count);
sendbuf->append(nocountbuf);
delete_queue_meta.detach();
handler->RunTransaction(meta->command_queue, &output_message, &result, this);
meta->arena.clear();
for (size_t i = 0; i < meta->command_queue.size(); ++i) {
free(meta->command_queue[i]);
}
meta->command_queue.clear();
return;
} else if (result == REDIS_COMMAND_ERROR) {
meta->handler_continue = NULL;
meta->queue_command_name.clear();
if (!output.is_error()) {
output.set_error("internal server error", arena);
}
} else {
meta->handler_continue = NULL;
meta->queue_command_name.clear();
LOG(ERROR) << "unknown redis command result=" << result;
output.set_error("internal server error", arena);
LOG(ERROR) << "unknown redis command result";
output_message.set_error("internal server error");
}
*/
output_message.SerializeToIOBuf(&sendbuf);
//TODO: add fence
_ready = true;
meta->Flush();
}
int ConsumeTask(QueueMeta* meta, const RedisMessage& m) {
ConsumeTaskDone* done = new ConsumeTaskDone;
ClosureGuard done_guard(done);
meta->Push(done);
meta->AddRefManually();
done->meta = meta;
RedisMessage& output = done->output_message;
done->input_message.CopyFromDifferentArena(m, &done->arena);
char buf[64];
const char** args = ParseArgs(done->input_message);
if (!args) {
output.set_error("ERR command not string");
return -1;
}
break;
std::string comm;
comm.reserve(8);
for (const char* c = done->input_message[0].c_str(); *c; ++c) {
comm.push_back(std::tolower(*c));
}
done->command_name = comm;
if (meta->handler_continue) {
RedisCommandHandler::Result result = meta->handler_continue->Run(
args, &output, done_guard.release());
if (result == RedisCommandHandler::OK) {
meta->handler_continue = NULL;
}
} else {
auto it = meta->command_map.find(comm);
if (it == meta->command_map.end()) {
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", comm.c_str());
output.set_error(buf, arena);
break;
}
RedisCommandResult result = it->second->Run(args, &output, arena);
if (result == REDIS_COMMAND_CONTINUE) {
// First command that return REDIS_COMMAND_CONTINUE should not be pushed
// into queue, since it is always a marker.
output.set_error(buf);
} else {
RedisCommandHandler::Result result =
it->second->Run(args, &output, done_guard.release());
if (result == RedisCommandHandler::CONTINUE) {
meta->handler_continue = it->second.get();
meta->queue_command_name = comm;
output.set_status("OK", arena);
} else if (result == REDIS_COMMAND_ERROR) {
if (!output.is_error()) {
output.set_error("internal server error", arena);
}
} else if (result != REDIS_COMMAND_OK) {
LOG(ERROR) << "unknown redis command result=" << result;
}
} while(0);
output.SerializeToIOBuf(sendbuf);
}
free(args);
return 0;
}
int Consume(void* meta, bthread::TaskIterator<TaskContext*>& iter) {
QueueMeta* qmeta = static_cast<QueueMeta*>(meta);
if (iter.is_queue_stopped()) {
delete qmeta;
qmeta->RemoveRefManually();
return 0;
}
SocketUniquePtr s;
bool has_err = false;
if (Socket::Address(qmeta->socket_id, &s) != 0) {
LOG(WARNING) << "Fail to address redis socket";
has_err = true;
}
for (; iter; ++iter) {
std::unique_ptr<TaskContext> ctx(*iter);
if (has_err) {
continue;
}
butil::IOBuf sendbuf;
ConsumeTask(qmeta, ctx->message, &ctx->arena, &sendbuf);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
ConsumeTask(qmeta, ctx->message);
}
return 0;
}
......@@ -226,12 +310,13 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context());
if (ctx == NULL) {
QueueMeta* meta = new QueueMeta;
meta->AddRefManually();
meta->socket_id = socket->id();
rs->CloneCommandMap(&meta->command_map);
ctx = new ServerContext;
if (ctx->init(meta) != 0) {
delete ctx;
delete meta;
meta->RemoveRefManually();
LOG(ERROR) << "Fail to init redis ServerContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
......
......@@ -325,7 +325,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
RedisMessage* new_others =
(RedisMessage*)_arena.allocate(sizeof(RedisMessage) * (new_nreply - 1));
for (int i = 0; i < new_nreply - 1; ++i) {
new (new_others + i) RedisMessage;
new (new_others + i) RedisMessage(NULL);
}
int new_other_index = 0;
for (int i = 1; i < _nreply; ++i) {
......@@ -436,7 +436,7 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
return os;
}
bool RedisService::AddHandler(const std::string& name, RedisCommandHandler* handler) {
bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandler* handler) {
std::string lcname;
lcname.reserve(name.size());
for (auto c : name) {
......
......@@ -21,12 +21,15 @@
#define BRPC_REDIS_H
#include <google/protobuf/message.h>
#include <unordered_map>
#include <vector>
#include "butil/iobuf.h"
#include "butil/strings/string_piece.h"
#include "butil/arena.h"
#include "brpc/proto_base.pb.h"
#include "brpc/redis_message.h"
#include "brpc/parse_result.h"
#include "brpc/callback.h"
namespace brpc {
......@@ -209,39 +212,41 @@ private:
std::ostream& operator<<(std::ostream& os, const RedisRequest&);
std::ostream& operator<<(std::ostream& os, const RedisResponse&);
enum RedisCommandResult {
REDIS_COMMAND_OK = 0,
REDIS_COMMAND_CONTINUE = 1,
REDIS_COMMAND_ERROR = 2,
};
// The handler for a redis command. Run() and New() should be implemented
// by user. For Run(), `args` is the redis command argument. For example,
// "set foo bar" corresponds to args[0] == "set", args[1] == "foo" and
// args[2] == "bar". `output` is the content that sent to client side,
// which should be set by user. Read brpc/src/redis_message.h for more usage.
// `arena` is the memory arena that `output` would use.
// For New(), whenever a tcp connection is established, all handlers would
// be cloned and brpc makes sure that all requests of the same command name
// from one connection would be sent to the same command handler. All requests
// in one connection are executed sequentially, just like what redis-server does.
// For New(), whenever a tcp connection is established, a bunch of new handlers
// would be created using New() of corresponding handler and brpc makes sure that
// all requests of the same command name from one connection would be redirected
// to the same New()-ed command handler. All requests in one connection are
// executed sequentially, just like what redis-server does.
class RedisCommandHandler {
public:
enum Result {
OK = 0,
CONTINUE = 1,
};
~RedisCommandHandler() {}
virtual RedisCommandResult Run(const std::vector<const char*>& args,
RedisMessage* output, butil::Arena* arena) = 0;
virtual RedisCommandHandler::Result Run(const char* args[],
RedisMessage* output,
google::protobuf::Closure* done) = 0;
virtual RedisCommandHandler* New() = 0;
};
// Implement this class and assign an instance to ServerOption.redis_service
// to enable redis support. To support a particular command, you should implement
// the corresponding handler and call AddHandler to install it.
// the corresponding handler and call AddCommandHandler to install it.
class RedisService {
public:
typedef std::unordered_map<std::string, std::shared_ptr<RedisCommandHandler>> CommandMap;
virtual ~RedisService() {}
bool AddHandler(const std::string& name, RedisCommandHandler* handler);
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
void CloneCommandMap(CommandMap* map);
private:
CommandMap _command_map;
......@@ -249,5 +254,4 @@ private:
} // namespace brpc
#endif // BRPC_REDIS_H
......@@ -245,7 +245,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
for (int64_t i = 0; i < count; ++i) {
new (&subs[i]) RedisMessage;
new (&subs[i]) RedisMessage(NULL);
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_MESSAGE_ARRAY;
......
......@@ -44,8 +44,9 @@ const char* RedisMessageTypeToString(RedisMessageType);
// A reply from redis-server.
class RedisMessage {
public:
// A default constructed reply is a nil.
// A default reply is a nil.
RedisMessage();
RedisMessage(butil::Arena* arena);
// Type of the reply.
RedisMessageType type() const { return _type; }
......@@ -57,11 +58,11 @@ public:
bool is_array() const; // True if the reply is an array.
bool set_nil_string(); // "$-1\r\n"
bool set_array(int size, butil::Arena* arena); // size == -1 means nil array("*-1\r\n")
bool set_status(const std::string& str, butil::Arena* arena);
bool set_error(const std::string& str, butil::Arena* arena);
bool set_array(int size); // size == -1 means nil array("*-1\r\n")
bool set_status(const std::string& str);
bool set_error(const std::string& str);
bool set_integer(int64_t value);
bool set_bulk_string(const std::string& str, butil::Arena* arena);
bool set_bulk_string(const std::string& str);
// Convert the reply into a signed 64-bit integer(according to
// http://redis.io/topics/protocol). If the reply is not an integer,
......@@ -129,7 +130,7 @@ private:
// by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisMessage);
bool set_basic_string(const std::string& str, butil::Arena* arena, RedisMessageType type);
bool set_basic_string(const std::string& str, RedisMessageType type);
RedisMessageType _type;
uint32_t _length; // length of short_str/long_str, count of replies
......@@ -143,6 +144,7 @@ private:
} array;
uint64_t padding[2]; // For swapping, must cover all bytes.
} _data;
butil::Arena* _arena;
};
// =========== inline impl. ==============
......@@ -152,9 +154,15 @@ inline std::ostream& operator<<(std::ostream& os, const RedisMessage& r) {
return os;
}
inline RedisMessage::RedisMessage(butil::Arena* arena)
: RedisMessage() {
_arena = arena;
}
inline RedisMessage::RedisMessage()
: _type(REDIS_MESSAGE_NIL)
, _length(0) {
, _length(0)
, _arena(NULL) {
_data.array.last_index = -1;
_data.array.replies = NULL;
}
......@@ -180,12 +188,16 @@ inline int64_t RedisMessage::integer() const {
}
inline bool RedisMessage::set_nil_string() {
if (!_arena) return false;
_type = REDIS_MESSAGE_STRING;
_length = npos;
return true;
}
inline bool RedisMessage::set_array(int size, butil::Arena* arena) {
inline bool RedisMessage::set_array(int size) {
if (!_arena) {
return false;
}
_type = REDIS_MESSAGE_ARRAY;
if (size < 0) {
_length = npos;
......@@ -194,26 +206,29 @@ inline bool RedisMessage::set_array(int size, butil::Arena* arena) {
_length = 0;
return true;
}
RedisMessage* subs = (RedisMessage*)arena->allocate(sizeof(RedisMessage) * size);
RedisMessage* subs = (RedisMessage*)_arena->allocate(sizeof(RedisMessage) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisMessage[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisMessage;
new (&subs[i]) RedisMessage(_arena);
}
_length = size;
_data.array.replies = subs;
return true;
}
inline bool RedisMessage::set_basic_string(const std::string& str, butil::Arena* arena, RedisMessageType type) {
inline bool RedisMessage::set_basic_string(const std::string& str, RedisMessageType type) {
if (!_arena) {
return false;
}
size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
_data.short_str[size] = '\0';
} else {
char* d = (char*)arena->allocate((_length/8 + 1) * 8);
char* d = (char*)_arena->allocate((_length/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
......@@ -227,12 +242,12 @@ inline bool RedisMessage::set_basic_string(const std::string& str, butil::Arena*
return true;
}
inline bool RedisMessage::set_status(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_MESSAGE_STATUS);
inline bool RedisMessage::set_status(const std::string& str) {
return set_basic_string(str, REDIS_MESSAGE_STATUS);
}
inline bool RedisMessage::set_error(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_MESSAGE_ERROR);
inline bool RedisMessage::set_error(const std::string& str) {
return set_basic_string(str, REDIS_MESSAGE_ERROR);
}
inline bool RedisMessage::set_integer(int64_t value) {
......@@ -242,8 +257,8 @@ inline bool RedisMessage::set_integer(int64_t value) {
return true;
}
inline bool RedisMessage::set_bulk_string(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_MESSAGE_STRING);
inline bool RedisMessage::set_bulk_string(const std::string& str) {
return set_basic_string(str, REDIS_MESSAGE_STRING);
}
inline const char* RedisMessage::c_str() const {
......
......@@ -53,7 +53,7 @@ endif()
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DUNIT_TEST -Dprivate=public -Dprotected=public -DBVAR_NOT_LINK_DEFAULT_VARIABLES -D__STRICT_ANSI__ -include ${PROJECT_SOURCE_DIR}/test/sstream_workaround.h")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -g -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
use_cxx11()
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
......
......@@ -553,9 +553,9 @@ TEST_F(RedisTest, codec) {
butil::Arena arena;
// status
{
brpc::RedisMessage r;
brpc::RedisMessage r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.set_status("OK", &arena));
ASSERT_TRUE(r.set_status("OK"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK");
......@@ -567,9 +567,9 @@ TEST_F(RedisTest, codec) {
}
// error
{
brpc::RedisMessage r;
brpc::RedisMessage r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.set_error("not exist \'key\'", &arena));
ASSERT_TRUE(r.set_error("not exist \'key\'"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear();
......@@ -580,7 +580,7 @@ TEST_F(RedisTest, codec) {
}
// string
{
brpc::RedisMessage r;
brpc::RedisMessage r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.set_nil_string());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
......@@ -591,7 +591,7 @@ TEST_F(RedisTest, codec) {
ASSERT_TRUE(r.is_nil());
r.Clear();
ASSERT_TRUE(r.set_bulk_string("abcde'hello world", &arena));
ASSERT_TRUE(r.set_bulk_string("abcde'hello world"));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world");
......@@ -603,7 +603,7 @@ TEST_F(RedisTest, codec) {
}
// integer
{
brpc::RedisMessage r;
brpc::RedisMessage r(&arena);
butil::IOBuf buf;
int t = 2;
int input[] = { -1, 1234567 };
......@@ -622,14 +622,14 @@ TEST_F(RedisTest, codec) {
}
// array
{
brpc::RedisMessage r;
brpc::RedisMessage r(&arena);
butil::IOBuf buf;
ASSERT_TRUE(r.set_array(3, &arena));
ASSERT_TRUE(r.set_array(3));
brpc::RedisMessage& sub_reply = r[0];
sub_reply.set_array(2, &arena);
sub_reply[0].set_bulk_string("hello, it's me", &arena);
sub_reply.set_array(2);
sub_reply[0].set_bulk_string("hello, it's me");
sub_reply[1].set_integer(422);
r[1].set_bulk_string("To go over everything", &arena);
r[1].set_bulk_string("To go over everything");
r[2].set_integer(1);
ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
......@@ -653,7 +653,7 @@ TEST_F(RedisTest, codec) {
r.Clear();
// nil array
ASSERT_TRUE(r.set_array(-1, &arena));
ASSERT_TRUE(r.set_array(-1));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
......@@ -665,50 +665,98 @@ butil::Mutex s_mutex;
std::unordered_map<std::string, std::string> m;
std::unordered_map<std::string, int64_t> int_map;
void* random_sleep(void *arg) {
google::protobuf::Closure* done = static_cast<google::protobuf::Closure*>(arg);
// [50, 100) ms
int sleep_ms = 50 + butil::fast_rand_less_than(50);
bthread_usleep(sleep_ms * 1000);
done->Run();
return NULL;
}
class SetCommandHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandResult Run(const std::vector<const char*>& args,
brpc::RedisMessage* output, butil::Arena* arena) {
SetCommandHandler(bool rand_sleep = false)
: _rand_sleep(rand_sleep) {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisMessage* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
std::string key = args[1];
std::string value = args[2];
m[key] = value;
output->set_status("OK", arena);
return brpc::REDIS_COMMAND_OK;
output->set_status("OK");
if (_rand_sleep) {
bthread_t bth;
bthread_start_background(&bth, NULL, random_sleep, done_guard.release());
}
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { new_count++; return new SetCommandHandler; }
int new_count = 0;
RedisCommandHandler* New() { _new_count++; return new SetCommandHandler(_rand_sleep); }
int new_count() { return _new_count; }
private:
int _new_count = 0;
bool _rand_sleep = false;
};
class GetCommandHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandResult Run(const std::vector<const char*>& args,
brpc::RedisMessage* output, butil::Arena* arena) {
GetCommandHandler(bool rand_sleep = false)
: _rand_sleep(rand_sleep) {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisMessage* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
std::string key = args[1];
auto it = m.find(key);
if (it != m.end()) {
output->set_bulk_string(it->second, arena);
output->set_bulk_string(it->second);
} else {
output->set_nil_string();
}
return brpc::REDIS_COMMAND_OK;
if (_rand_sleep) {
bthread_t bth;
bthread_start_background(&bth, NULL, random_sleep, done_guard.release());
}
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { new_count++; return new GetCommandHandler; }
int new_count = 0;
RedisCommandHandler* New() { _new_count++; return new GetCommandHandler(_rand_sleep); }
int new_count() { return _new_count; }
private:
int _new_count = 0;
bool _rand_sleep = false;
};
class IncrCommandHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandResult Run(const std::vector<const char*>& args,
brpc::RedisMessage* output, butil::Arena* arena) {
IncrCommandHandler(bool rand_sleep = false)
: _rand_sleep(rand_sleep) {}
brpc::RedisCommandHandler::Result Run(const char* args[],
brpc::RedisMessage* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
int64_t value;
s_mutex.lock();
value = ++int_map[args[1]];
s_mutex.unlock();
output->set_integer(value);
return brpc::REDIS_COMMAND_OK;
if (_rand_sleep) {
bthread_t bth;
bthread_start_background(&bth, NULL, random_sleep, done_guard.release());
}
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { new_count++; return new IncrCommandHandler; }
int new_count = 0;
RedisCommandHandler* New() { _new_count++; return new IncrCommandHandler(_rand_sleep); }
int new_count() { return _new_count; }
private:
int _new_count = 0;
bool _rand_sleep = false;
};
class RedisServiceImpl : public brpc::RedisService { };
......@@ -717,12 +765,12 @@ TEST_F(RedisTest, server_sanity) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler *gh = new GetCommandHandler;
SetCommandHandler *sh = new SetCommandHandler;
IncrCommandHandler *ih = new IncrCommandHandler;
rsimpl->AddHandler("get", gh);
rsimpl->AddHandler("set", sh);
rsimpl->AddHandler("incr", ih);
GetCommandHandler *gh = new GetCommandHandler(true);
SetCommandHandler *sh = new SetCommandHandler(true);
IncrCommandHandler *ih = new IncrCommandHandler(true);
rsimpl->AddCommandHandler("get", gh);
rsimpl->AddCommandHandler("set", sh);
rsimpl->AddCommandHandler("incr", ih);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
......@@ -758,9 +806,9 @@ TEST_F(RedisTest, server_sanity) {
ASSERT_EQ(brpc::REDIS_MESSAGE_ERROR, response.reply(6).type());
ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
ASSERT_EQ(gh->new_count, 1);
ASSERT_EQ(sh->new_count, 1);
ASSERT_EQ(ih->new_count, 1);
ASSERT_EQ(gh->new_count(), 1);
ASSERT_EQ(sh->new_count(), 1);
ASSERT_EQ(ih->new_count(), 1);
}
void* incr_thread(void* arg) {
......@@ -785,7 +833,7 @@ TEST_F(RedisTest, server_concurrency) {
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
IncrCommandHandler *ih = new IncrCommandHandler;
rsimpl->AddHandler("incr", ih);
rsimpl->AddCommandHandler("incr", ih);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
......@@ -808,29 +856,53 @@ TEST_F(RedisTest, server_concurrency) {
delete channels[i];
}
ASSERT_EQ(int_map["count"], 10 * 5000LL);
ASSERT_EQ(ih->new_count, N);
ASSERT_EQ(ih->new_count(), N);
}
class MultiCommandHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandResult Run(const std::vector<const char*>& args,
brpc::RedisMessage* output, butil::Arena* arena) {
RedisCommandHandler::Result Run(const char* args[],
brpc::RedisMessage* output,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
if (strcmp(args[0], "multi") == 0) {
output->set_status("OK");
return brpc::RedisCommandHandler::CONTINUE;
}
if (strcmp(args[0], "exec") != 0) {
return brpc::REDIS_COMMAND_CONTINUE;
std::vector<std::string> sargs;
for (const char** c = args; *c; ++c) {
sargs.push_back(*c);
}
commands.push_back(sargs);
output->set_status("QUEUED");
return brpc::RedisCommandHandler::CONTINUE;
}
return brpc::REDIS_COMMAND_OK;
output->set_array(commands.size());
for (size_t i = 0; i < commands.size(); ++i) {
if (commands[i][0] == "incr") {
int64_t value;
s_mutex.lock();
value = ++int_map[commands[i][1]];
s_mutex.unlock();
(*output)[i].set_integer(value);
}
}
return brpc::RedisCommandHandler::OK;
}
RedisCommandHandler* New() { return new MultiCommandHandler; }
std::vector<std::vector<std::string>> commands;
};
TEST_F(RedisTest, server_command_continue) {
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
rsimpl->AddHandler("get", new GetCommandHandler);
rsimpl->AddHandler("set", new SetCommandHandler);
rsimpl->AddHandler("incr", new IncrCommandHandler);
rsimpl->AddHandler("multi", new MultiCommandHandler);
rsimpl->AddCommandHandler("get", new GetCommandHandler);
rsimpl->AddCommandHandler("set", new SetCommandHandler);
rsimpl->AddCommandHandler("incr", new IncrCommandHandler);
rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
......@@ -855,27 +927,22 @@ TEST_F(RedisTest, server_command_continue) {
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
// multiple 'multi' should also work
ASSERT_TRUE(request.AddCommand("multi"));
ASSERT_TRUE(request.AddCommand("Multi"));
ASSERT_TRUE(request.AddCommand("muLti"));
int count = 10;
for (int i = 0; i < count; ++i) {
ASSERT_TRUE(request.AddCommand("incr hello 1"));
}
ASSERT_TRUE(request.AddCommand("exec"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_EQ(14, response.reply_size());
ASSERT_EQ(12, response.reply_size());
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(0).type());
ASSERT_STREQ("OK", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_MESSAGE_ERROR, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_ERROR, response.reply(2).type());
for (int i = 3; i < count + 3; ++i) {
for (int i = 1; i < count + 1; ++i) {
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(i).type());
ASSERT_STREQ("QUEUED", response.reply(i).c_str());
}
const brpc::RedisMessage& m = response.reply(count+3);
const brpc::RedisMessage& m = response.reply(count+1);
ASSERT_EQ(count, (int)m.size());
for (int i = 0; i < count; ++i) {
ASSERT_EQ(i+1, m[i].integer());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment