Commit 7ba68500 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: use IOBufAppender in redisreply::SerializeTo

parent eee11d90
...@@ -81,7 +81,7 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -81,7 +81,7 @@ int ConsumeCommand(RedisConnContext* ctx,
const std::string& next_command, const std::string& next_command,
butil::Arena* arena, butil::Arena* arena,
bool is_last, bool is_last,
butil::IOBuf* sendbuf) { butil::IOBufAppender* appender) {
RedisReply output(arena); RedisReply output(arena);
RedisCommandHandler::Result result = RedisCommandHandler::OK; RedisCommandHandler::Result result = RedisCommandHandler::OK;
if (ctx->transaction_handler) { if (ctx->transaction_handler) {
...@@ -124,14 +124,14 @@ int ConsumeCommand(RedisConnContext* ctx, ...@@ -124,14 +124,14 @@ int ConsumeCommand(RedisConnContext* ctx,
return -1; return -1;
} }
for (int i = 0; i < (int)output.size(); ++i) { for (int i = 0; i < (int)output.size(); ++i) {
output[i].SerializeTo(sendbuf); output[i].SerializeTo(appender);
} }
ctx->batched_size = 0; ctx->batched_size = 0;
} else { } else {
output.SerializeTo(sendbuf); output.SerializeTo(appender);
} }
} else if (result == RedisCommandHandler::CONTINUE) { } else if (result == RedisCommandHandler::CONTINUE) {
output.SerializeTo(sendbuf); output.SerializeTo(appender);
} else if (result == RedisCommandHandler::BATCHED) { } else if (result == RedisCommandHandler::BATCHED) {
// just do nothing and wait handler to return OK. // just do nothing and wait handler to return OK.
} else { } else {
...@@ -171,7 +171,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -171,7 +171,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
} }
butil::Arena arena; butil::Arena arena;
std::vector<const char*> current_commands; std::vector<const char*> current_commands;
butil::IOBuf sendbuf; butil::IOBufAppender appender;
ParseError err = PARSE_OK; ParseError err = PARSE_OK;
err = ctx->parser.Consume(*source, &current_commands, &arena); err = ctx->parser.Consume(*source, &current_commands, &arena);
...@@ -187,15 +187,17 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -187,15 +187,17 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
// next_commands must have at least one element, otherwise parse.Consume() // next_commands must have at least one element, otherwise parse.Consume()
// should return error. // should return error.
if (ConsumeCommand(ctx, current_commands, next_commands[0], &arena, if (ConsumeCommand(ctx, current_commands, next_commands[0], &arena,
false, &sendbuf) != 0) { false, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
current_commands.swap(next_commands); current_commands.swap(next_commands);
} }
if (ConsumeCommand(ctx, current_commands, "", &arena, if (ConsumeCommand(ctx, current_commands, "", &arena,
true /* must be last message */, &sendbuf) != 0) { true /* must be last message */, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
butil::IOBuf sendbuf;
appender.move_to(sendbuf);
CHECK(!sendbuf.empty()); CHECK(!sendbuf.empty());
Socket::WriteOptions wopt; Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true; wopt.ignore_eovercrowded = true;
......
...@@ -39,61 +39,61 @@ const char* RedisReplyTypeToString(RedisReplyType type) { ...@@ -39,61 +39,61 @@ const char* RedisReplyTypeToString(RedisReplyType type) {
} }
} }
bool RedisReply::SerializeTo(butil::IOBuf* buf) { bool RedisReply::SerializeTo(butil::IOBufAppender* appender) {
char prefix_buf[24]; // should be enouth for '<type><integer>\r\n" char prefix_buf[24]; // should be enough for '<type><integer>\r\n"
size_t len = 0; size_t len = 0;
switch (_type) { switch (_type) {
case REDIS_REPLY_ERROR: case REDIS_REPLY_ERROR:
// fall through // fall through
case REDIS_REPLY_STATUS: case REDIS_REPLY_STATUS:
buf->push_back((_type == REDIS_REPLY_ERROR)? '-' : '+'); appender->push_back((_type == REDIS_REPLY_ERROR)? '-' : '+');
if (_length < (int)sizeof(_data.short_str)) { if (_length < (int)sizeof(_data.short_str)) {
buf->append(_data.short_str, _length); appender->append(_data.short_str, _length);
} else { } else {
buf->append(_data.long_str, _length); appender->append(_data.long_str, _length);
} }
buf->append("\r\n"); appender->append("\r\n");
break; break;
case REDIS_REPLY_INTEGER: case REDIS_REPLY_INTEGER:
prefix_buf[0] = ':'; prefix_buf[0] = ':';
len = butil::AppendDecimal(&prefix_buf[1], _data.integer); len = butil::AppendDecimal(&prefix_buf[1], _data.integer);
prefix_buf[len + 1] = '\r'; prefix_buf[len + 1] = '\r';
prefix_buf[len + 2] = '\n'; prefix_buf[len + 2] = '\n';
buf->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */); appender->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */);
break; break;
case REDIS_REPLY_STRING: case REDIS_REPLY_STRING:
prefix_buf[0] = '$'; prefix_buf[0] = '$';
len = butil::AppendDecimal(&prefix_buf[1], _length); len = butil::AppendDecimal(&prefix_buf[1], _length);
prefix_buf[len + 1] = '\r'; prefix_buf[len + 1] = '\r';
prefix_buf[len + 2] = '\n'; prefix_buf[len + 2] = '\n';
buf->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */); appender->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */);
if (_length == npos) { if (_length == npos) {
break; break;
} }
if (_length < (int)sizeof(_data.short_str)) { if (_length < (int)sizeof(_data.short_str)) {
buf->append(_data.short_str, _length); appender->append(_data.short_str, _length);
} else { } else {
buf->append(_data.long_str, _length); appender->append(_data.long_str, _length);
} }
buf->append("\r\n"); appender->append("\r\n");
break; break;
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
prefix_buf[0] = '*'; prefix_buf[0] = '*';
len = butil::AppendDecimal(&prefix_buf[1], _length); len = butil::AppendDecimal(&prefix_buf[1], _length);
prefix_buf[len + 1] = '\r'; prefix_buf[len + 1] = '\r';
prefix_buf[len + 2] = '\n'; prefix_buf[len + 2] = '\n';
buf->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */); appender->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */);
if (_length == npos) { if (_length == npos) {
break; break;
} }
for (int i = 0; i < _length; ++i) { for (int i = 0; i < _length; ++i) {
if (!_data.array.replies[i].SerializeTo(buf)) { if (!_data.array.replies[i].SerializeTo(appender)) {
return false; return false;
} }
} }
break; break;
case REDIS_REPLY_NIL: case REDIS_REPLY_NIL:
buf->append("$-1\r\n"); appender->append("$-1\r\n");
break; break;
default: default:
CHECK(false) << "unknown redis type=" << _type; CHECK(false) << "unknown redis type=" << _type;
......
...@@ -122,8 +122,8 @@ public: ...@@ -122,8 +122,8 @@ public:
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed. // Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena); ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena);
// Serialize to buf using redis protocol // Serialize to iobuf appender using redis protocol
bool SerializeTo(butil::IOBuf* buf); bool SerializeTo(butil::IOBufAppender* appender);
// Swap internal fields with another reply. // Swap internal fields with another reply.
void Swap(RedisReply& other); void Swap(RedisReply& other);
......
...@@ -637,8 +637,10 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -637,8 +637,10 @@ TEST_F(RedisTest, redis_reply_codec) {
{ {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetStatus("OK"); r.SetStatus("OK");
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK"); ASSERT_STREQ(r.c_str(), "OK");
r.Clear(); r.Clear();
...@@ -651,8 +653,10 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -651,8 +653,10 @@ TEST_F(RedisTest, redis_reply_codec) {
{ {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetError("not exist \'key\'"); r.SetError("not exist \'key\'");
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear(); r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
...@@ -664,8 +668,10 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -664,8 +668,10 @@ TEST_F(RedisTest, redis_reply_codec) {
{ {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetNullString(); r.SetNullString();
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear(); r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
...@@ -674,7 +680,8 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -674,7 +680,8 @@ TEST_F(RedisTest, redis_reply_codec) {
r.Clear(); r.Clear();
r.SetString("abcde'hello world"); r.SetString("abcde'hello world");
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world"); ASSERT_STREQ(r.c_str(), "abcde'hello world");
r.Clear(); r.Clear();
...@@ -687,13 +694,15 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -687,13 +694,15 @@ TEST_F(RedisTest, redis_reply_codec) {
{ {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
butil::IOBufAppender appender;
int t = 2; int t = 2;
int input[] = { -1, 1234567 }; int input[] = { -1, 1234567 };
const char* output[] = { ":-1\r\n", ":1234567\r\n" }; const char* output[] = { ":-1\r\n", ":1234567\r\n" };
for (int i = 0; i < t; ++i) { for (int i = 0; i < t; ++i) {
r.Clear(); r.Clear();
r.SetInteger(input[i]); r.SetInteger(input[i]);
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), output[i]); ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear(); r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena); brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
...@@ -706,6 +715,7 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -706,6 +715,7 @@ TEST_F(RedisTest, redis_reply_codec) {
{ {
brpc::RedisReply r(&arena); brpc::RedisReply r(&arena);
butil::IOBuf buf; butil::IOBuf buf;
butil::IOBufAppender appender;
r.SetArray(3); r.SetArray(3);
brpc::RedisReply& sub_reply = r[0]; brpc::RedisReply& sub_reply = r[0];
sub_reply.SetArray(2); sub_reply.SetArray(2);
...@@ -714,7 +724,8 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -714,7 +724,8 @@ TEST_F(RedisTest, redis_reply_codec) {
r[1].SetString("To go over everything"); r[1].SetString("To go over everything");
r[2].SetInteger(1); r[2].SetInteger(1);
ASSERT_TRUE(r[3].is_nil()); ASSERT_TRUE(r[3].is_nil());
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), ASSERT_STREQ(buf.to_string().c_str(),
"*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n" "*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
"To go over everything\r\n:1\r\n"); "To go over everything\r\n:1\r\n");
...@@ -736,7 +747,8 @@ TEST_F(RedisTest, redis_reply_codec) { ...@@ -736,7 +747,8 @@ TEST_F(RedisTest, redis_reply_codec) {
r.Clear(); r.Clear();
// null array // null array
r.SetNullArray(); r.SetNullArray();
ASSERT_TRUE(r.SerializeTo(&buf)); ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n"); ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK); ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil()); ASSERT_TRUE(r.is_nil());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment