Commit 78ae56e8 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: add RedisReply::SerializeToIOBuf and UT

parent 5f4815ab
......@@ -75,10 +75,11 @@ int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) {
RedisReply output;
conn->OnRedisMessage(ctx->message, &output, &ctx->arena);
butil::IOBuf sendbuf;
sendbuf.append("+OK\r\n");
output.SerializeToIOBuf(&sendbuf);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
s->Write(&sendbuf, &wopt);
LOG_IF(WARNING, s->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
}
return 0;
}
......@@ -114,20 +115,21 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
}
const Server* server = static_cast<const Server*>(arg);
if (server) {
ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context());
if (ctx == NULL) {
RedisConnection* conn = server->options().redis_service->NewConnection();
if (!conn) {
LOG(ERROR) << "Fail to new redis connection from redis service";
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context());
if (ctx == NULL) {
ctx = new ServerContext;
if (ctx->init(conn) != 0) {
delete conn;
delete ctx;
LOG(ERROR) << "Fail to init redis ServerContext";
return MakeParseError(PARSE_ERROR_NO_RESOURCE);
}
socket->initialize_parsing_context(&ctx);
socket->reset_parsing_context(&ctx);
}
std::unique_ptr<ExecutionQueueContext> task(new ExecutionQueueContext);
RedisReply message;
......
......@@ -24,6 +24,7 @@
namespace brpc {
//BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match);
const uint32_t RedisReply::npos = (uint32_t)-1;
const char* RedisReplyTypeToString(RedisReplyType type) {
switch (type) {
......@@ -38,8 +39,56 @@ const char* RedisReplyTypeToString(RedisReplyType type) {
}
bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
//TODO
butil::IOBufBuilder builder;
switch (_type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
buf->push_back((_type == REDIS_REPLY_ERROR)? '-' : '+');
if (_length < sizeof(_data.short_str)) {
buf->append(_data.short_str, _length);
} else {
buf->append(_data.long_str, _length);
}
buf->append("\r\n");
break;
case REDIS_REPLY_INTEGER:
builder << ':' << _data.integer << "\r\n";
buf->append(builder.buf());
break;
case REDIS_REPLY_STRING:
// Since _length is unsigned, we have to int casting _length to
// represent nil string
builder << '$' << (int)_length << "\r\n";
buf->append(builder.buf());
if (_length == npos) {
break;
}
if (_length < sizeof(_data.short_str)) {
buf->append(_data.short_str, _length);
} else {
buf->append(_data.long_str, _length);
}
buf->append("\r\n");
break;
case REDIS_REPLY_ARRAY:
builder << '*' << (int)_length << "\r\n";
buf->append(builder.buf());
if (_length == npos) {
break;
}
for (size_t i = 0; i < _length; ++i) {
if (!_data.array.replies[i].SerializeToIOBuf(buf)) {
return false;
}
}
break;
case REDIS_REPLY_NIL:
buf->append("$-1\r\n");
break;
default:
CHECK(false) << "unknown redis type=" << _type;
return false;
}
return true;
}
......
......@@ -58,7 +58,7 @@ public:
bool set_nil_string(); // "$-1\r\n"
bool set_array(int size, butil::Arena* arena); // size == -1 means nil array("*-1\r\n")
bool set_simple_string(const std::string& str, butil::Arena* arena);
bool set_status(const std::string& str, butil::Arena* arena);
bool set_error(const std::string& str, butil::Arena* arena);
bool set_integer(int64_t value);
bool set_bulk_string(const std::string& str, butil::Arena* arena);
......@@ -87,6 +87,7 @@ public:
// Get the index-th sub reply. If this reply is not an array, a nil reply
// is returned (call stacks are not logged)
const RedisReply& operator[](size_t index) const;
RedisReply& operator[](size_t index);
// Parse from `buf' which may be incomplete and allocate needed memory
// on `arena'.
......@@ -100,6 +101,7 @@ public:
// the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena);
//
bool SerializeToIOBuf(butil::IOBuf* buf);
......@@ -121,6 +123,8 @@ public:
void CopyFromSameArena(const RedisReply& other);
private:
static const uint32_t npos;
// RedisReply does not own the memory of fields, copying must be done
// by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisReply);
......@@ -155,7 +159,11 @@ inline RedisReply::RedisReply()
_data.array.replies = NULL;
}
inline bool RedisReply::is_nil() const { return _type == REDIS_REPLY_NIL; }
inline bool RedisReply::is_nil() const {
return (_type == REDIS_REPLY_NIL) ||
((_type == REDIS_REPLY_STRING || _type == REDIS_REPLY_ARRAY) &&
_length == uint32_t(-1));
}
inline bool RedisReply::is_error() const { return _type == REDIS_REPLY_ERROR; }
inline bool RedisReply::is_integer() const { return _type == REDIS_REPLY_INTEGER; }
inline bool RedisReply::is_string() const
......@@ -173,14 +181,14 @@ inline int64_t RedisReply::integer() const {
inline bool RedisReply::set_nil_string() {
_type = REDIS_REPLY_STRING;
_length = -1;
_length = npos;
return true;
}
inline bool RedisReply::set_array(int size, butil::Arena* arena) {
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
_length = -1;
_length = npos;
return true;
} else if (size == 0) {
_length = 0;
......@@ -217,7 +225,7 @@ inline bool RedisReply::set_basic_string(const std::string& str, butil::Arena* a
return true;
}
inline bool RedisReply::set_simple_string(const std::string& str, butil::Arena* arena) {
inline bool RedisReply::set_status(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_STATUS);
}
......@@ -279,6 +287,11 @@ inline size_t RedisReply::size() const {
return (is_array() ? _length : 0);
}
inline RedisReply& RedisReply::operator[](size_t index) {
return const_cast<RedisReply&>(
const_cast<const RedisReply*>(this)->operator[](index));
}
inline const RedisReply& RedisReply::operator[](size_t index) const {
if (is_array() && index < _length) {
return _data.array.replies[index];
......
......@@ -548,6 +548,120 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear();
}
TEST_F(RedisTest, codec) {
butil::Arena arena;
// status
{
brpc::RedisReply r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_status("OK", &arena));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_string());
ASSERT_STREQ("OK", r.c_str());
}
// error
{
brpc::RedisReply r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_error("not exist \'key\'", &arena));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_error());
ASSERT_STREQ("not exist \'key\'", r.error_message());
}
// string
{
brpc::RedisReply r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_nil_string());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
r.Clear();
ASSERT_TRUE(r.set_bulk_string("abc'hello world", &arena));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "$15\r\nabc'hello world\r\n");
r.Clear();
err = r.ConsumePartialIOBuf(buf, &arena);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_string());
ASSERT_STREQ(r.c_str(), "abc'hello world");
}
// integer
{
brpc::RedisReply r;
butil::IOBuf buf;
int t = 2;
int input[] = { -1, 1234567 };
const char* output[] = { ":-1\r\n", ":1234567\r\n" };
for (int i = 0; i < t; ++i) {
r.Clear();
ASSERT_TRUE(r.set_integer(input[i]));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_integer());
ASSERT_EQ(r.integer(), input[i]);
}
}
// array
{
brpc::RedisReply r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_array(3, &arena));
brpc::RedisReply& sub_reply = r[0];
sub_reply.set_array(2, &arena);
sub_reply[0].set_bulk_string("hello, it's me", &arena);
sub_reply[1].set_integer(422);
r[1].set_bulk_string("To go over everything", &arena);
r[2].set_integer(1);
ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(),
"*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
"To go over everything\r\n:1\r\n");
r.Clear();
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_TRUE(r.is_array());
ASSERT_EQ(3ul, r.size());
ASSERT_TRUE(r[0].is_array());
ASSERT_EQ(2ul, r[0].size());
ASSERT_TRUE(r[0][0].is_string());
ASSERT_STREQ(r[0][0].c_str(), "hello, it's me");
ASSERT_TRUE(r[0][1].is_integer());
ASSERT_EQ(r[0][1].integer(), 422);
ASSERT_TRUE(r[1].is_string());
ASSERT_STREQ(r[1].c_str(), "To go over everything");
ASSERT_TRUE(r[2].is_integer());
ASSERT_EQ(1, r[2].integer());
r.Clear();
// nil array
ASSERT_TRUE(r.set_array(-1, &arena));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
}
}
butil::Mutex s_mutex;
std::unordered_map<std::string, std::string> m;
std::unordered_map<std::string, int64_t> int_map;
class RedisServiceImpl;
class RedisConnectionImpl : public brpc::RedisConnection {
public:
......@@ -555,12 +669,49 @@ public:
: _rs(rs) { }
void OnRedisMessage(const brpc::RedisReply& message, brpc::RedisReply* output, butil::Arena* arena) {
LOG(INFO) << "OnRedisMessage, m=" << message;
if (!message.is_array() || message.size() == 0) {
output->set_error("command not valid array", arena);
return;
}
const brpc::RedisReply& comm = message[0];
if (!comm.is_string()) {
output->set_error("command not string", arena);
return;
}
std::string s(comm.c_str());
std::transform(s.begin(), s.end(), s.begin(), [](char c){ return std::tolower(c); });
if (s == "set") {
std::string key = message[1].c_str();
std::string value = message[2].c_str();
m[key] = value;
output->set_status("OK", arena);
return;
} else if (s == "get") {
std::string key = message[1].c_str();
auto it = m.find(key);
if (it != m.end()) {
output->set_bulk_string(it->second, arena);
} else {
output->set_nil_string();
}
butil::IOBuf buf;
output->SerializeToIOBuf(&buf);
return;
} else if (s == "incr") {
int64_t value;
s_mutex.lock();
value = ++int_map[message[1].c_str()];
s_mutex.unlock();
output->set_integer(value);
return;
}
char buf[128];
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", s.c_str());
output->set_error(buf, arena);
return;
}
public:
private:
RedisServiceImpl* _rs;
};
......@@ -568,16 +719,18 @@ class RedisServiceImpl : public brpc::RedisService {
public:
// @RedisService
brpc::RedisConnection* NewConnection() {
call_count++;
return new RedisConnectionImpl(this);
}
std::map<std::string, std::string> m;
int call_count = 0;
};
TEST_F(RedisTest, server) {
TEST_F(RedisTest, server_sanity) {
brpc::Server server;
brpc::ServerOptions server_options;
server_options.redis_service = new RedisServiceImpl;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
......@@ -585,16 +738,80 @@ TEST_F(RedisTest, server) {
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello2"));
ASSERT_TRUE(request.AddCommand("set key1 value1"));
ASSERT_TRUE(request.AddCommand("get key1"));
ASSERT_TRUE(request.AddCommand("set key2 value2"));
ASSERT_TRUE(request.AddCommand("get key2"));
ASSERT_TRUE(request.AddCommand("xxxcommand key2"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_EQ("OK", response.reply(0).data());
ASSERT_EQ(7, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_STREQ("OK", response.reply(2).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_STREQ("value1", response.reply(3).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(4).type());
ASSERT_STREQ("OK", response.reply(4).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
ASSERT_STREQ("value2", response.reply(5).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type());
ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
ASSERT_EQ(rsimpl->call_count, 1);
}
void* incr_thread(void* arg) {
brpc::Channel* c = static_cast<brpc::Channel*>(arg);
for (int i = 0; i < 5000; ++i) {
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
EXPECT_TRUE(request.AddCommand("incr count"));
c->CallMethod(NULL, &cntl, &request, &response, NULL);
EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
EXPECT_EQ(1, response.reply_size());
EXPECT_TRUE(response.reply(0).is_integer());
}
return NULL;
}
TEST_F(RedisTest, server_concurrency) {
int N = 10;
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.connection_type = "pooled";
std::vector<bthread_t> bths;
std::vector<brpc::Channel*> channels;
for (int i = 0; i < N; ++i) {
channels.push_back(new brpc::Channel);
ASSERT_EQ(0, channels.back()->Init("127.0.0.1", server.listen_address().port, &options));
bthread_t bth;
ASSERT_EQ(bthread_start_background(&bth, NULL, incr_thread, channels.back()), 0);
bths.push_back(bth);
}
for (int i = 0; i < N; ++i) {
bthread_join(bths[i], NULL);
delete channels[i];
}
ASSERT_EQ(int_map["count"], 10 * 5000LL);
ASSERT_EQ(N, rsimpl->call_count);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment