Commit 0eac0072 authored by zhujiashun's avatar zhujiashun

redis_server_protocol: remove default constructor of RedisReply

parent 72f953d3
......@@ -240,12 +240,13 @@ std::ostream& operator<<(std::ostream& os, const RedisRequest& r) {
}
RedisResponse::RedisResponse()
: ::google::protobuf::Message() {
: ::google::protobuf::Message()
, _first_reply(&_arena) {
SharedCtor();
}
RedisResponse::RedisResponse(const RedisResponse& from)
: ::google::protobuf::Message() {
: ::google::protobuf::Message()
, _first_reply(&_arena) {
SharedCtor();
MergeFrom(from);
}
......@@ -316,7 +317,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
}
_cached_size_ += from._cached_size_;
if (_nreply == 0) {
_first_reply.CopyFromDifferentArena(from._first_reply, &_arena);
_first_reply.CopyFromDifferentArena(from._first_reply);
}
const int new_nreply = _nreply + from._nreply;
if (new_nreply == 1) {
......@@ -326,7 +327,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
RedisReply* new_others =
(RedisReply*)_arena.allocate(sizeof(RedisReply) * (new_nreply - 1));
for (int i = 0; i < new_nreply - 1; ++i) {
new (new_others + i) RedisReply;
new (new_others + i) RedisReply(&_arena);
}
int new_other_index = 0;
for (int i = 1; i < _nreply; ++i) {
......@@ -334,8 +335,7 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
_other_replies[i - 1]);
}
for (int i = !_nreply; i < from._nreply; ++i) {
new_others[new_other_index++].CopyFromDifferentArena(
from.reply(i), &_arena);
new_others[new_other_index++].CopyFromDifferentArena(from.reply(i));
}
DCHECK_EQ(new_nreply - 1, new_other_index);
_other_replies = new_others;
......@@ -384,7 +384,7 @@ const ::google::protobuf::Descriptor* RedisResponse::descriptor() {
ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) {
size_t oldsize = buf.size();
if (reply_size() == 0) {
ParseError err = _first_reply.ConsumePartialIOBuf(buf, &_arena);
ParseError err = _first_reply.ConsumePartialIOBuf(buf);
if (err != PARSE_OK) {
return err;
}
......@@ -402,11 +402,11 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
for (int i = 0; i < reply_count - 1; ++i) {
new (&_other_replies[i]) RedisReply;
new (&_other_replies[i]) RedisReply(&_arena);
}
}
for (int i = reply_size(); i < reply_count; ++i) {
ParseError err = _other_replies[i - 1].ConsumePartialIOBuf(buf, &_arena);
ParseError err = _other_replies[i - 1].ConsumePartialIOBuf(buf);
if (err != PARSE_OK) {
return err;
}
......
......@@ -165,7 +165,7 @@ public:
if (index < reply_size()) {
return (index == 0 ? _first_reply : _other_replies[index - 1]);
}
static RedisReply redis_nil;
static RedisReply redis_nil(NULL);
return redis_nil;
}
......
......@@ -92,13 +92,13 @@ bool RedisReply::SerializeTo(butil::IOBufAppender* appender) {
return true;
}
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf) {
if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
// The parsing was suspended while parsing sub replies,
// continue the parsing.
RedisReply* subs = (RedisReply*)_data.array.replies;
for (int i = _data.array.last_index; i < _length; ++i) {
ParseError err = subs[i].ConsumePartialIOBuf(buf, arena);
ParseError err = subs[i].ConsumePartialIOBuf(buf);
if (err != PARSE_OK) {
return err;
}
......@@ -136,7 +136,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/);
return PARSE_OK;
}
char* d = (char*)arena->allocate((len/8 + 1)*8);
char* d = (char*)_arena->allocate((len/8 + 1)*8);
if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << len << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG;
......@@ -196,7 +196,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
buf.cutn(_data.short_str, len);
_data.short_str[len] = '\0';
} else {
char* d = (char*)arena->allocate((len/8 + 1)*8);
char* d = (char*)_arena->allocate((len/8 + 1)*8);
if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << len << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG;
......@@ -238,13 +238,13 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
// FIXME(gejun): Call allocate_aligned instead.
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * count);
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * count);
if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
for (int64_t i = 0; i < count; ++i) {
new (&subs[i]) RedisReply;
new (&subs[i]) RedisReply(_arena);
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_ARRAY;
......@@ -255,7 +255,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
// be continued in next calls by tracking _data.array.last_index.
_data.array.last_index = 0;
for (int64_t i = 0; i < count; ++i) {
ParseError err = subs[i].ConsumePartialIOBuf(buf, arena);
ParseError err = subs[i].ConsumePartialIOBuf(buf);
if (err != PARSE_OK) {
return err;
}
......@@ -359,29 +359,28 @@ void RedisReply::Print(std::ostream& os) const {
}
}
void RedisReply::CopyFromDifferentArena(const RedisReply& other,
butil::Arena* arena) {
void RedisReply::CopyFromDifferentArena(const RedisReply& other) {
_type = other._type;
_length = other._length;
switch (_type) {
case REDIS_REPLY_ARRAY: {
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * _length);
RedisReply* subs = (RedisReply*)_arena->allocate(sizeof(RedisReply) * _length);
if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << _length << "]";
return;
}
for (int i = 0; i < _length; ++i) {
new (&subs[i]) RedisReply;
new (&subs[i]) RedisReply(_arena);
}
_data.array.last_index = other._data.array.last_index;
if (_data.array.last_index > 0) {
// incomplete state
for (int i = 0; i < _data.array.last_index; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
subs[i].CopyFromDifferentArena(other._data.array.replies[i]);
}
} else {
for (int i = 0; i < _length; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
subs[i].CopyFromDifferentArena(other._data.array.replies[i]);
}
}
_data.array.replies = subs;
......@@ -400,7 +399,7 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
if (_length < (int)sizeof(_data.short_str)) {
memcpy(_data.short_str, other._data.short_str, _length + 1);
} else {
char* d = (char*)arena->allocate((_length/8 + 1)*8);
char* d = (char*)_arena->allocate((_length/8 + 1)*8);
if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << _length << "]";
return;
......
......@@ -44,10 +44,8 @@ const char* RedisReplyTypeToString(RedisReplyType);
// A reply from redis-server.
class RedisReply {
public:
// A default reply is a nil.
RedisReply();
// All SetXXX Method would allocate memory from *arena.
// The initial value for a reply is a nil.
// All needed memory is allocated on `arena'.
RedisReply(butil::Arena* arena);
// Type of the reply.
......@@ -109,8 +107,7 @@ public:
const RedisReply& operator[](size_t index) const;
RedisReply& operator[](size_t index);
// Parse from `buf' which may be incomplete and allocate needed memory
// on `arena'.
// Parse from `buf' which may be incomplete.
// Returns PARSE_OK when an intact reply is parsed and cut off from `buf'.
// Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse,
// and `buf' is guaranteed to be UNCHANGED so that you can call this
......@@ -120,7 +117,7 @@ public:
// reply. As a contrast, if the parsing needs `buf' to be intact,
// the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena);
ParseError ConsumePartialIOBuf(butil::IOBuf& buf);
// Serialize to iobuf appender using redis protocol
bool SerializeTo(butil::IOBufAppender* appender);
......@@ -134,12 +131,10 @@ public:
// Print fields into ostream
void Print(std::ostream& os) const;
// Copy from another reply allocating on a different Arena, and allocate
// required memory with `self_arena'.
void CopyFromDifferentArena(const RedisReply& other,
butil::Arena* self_arena);
// Copy from another reply allocating on `_arena', which is a deep copy.
void CopyFromDifferentArena(const RedisReply& other);
// Copy from another reply allocating on a same Arena.
// Copy from another reply allocating on a same Arena, which is a shallow copy.
void CopyFromSameArena(const RedisReply& other);
private:
......@@ -186,11 +181,6 @@ inline RedisReply::RedisReply(butil::Arena* arena)
Reset();
}
inline RedisReply::RedisReply()
: _arena(NULL) {
Reset();
}
inline bool RedisReply::is_nil() const {
return (_type == REDIS_REPLY_NIL || _length == npos);
}
......@@ -298,7 +288,7 @@ inline const RedisReply& RedisReply::operator[](size_t index) const {
if (is_array() && (int)index < _length) {
return _data.array.replies[index];
}
static RedisReply redis_nil;
static RedisReply redis_nil(NULL);
return redis_nil;
}
......
......@@ -644,7 +644,7 @@ TEST_F(RedisTest, redis_reply_codec) {
ASSERT_STREQ(buf.to_string().c_str(), "+OK\r\n");
ASSERT_STREQ(r.c_str(), "OK");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_string());
ASSERT_STREQ("OK", r.c_str());
......@@ -659,7 +659,7 @@ TEST_F(RedisTest, redis_reply_codec) {
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "-not exist \'key\'\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_error());
ASSERT_STREQ("not exist \'key\'", r.error_message());
......@@ -674,7 +674,7 @@ TEST_F(RedisTest, redis_reply_codec) {
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "$-1\r\n");
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
......@@ -685,7 +685,7 @@ TEST_F(RedisTest, redis_reply_codec) {
ASSERT_STREQ(buf.to_string().c_str(), "$17\r\nabcde'hello world\r\n");
ASSERT_STREQ(r.c_str(), "abcde'hello world");
r.Clear();
err = r.ConsumePartialIOBuf(buf, &arena);
err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_string());
ASSERT_STREQ(r.c_str(), "abcde'hello world");
......@@ -705,7 +705,7 @@ TEST_F(RedisTest, redis_reply_codec) {
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), output[i]);
r.Clear();
brpc::ParseError err = r.ConsumePartialIOBuf(buf, &arena);
brpc::ParseError err = r.ConsumePartialIOBuf(buf);
ASSERT_EQ(err, brpc::PARSE_OK);
ASSERT_TRUE(r.is_integer());
ASSERT_EQ(r.integer(), input[i]);
......@@ -730,7 +730,7 @@ TEST_F(RedisTest, redis_reply_codec) {
"*3\r\n*2\r\n$14\r\nhello, it's me\r\n:422\r\n$21\r\n"
"To go over everything\r\n:1\r\n");
r.Clear();
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_EQ(r.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
ASSERT_TRUE(r.is_array());
ASSERT_EQ(3ul, r.size());
ASSERT_TRUE(r[0].is_array());
......@@ -750,7 +750,7 @@ TEST_F(RedisTest, redis_reply_codec) {
ASSERT_TRUE(r.SerializeTo(&appender));
appender.move_to(buf);
ASSERT_STREQ(buf.to_string().c_str(), "*-1\r\n");
ASSERT_EQ(r.ConsumePartialIOBuf(buf, &arena), brpc::PARSE_OK);
ASSERT_EQ(r.ConsumePartialIOBuf(buf), brpc::PARSE_OK);
ASSERT_TRUE(r.is_nil());
}
......@@ -763,8 +763,8 @@ TEST_F(RedisTest, redis_reply_codec) {
sub_reply[0].SetString("hello, it's me");
sub_reply[1].SetInteger(422);
brpc::RedisReply r2(NULL);
r2.CopyFromDifferentArena(r, &arena);
brpc::RedisReply r2(&arena);
r2.CopyFromDifferentArena(r);
ASSERT_TRUE(r2.is_array());
ASSERT_EQ((int)r2[0].size(), 2);
ASSERT_STREQ(r2[0][0].c_str(), sub_reply[0].c_str());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment