Commit 64393017 authored by gejun's avatar gejun

Responses of redis and memcache support MergeFrom

parent fbb588d9
...@@ -396,6 +396,10 @@ void MemcacheResponse::MergeFrom(const ::google::protobuf::Message& from) { ...@@ -396,6 +396,10 @@ void MemcacheResponse::MergeFrom(const ::google::protobuf::Message& from) {
void MemcacheResponse::MergeFrom(const MemcacheResponse& from) { void MemcacheResponse::MergeFrom(const MemcacheResponse& from) {
GOOGLE_CHECK_NE(&from, this); GOOGLE_CHECK_NE(&from, this);
_err = from._err;
// responses of memcached according to their binary layout, should be
// directly concatenatible.
_buf.append(from._buf);
} }
void MemcacheResponse::CopyFrom(const ::google::protobuf::Message& from) { void MemcacheResponse::CopyFrom(const ::google::protobuf::Message& from) {
......
...@@ -195,6 +195,7 @@ void NsheadMessage::MergeFrom(const ::google::protobuf::Message& from) { ...@@ -195,6 +195,7 @@ void NsheadMessage::MergeFrom(const ::google::protobuf::Message& from) {
void NsheadMessage::MergeFrom(const NsheadMessage& from) { void NsheadMessage::MergeFrom(const NsheadMessage& from) {
GOOGLE_CHECK_NE(&from, this); GOOGLE_CHECK_NE(&from, this);
// No way to merge two nshead messages, just overwrite.
head = from.head; head = from.head;
body = from.body; body = from.body;
} }
......
...@@ -448,6 +448,35 @@ void RedisResponse::MergeFrom(const ::google::protobuf::Message& from) { ...@@ -448,6 +448,35 @@ void RedisResponse::MergeFrom(const ::google::protobuf::Message& from) {
void RedisResponse::MergeFrom(const RedisResponse& from) { void RedisResponse::MergeFrom(const RedisResponse& from) {
GOOGLE_CHECK_NE(&from, this); GOOGLE_CHECK_NE(&from, this);
if (from._nreply == 0) {
return;
}
_cached_size_ += from._cached_size_;
if (_nreply == 0) {
_first_reply.CopyFromDifferentArena(from._first_reply, &_arena);
}
const int new_nreply = _nreply + from._nreply;
if (new_nreply == 1) {
_nreply = new_nreply;
return;
}
RedisReply* new_others =
(RedisReply*)_arena.allocate(sizeof(RedisReply) * (new_nreply - 1));
for (int i = 0; i < new_nreply - 1; ++i) {
new (new_others + i) RedisReply;
}
int new_other_index = 0;
for (int i = 1; i < _nreply; ++i) {
new_others[new_other_index++].CopyFromSameArena(
_other_replies[i - 1]);
}
for (int i = !_nreply; i < from._nreply; ++i) {
new_others[new_other_index++].CopyFromDifferentArena(
from.reply(i), &_arena);
}
DCHECK_EQ(new_nreply - 1, new_other_index);
_other_replies = new_others;
_nreply = new_nreply;
} }
void RedisResponse::CopyFrom(const ::google::protobuf::Message& from) { void RedisResponse::CopyFrom(const ::google::protobuf::Message& from) {
......
...@@ -210,7 +210,7 @@ private: ...@@ -210,7 +210,7 @@ private:
RedisReply _first_reply; RedisReply _first_reply;
RedisReply* _other_replies; RedisReply* _other_replies;
butil::Arena _arena; butil::Arena _arena;
uint32_t _nreply; int _nreply;
mutable int _cached_size_; mutable int _cached_size_;
friend void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl(); friend void protobuf_AddDesc_baidu_2frpc_2fredis_5fbase_2eproto_impl();
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "butil/logging.h" #include "butil/logging.h"
#include "brpc/redis_reply.h" #include "brpc/redis_reply.h"
namespace brpc { namespace brpc {
//BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match); //BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match);
...@@ -279,4 +278,52 @@ void RedisReply::Print(std::ostream& os) const { ...@@ -279,4 +278,52 @@ void RedisReply::Print(std::ostream& os) const {
} }
} }
void RedisReply::CopyFromDifferentArena(const RedisReply& other,
butil::Arena* arena) {
_type = other._type;
_length = other._length;
switch (_type) {
case REDIS_REPLY_ARRAY: {
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * _length);
if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << _length << "]";
return;
}
for (uint32_t i = 0; i < _length; ++i) {
new (&subs[i]) RedisReply;
}
_data.array.last_index = other._data.array.last_index;
if (_data.array.last_index > 0) {
for (int i = 0; i < _data.array.last_index; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
}
}
_data.array.replies = subs;
}
break;
case REDIS_REPLY_INTEGER:
_data.integer = other._data.integer;
break;
case REDIS_REPLY_NIL:
break;
case REDIS_REPLY_STRING:
// fall through
case REDIS_REPLY_ERROR:
// fall through
case REDIS_REPLY_STATUS:
if (_length < sizeof(_data.short_str)) {
memcpy(_data.short_str, other._data.short_str, _length + 1);
} else {
char* d = (char*)arena->allocate((_length/8 + 1)*8);
if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << _length << "]";
return;
}
memcpy(d, other._data.long_str, _length + 1);
_data.long_str = d;
}
break;
}
}
} // namespace brpc } // namespace brpc
...@@ -97,9 +97,17 @@ public: ...@@ -97,9 +97,17 @@ public:
// Print fields into ostream // Print fields into ostream
void Print(std::ostream& os) const; void Print(std::ostream& os) const;
// Copy from another reply allocating on a different Arena, and allocate
// required memory with `self_arena'.
void CopyFromDifferentArena(const RedisReply& other,
butil::Arena* self_arena);
// Copy from another reply allocating on a same Arena.
void CopyFromSameArena(const RedisReply& other);
private: private:
// RedisReply does not own the memory (pointed by internal pointers), // RedisReply does not own the memory of fields, copying must be done
// Copying is extremely dangerous and must be disabled. // by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisReply); DISALLOW_COPY_AND_ASSIGN(RedisReply);
RedisReplyType _type; RedisReplyType _type;
...@@ -211,7 +219,13 @@ inline void RedisReply::Clear() { ...@@ -211,7 +219,13 @@ inline void RedisReply::Clear() {
_data.array.replies = NULL; _data.array.replies = NULL;
} }
} // namespace brpc inline void RedisReply::CopyFromSameArena(const RedisReply& other) {
_type = other._type;
_length = other._length;
_data.padding[0] = other._data.padding[0];
_data.padding[1] = other._data.padding[1];
}
} // namespace brpc
#endif // BRPC_REDIS_H #endif // BRPC_REDIS_H
...@@ -63,6 +63,54 @@ protected: ...@@ -63,6 +63,54 @@ protected:
void TearDown() {} void TearDown() {}
}; };
void AssertReplyEqual(const brpc::RedisReply& reply1,
const brpc::RedisReply& reply2) {
if (&reply1 == &reply2) {
return;
}
CHECK_EQ(reply1.type(), reply2.type());
switch (reply1.type()) {
case brpc::REDIS_REPLY_ARRAY:
ASSERT_EQ(reply1.size(), reply2.size());
for (size_t j = 0; j < reply1.size(); ++j) {
ASSERT_NE(&reply1[j], &reply2[j]); // from different arena
AssertReplyEqual(reply1[j], reply2[j]);
}
break;
case brpc::REDIS_REPLY_INTEGER:
ASSERT_EQ(reply1.integer(), reply2.integer());
break;
case brpc::REDIS_REPLY_NIL:
break;
case brpc::REDIS_REPLY_STRING:
// fall through
case brpc::REDIS_REPLY_STATUS:
ASSERT_NE(reply1.c_str(), reply2.c_str()); // from different arena
ASSERT_STREQ(reply1.c_str(), reply2.c_str());
break;
case brpc::REDIS_REPLY_ERROR:
ASSERT_NE(reply1.error_message(), reply2.error_message()); // from different arena
ASSERT_STREQ(reply1.error_message(), reply2.error_message());
break;
}
}
void AssertResponseEqual(const brpc::RedisResponse& r1,
const brpc::RedisResponse& r2,
int repeated_times = 1) {
if (&r1 == &r2) {
ASSERT_EQ(repeated_times, 1);
return;
}
ASSERT_EQ(r2.reply_size()* repeated_times, r1.reply_size());
for (int j = 0; j < repeated_times; ++j) {
for (int i = 0; i < r2.reply_size(); ++i) {
ASSERT_NE(&r2.reply(i), &r1.reply(j * r2.reply_size() + i));
AssertReplyEqual(r2.reply(i), r1.reply(j * r2.reply_size() + i));
}
}
}
TEST_F(RedisTest, sanity) { TEST_F(RedisTest, sanity) {
brpc::ChannelOptions options; brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS; options.protocol = brpc::PROTOCOL_REDIS;
...@@ -175,6 +223,11 @@ TEST_F(RedisTest, keys_with_spaces) { ...@@ -175,6 +223,11 @@ TEST_F(RedisTest, keys_with_spaces) {
ASSERT_EQ("he2 he2 da2", response.reply(5).data()); ASSERT_EQ("he2 he2 da2", response.reply(5).data());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(6).type()); ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(6).type());
ASSERT_EQ("he3 he3 da3", response.reply(6).data()); ASSERT_EQ("he3 he3 da3", response.reply(6).data());
brpc::RedisResponse response2 = response;
AssertResponseEqual(response2, response);
response2.MergeFrom(response);
AssertResponseEqual(response2, response, 2);
} }
TEST_F(RedisTest, incr_and_decr) { TEST_F(RedisTest, incr_and_decr) {
...@@ -201,6 +254,11 @@ TEST_F(RedisTest, incr_and_decr) { ...@@ -201,6 +254,11 @@ TEST_F(RedisTest, incr_and_decr) {
ASSERT_EQ(10, response.reply(2).integer()); ASSERT_EQ(10, response.reply(2).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type()); ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
ASSERT_EQ(-10, response.reply(3).integer()); ASSERT_EQ(-10, response.reply(3).integer());
brpc::RedisResponse response2 = response;
AssertResponseEqual(response2, response);
response2.MergeFrom(response);
AssertResponseEqual(response2, response, 2);
} }
TEST_F(RedisTest, by_components) { TEST_F(RedisTest, by_components) {
...@@ -233,6 +291,11 @@ TEST_F(RedisTest, by_components) { ...@@ -233,6 +291,11 @@ TEST_F(RedisTest, by_components) {
ASSERT_EQ(10, response.reply(2).integer()); ASSERT_EQ(10, response.reply(2).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type()); ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
ASSERT_EQ(-10, response.reply(3).integer()); ASSERT_EQ(-10, response.reply(3).integer());
brpc::RedisResponse response2 = response;
AssertResponseEqual(response2, response);
response2.MergeFrom(response);
AssertResponseEqual(response2, response, 2);
} }
} //namespace } //namespace
#endif // BAIDU_INTERNAL #endif // BAIDU_INTERNAL
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment