Commit 0c96ecef authored by zhujiashun's avatar zhujiashun

redis_server_protocol: rename RedisReply to RedisMessage

parent 978814fe
......@@ -54,7 +54,7 @@ struct InputResponse : public InputMessageBase {
};
struct ExecutionQueueContext {
RedisReply message;
RedisMessage message;
SocketId socket_id;
butil::Arena arena;
};
......@@ -72,7 +72,7 @@ int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) {
LOG(WARNING) << "Fail to address redis socket";
continue;
}
RedisReply output;
RedisMessage output;
conn->OnRedisMessage(ctx->message, &output, &ctx->arena);
butil::IOBuf sendbuf;
output.SerializeToIOBuf(&sendbuf);
......@@ -132,7 +132,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
socket->reset_parsing_context(ctx);
}
std::unique_ptr<ExecutionQueueContext> task(new ExecutionQueueContext);
RedisReply message;
RedisMessage message;
ParseError err = message.ConsumePartialIOBuf(*source, &task->arena);
if (err != PARSE_OK) {
return MakeParseError(err);
......@@ -178,7 +178,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (pi.with_auth) {
if (msg->response.reply_size() != 1 ||
!(msg->response.reply(0).type() == brpc::REDIS_REPLY_STATUS &&
!(msg->response.reply(0).type() == brpc::REDIS_MESSAGE_STATUS &&
msg->response.reply(0).data().compare("OK") == 0)) {
LOG(ERROR) << "Redis Auth failed: " << msg->response;
return MakeParseError(PARSE_ERROR_NO_RESOURCE,
......
......@@ -322,10 +322,10 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
_nreply = new_nreply;
return;
}
RedisReply* new_others =
(RedisReply*)_arena.allocate(sizeof(RedisReply) * (new_nreply - 1));
RedisMessage* new_others =
(RedisMessage*)_arena.allocate(sizeof(RedisMessage) * (new_nreply - 1));
for (int i = 0; i < new_nreply - 1; ++i) {
new (new_others + i) RedisReply;
new (new_others + i) RedisMessage;
}
int new_other_index = 0;
for (int i = 1; i < _nreply; ++i) {
......@@ -394,14 +394,14 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count
}
if (reply_count > 1) {
if (_other_replies == NULL) {
_other_replies = (RedisReply*)_arena.allocate(
sizeof(RedisReply) * (reply_count - 1));
_other_replies = (RedisMessage*)_arena.allocate(
sizeof(RedisMessage) * (reply_count - 1));
if (_other_replies == NULL) {
LOG(ERROR) << "Fail to allocate RedisReply[" << reply_count -1 << "]";
LOG(ERROR) << "Fail to allocate RedisMessage[" << reply_count -1 << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
for (int i = 0; i < reply_count - 1; ++i) {
new (&_other_replies[i]) RedisReply;
new (&_other_replies[i]) RedisMessage;
}
}
for (int i = reply_size(); i < reply_count; ++i) {
......
......@@ -25,7 +25,7 @@
#include "butil/strings/string_piece.h"
#include "butil/arena.h"
#include "brpc/proto_base.pb.h"
#include "brpc/redis_reply.h"
#include "brpc/redis_message.h"
#include "brpc/parse_result.h"
namespace brpc {
......@@ -157,11 +157,11 @@ public:
int reply_size() const { return _nreply; }
// Get index-th reply. If index is out-of-bound, nil reply is returned.
const RedisReply& reply(int index) const {
const RedisMessage& reply(int index) const {
if (index < reply_size()) {
return (index == 0 ? _first_reply : _other_replies[index - 1]);
}
static RedisReply redis_nil;
static RedisMessage redis_nil;
return redis_nil;
}
......@@ -199,8 +199,8 @@ private:
void SharedDtor();
void SetCachedSize(int size) const;
RedisReply _first_reply;
RedisReply* _other_replies;
RedisMessage _first_reply;
RedisMessage* _other_replies;
butil::Arena _arena;
int _nreply;
mutable int _cached_size_;
......@@ -212,8 +212,8 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&);
class RedisConnection {
public:
virtual ~RedisConnection() {}
virtual void OnRedisMessage(const RedisReply& message,
RedisReply* output, butil::Arena* arena) = 0;
virtual void OnRedisMessage(const RedisMessage& message,
RedisMessage* output, butil::Arena* arena) = 0;
};
class RedisService {
......
......@@ -19,31 +19,32 @@
#include <limits>
#include "butil/logging.h"
#include "brpc/redis_reply.h"
#include "brpc/redis_message.h"
namespace brpc {
//BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match);
const uint32_t RedisReply::npos = (uint32_t)-1;
//BAIDU_CASSERT(sizeof(RedisMessage) == 24, size_match);
const uint32_t RedisMessage::npos = (uint32_t)-1;
const char* RedisReplyTypeToString(RedisReplyType type) {
const char* RedisMessageTypeToString(RedisMessageType type) {
switch (type) {
case REDIS_REPLY_STRING: return "string";
case REDIS_REPLY_ARRAY: return "array";
case REDIS_REPLY_INTEGER: return "integer";
case REDIS_REPLY_NIL: return "nil";
case REDIS_REPLY_STATUS: return "status";
case REDIS_REPLY_ERROR: return "error";
case REDIS_MESSAGE_STRING: return "string";
case REDIS_MESSAGE_ARRAY: return "array";
case REDIS_MESSAGE_INTEGER: return "integer";
case REDIS_MESSAGE_NIL: return "nil";
case REDIS_MESSAGE_STATUS: return "status";
case REDIS_MESSAGE_ERROR: return "error";
default: return "unknown redis type";
}
}
bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
bool RedisMessage::SerializeToIOBuf(butil::IOBuf* buf) {
butil::IOBufBuilder builder;
switch (_type) {
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
buf->push_back((_type == REDIS_REPLY_ERROR)? '-' : '+');
case REDIS_MESSAGE_ERROR:
// fall through
case REDIS_MESSAGE_STATUS:
buf->push_back((_type == REDIS_MESSAGE_ERROR)? '-' : '+');
if (_length < sizeof(_data.short_str)) {
buf->append(_data.short_str, _length);
} else {
......@@ -51,11 +52,11 @@ bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
}
buf->append("\r\n");
break;
case REDIS_REPLY_INTEGER:
case REDIS_MESSAGE_INTEGER:
builder << ':' << _data.integer << "\r\n";
buf->append(builder.buf());
break;
case REDIS_REPLY_STRING:
case REDIS_MESSAGE_STRING:
// Since _length is unsigned, we have to int casting _length to
// represent nil string
builder << '$' << (int)_length << "\r\n";
......@@ -70,7 +71,7 @@ bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
}
buf->append("\r\n");
break;
case REDIS_REPLY_ARRAY:
case REDIS_MESSAGE_ARRAY:
builder << '*' << (int)_length << "\r\n";
buf->append(builder.buf());
if (_length == npos) {
......@@ -82,7 +83,7 @@ bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
}
}
break;
case REDIS_REPLY_NIL:
case REDIS_MESSAGE_NIL:
buf->append("$-1\r\n");
break;
default:
......@@ -92,11 +93,11 @@ bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
return true;
}
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
if (_type == REDIS_MESSAGE_ARRAY && _data.array.last_index >= 0) {
// The parsing was suspended while parsing sub replies,
// continue the parsing.
RedisReply* subs = (RedisReply*)_data.array.replies;
RedisMessage* subs = (RedisMessage*)_data.array.replies;
for (uint32_t i = _data.array.last_index; i < _length; ++i) {
ParseError err = subs[i].ConsumePartialIOBuf(buf, arena);
if (err != PARSE_OK) {
......@@ -131,7 +132,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
const size_t len = str.size() - 1;
if (len < sizeof(_data.short_str)) {
// SSO short strings, including empty string.
_type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
_type = (fc == '-' ? REDIS_MESSAGE_ERROR : REDIS_MESSAGE_STATUS);
_length = len;
str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/);
return PARSE_OK;
......@@ -142,7 +143,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
CHECK_EQ(len, str.copy_to_cstr(d, (size_t)-1L, 1/*skip fc*/));
_type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
_type = (fc == '-' ? REDIS_MESSAGE_ERROR : REDIS_MESSAGE_STATUS);
_length = len;
_data.long_str = d;
return PARSE_OK;
......@@ -165,7 +166,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
}
if (fc == ':') {
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_INTEGER;
_type = REDIS_MESSAGE_INTEGER;
_length = 0;
_data.integer = value;
return PARSE_OK;
......@@ -173,7 +174,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
const int64_t len = value; // `value' is length of the string
if (len < 0) { // redis nil
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_NIL;
_type = REDIS_MESSAGE_NIL;
_length = 0;
_data.integer = 0;
return PARSE_OK;
......@@ -190,7 +191,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
}
if ((size_t)len < sizeof(_data.short_str)) {
// SSO short strings, including empty string.
_type = REDIS_REPLY_STRING;
_type = REDIS_MESSAGE_STRING;
_length = len;
buf.pop_front(crlf_pos + 2);
buf.cutn(_data.short_str, len);
......@@ -204,7 +205,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
buf.pop_front(crlf_pos + 2/*CRLF*/);
buf.cutn(d, len);
d[len] = '\0';
_type = REDIS_REPLY_STRING;
_type = REDIS_MESSAGE_STRING;
_length = len;
_data.long_str = d;
}
......@@ -219,14 +220,14 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
const int64_t count = value; // `value' is count of sub replies
if (count < 0) { // redis nil
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_NIL;
_type = REDIS_MESSAGE_NIL;
_length = 0;
_data.integer = 0;
return PARSE_OK;
}
if (count == 0) { // empty array
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_ARRAY;
_type = REDIS_MESSAGE_ARRAY;
_length = 0;
_data.array.last_index = -1;
_data.array.replies = NULL;
......@@ -238,16 +239,16 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
// FIXME(gejun): Call allocate_aligned instead.
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * count);
RedisMessage* subs = (RedisMessage*)arena->allocate(sizeof(RedisMessage) * count);
if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]";
LOG(FATAL) << "Fail to allocate RedisMessage[" << count << "]";
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
for (int64_t i = 0; i < count; ++i) {
new (&subs[i]) RedisReply;
new (&subs[i]) RedisMessage;
}
buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_ARRAY;
_type = REDIS_MESSAGE_ARRAY;
_length = count;
_data.array.replies = subs;
......@@ -316,9 +317,9 @@ void RedisStringPrinter::Print(std::ostream& os) const {
}
// Mimic how official redis-cli prints.
void RedisReply::Print(std::ostream& os) const {
void RedisMessage::Print(std::ostream& os) const {
switch (_type) {
case REDIS_REPLY_STRING:
case REDIS_MESSAGE_STRING:
os << '"';
if (_length < sizeof(_data.short_str)) {
os << RedisStringPrinter(_data.short_str, _length);
......@@ -327,7 +328,7 @@ void RedisReply::Print(std::ostream& os) const {
}
os << '"';
break;
case REDIS_REPLY_ARRAY:
case REDIS_MESSAGE_ARRAY:
os << '[';
for (uint32_t i = 0; i < _length; ++i) {
if (i != 0) {
......@@ -337,16 +338,16 @@ void RedisReply::Print(std::ostream& os) const {
}
os << ']';
break;
case REDIS_REPLY_INTEGER:
case REDIS_MESSAGE_INTEGER:
os << "(integer) " << _data.integer;
break;
case REDIS_REPLY_NIL:
case REDIS_MESSAGE_NIL:
os << "(nil)";
break;
case REDIS_REPLY_ERROR:
case REDIS_MESSAGE_ERROR:
os << "(error) ";
// fall through
case REDIS_REPLY_STATUS:
case REDIS_MESSAGE_STATUS:
if (_length < sizeof(_data.short_str)) {
os << RedisStringPrinter(_data.short_str, _length);
} else {
......@@ -359,19 +360,19 @@ void RedisReply::Print(std::ostream& os) const {
}
}
void RedisReply::CopyFromDifferentArena(const RedisReply& other,
void RedisMessage::CopyFromDifferentArena(const RedisMessage& other,
butil::Arena* arena) {
_type = other._type;
_length = other._length;
switch (_type) {
case REDIS_REPLY_ARRAY: {
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * _length);
case REDIS_MESSAGE_ARRAY: {
RedisMessage* subs = (RedisMessage*)arena->allocate(sizeof(RedisMessage) * _length);
if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << _length << "]";
LOG(FATAL) << "Fail to allocate RedisMessage[" << _length << "]";
return;
}
for (uint32_t i = 0; i < _length; ++i) {
new (&subs[i]) RedisReply;
new (&subs[i]) RedisMessage;
}
_data.array.last_index = other._data.array.last_index;
if (_data.array.last_index > 0) {
......@@ -382,16 +383,16 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
_data.array.replies = subs;
}
break;
case REDIS_REPLY_INTEGER:
case REDIS_MESSAGE_INTEGER:
_data.integer = other._data.integer;
break;
case REDIS_REPLY_NIL:
case REDIS_MESSAGE_NIL:
break;
case REDIS_REPLY_STRING:
case REDIS_MESSAGE_STRING:
// fall through
case REDIS_REPLY_ERROR:
case REDIS_MESSAGE_ERROR:
// fall through
case REDIS_REPLY_STATUS:
case REDIS_MESSAGE_STATUS:
if (_length < sizeof(_data.short_str)) {
memcpy(_data.short_str, other._data.short_str, _length + 1);
} else {
......
......@@ -17,8 +17,8 @@
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_REDIS_REPLY_H
#define BRPC_REDIS_REPLY_H
#ifndef BRPC_REDIS_MESSAGE_H
#define BRPC_REDIS_MESSAGE_H
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/strings/string_piece.h" // butil::StringPiece
......@@ -30,25 +30,25 @@
namespace brpc {
// Different types of replies.
enum RedisReplyType {
REDIS_REPLY_STRING = 1, // Bulk String
REDIS_REPLY_ARRAY = 2,
REDIS_REPLY_INTEGER = 3,
REDIS_REPLY_NIL = 4,
REDIS_REPLY_STATUS = 5, // Simple String
REDIS_REPLY_ERROR = 6
enum RedisMessageType {
REDIS_MESSAGE_STRING = 1, // Bulk String
REDIS_MESSAGE_ARRAY = 2,
REDIS_MESSAGE_INTEGER = 3,
REDIS_MESSAGE_NIL = 4,
REDIS_MESSAGE_STATUS = 5, // Simple String
REDIS_MESSAGE_ERROR = 6
};
const char* RedisReplyTypeToString(RedisReplyType);
const char* RedisMessageTypeToString(RedisMessageType);
// A reply from redis-server.
class RedisReply {
class RedisMessage {
public:
// A default constructed reply is a nil.
RedisReply();
RedisMessage();
// Type of the reply.
RedisReplyType type() const { return _type; }
RedisMessageType type() const { return _type; }
bool is_nil() const; // True if the reply is a (redis) nil.
bool is_integer() const; // True if the reply is an integer.
......@@ -86,17 +86,17 @@ public:
size_t size() const;
// Get the index-th sub reply. If this reply is not an array, a nil reply
// is returned (call stacks are not logged)
const RedisReply& operator[](size_t index) const;
RedisReply& operator[](size_t index);
const RedisMessage& operator[](size_t index) const;
RedisMessage& operator[](size_t index);
// Parse from `buf' which may be incomplete and allocate needed memory
// on `arena'.
// Returns PARSE_OK when an intact reply is parsed and cut off from `buf'.
// Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse,
// and `buf' is guaranteed to be UNCHANGED so that you can call this
// function on a RedisReply object with the same buf again and again until
// function on a RedisMessage object with the same buf again and again until
// the function returns PARSE_OK. This property makes sure the parsing of
// RedisReply in the worst case is O(N) where N is size of the on-wire
// RedisMessage in the worst case is O(N) where N is size of the on-wire
// reply. As a contrast, if the parsing needs `buf' to be intact,
// the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
......@@ -106,7 +106,7 @@ public:
bool SerializeToIOBuf(butil::IOBuf* buf);
// Swap internal fields with another reply.
void Swap(RedisReply& other);
void Swap(RedisMessage& other);
// Reset to the state that this reply was just constructed.
void Clear();
......@@ -116,22 +116,22 @@ public:
// Copy from another reply allocating on a different Arena, and allocate
// required memory with `self_arena'.
void CopyFromDifferentArena(const RedisReply& other,
void CopyFromDifferentArena(const RedisMessage& other,
butil::Arena* self_arena);
// Copy from another reply allocating on a same Arena.
void CopyFromSameArena(const RedisReply& other);
void CopyFromSameArena(const RedisMessage& other);
private:
static const uint32_t npos;
// RedisReply does not own the memory of fields, copying must be done
// RedisMessage does not own the memory of fields, copying must be done
// by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisReply);
DISALLOW_COPY_AND_ASSIGN(RedisMessage);
bool set_basic_string(const std::string& str, butil::Arena* arena, RedisReplyType type);
bool set_basic_string(const std::string& str, butil::Arena* arena, RedisMessageType type);
RedisReplyType _type;
RedisMessageType _type;
uint32_t _length; // length of short_str/long_str, count of replies
union {
int64_t integer;
......@@ -139,7 +139,7 @@ private:
const char* long_str;
struct {
int32_t last_index; // >= 0 if previous parsing suspends on replies.
RedisReply* replies;
RedisMessage* replies;
} array;
uint64_t padding[2]; // For swapping, must cover all bytes.
} _data;
......@@ -147,46 +147,46 @@ private:
// =========== inline impl. ==============
inline std::ostream& operator<<(std::ostream& os, const RedisReply& r) {
inline std::ostream& operator<<(std::ostream& os, const RedisMessage& r) {
r.Print(os);
return os;
}
inline RedisReply::RedisReply()
: _type(REDIS_REPLY_NIL)
inline RedisMessage::RedisMessage()
: _type(REDIS_MESSAGE_NIL)
, _length(0) {
_data.array.last_index = -1;
_data.array.replies = NULL;
}
inline bool RedisReply::is_nil() const {
return (_type == REDIS_REPLY_NIL) ||
((_type == REDIS_REPLY_STRING || _type == REDIS_REPLY_ARRAY) &&
inline bool RedisMessage::is_nil() const {
return (_type == REDIS_MESSAGE_NIL) ||
((_type == REDIS_MESSAGE_STRING || _type == REDIS_MESSAGE_ARRAY) &&
_length == uint32_t(-1));
}
inline bool RedisReply::is_error() const { return _type == REDIS_REPLY_ERROR; }
inline bool RedisReply::is_integer() const { return _type == REDIS_REPLY_INTEGER; }
inline bool RedisReply::is_string() const
{ return _type == REDIS_REPLY_STRING || _type == REDIS_REPLY_STATUS; }
inline bool RedisReply::is_array() const { return _type == REDIS_REPLY_ARRAY; }
inline bool RedisMessage::is_error() const { return _type == REDIS_MESSAGE_ERROR; }
inline bool RedisMessage::is_integer() const { return _type == REDIS_MESSAGE_INTEGER; }
inline bool RedisMessage::is_string() const
{ return _type == REDIS_MESSAGE_STRING || _type == REDIS_MESSAGE_STATUS; }
inline bool RedisMessage::is_array() const { return _type == REDIS_MESSAGE_ARRAY; }
inline int64_t RedisReply::integer() const {
inline int64_t RedisMessage::integer() const {
if (is_integer()) {
return _data.integer;
}
CHECK(false) << "The reply is " << RedisReplyTypeToString(_type)
CHECK(false) << "The reply is " << RedisMessageTypeToString(_type)
<< ", not an integer";
return 0;
}
inline bool RedisReply::set_nil_string() {
_type = REDIS_REPLY_STRING;
inline bool RedisMessage::set_nil_string() {
_type = REDIS_MESSAGE_STRING;
_length = npos;
return true;
}
inline bool RedisReply::set_array(int size, butil::Arena* arena) {
_type = REDIS_REPLY_ARRAY;
inline bool RedisMessage::set_array(int size, butil::Arena* arena) {
_type = REDIS_MESSAGE_ARRAY;
if (size < 0) {
_length = npos;
return true;
......@@ -194,20 +194,20 @@ inline bool RedisReply::set_array(int size, butil::Arena* arena) {
_length = 0;
return true;
}
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * size);
RedisMessage* subs = (RedisMessage*)arena->allocate(sizeof(RedisMessage) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
LOG(FATAL) << "Fail to allocate RedisMessage[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply;
new (&subs[i]) RedisMessage;
}
_length = size;
_data.array.replies = subs;
return true;
}
inline bool RedisReply::set_basic_string(const std::string& str, butil::Arena* arena, RedisReplyType type) {
inline bool RedisMessage::set_basic_string(const std::string& str, butil::Arena* arena, RedisMessageType type) {
size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
......@@ -225,26 +225,26 @@ inline bool RedisReply::set_basic_string(const std::string& str, butil::Arena* a
return true;
}
inline bool RedisReply::set_status(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_STATUS);
inline bool RedisMessage::set_status(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_MESSAGE_STATUS);
}
inline bool RedisReply::set_error(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_ERROR);
inline bool RedisMessage::set_error(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_MESSAGE_ERROR);
}
inline bool RedisReply::set_integer(int64_t value) {
_type = REDIS_REPLY_INTEGER;
inline bool RedisMessage::set_integer(int64_t value) {
_type = REDIS_MESSAGE_INTEGER;
_length = 0;
_data.integer = value;
return true;
}
inline bool RedisReply::set_bulk_string(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_STRING);
inline bool RedisMessage::set_bulk_string(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_MESSAGE_STRING);
}
inline const char* RedisReply::c_str() const {
inline const char* RedisMessage::c_str() const {
if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO
return _data.short_str;
......@@ -252,12 +252,12 @@ inline const char* RedisReply::c_str() const {
return _data.long_str;
}
}
CHECK(false) << "The reply is " << RedisReplyTypeToString(_type)
CHECK(false) << "The reply is " << RedisMessageTypeToString(_type)
<< ", not a string";
return "";
}
inline butil::StringPiece RedisReply::data() const {
inline butil::StringPiece RedisMessage::data() const {
if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO
return butil::StringPiece(_data.short_str, _length);
......@@ -265,12 +265,12 @@ inline butil::StringPiece RedisReply::data() const {
return butil::StringPiece(_data.long_str, _length);
}
}
CHECK(false) << "The reply is " << RedisReplyTypeToString(_type)
CHECK(false) << "The reply is " << RedisMessageTypeToString(_type)
<< ", not a string";
return butil::StringPiece();
}
inline const char* RedisReply::error_message() const {
inline const char* RedisMessage::error_message() const {
if (is_error()) {
if (_length < sizeof(_data.short_str)) { // SSO
return _data.short_str;
......@@ -278,43 +278,43 @@ inline const char* RedisReply::error_message() const {
return _data.long_str;
}
}
CHECK(false) << "The reply is " << RedisReplyTypeToString(_type)
CHECK(false) << "The reply is " << RedisMessageTypeToString(_type)
<< ", not an error";
return "";
}
inline size_t RedisReply::size() const {
inline size_t RedisMessage::size() const {
return (is_array() ? _length : 0);
}
inline RedisReply& RedisReply::operator[](size_t index) {
return const_cast<RedisReply&>(
const_cast<const RedisReply*>(this)->operator[](index));
inline RedisMessage& RedisMessage::operator[](size_t index) {
return const_cast<RedisMessage&>(
const_cast<const RedisMessage*>(this)->operator[](index));
}
inline const RedisReply& RedisReply::operator[](size_t index) const {
inline const RedisMessage& RedisMessage::operator[](size_t index) const {
if (is_array() && index < _length) {
return _data.array.replies[index];
}
static RedisReply redis_nil;
static RedisMessage redis_nil;
return redis_nil;
}
inline void RedisReply::Swap(RedisReply& other) {
inline void RedisMessage::Swap(RedisMessage& other) {
std::swap(_type, other._type);
std::swap(_length, other._length);
std::swap(_data.padding[0], other._data.padding[0]);
std::swap(_data.padding[1], other._data.padding[1]);
}
inline void RedisReply::Clear() {
_type = REDIS_REPLY_NIL;
inline void RedisMessage::Clear() {
_type = REDIS_MESSAGE_NIL;
_length = 0;
_data.array.last_index = -1;
_data.array.replies = NULL;
}
inline void RedisReply::CopyFromSameArena(const RedisReply& other) {
inline void RedisMessage::CopyFromSameArena(const RedisMessage& other) {
_type = other._type;
_length = other._length;
_data.padding[0] = other._data.padding[0];
......
......@@ -103,32 +103,32 @@ protected:
void TearDown() {}
};
void AssertReplyEqual(const brpc::RedisReply& reply1,
const brpc::RedisReply& reply2) {
void AssertReplyEqual(const brpc::RedisMessage& reply1,
const brpc::RedisMessage& reply2) {
if (&reply1 == &reply2) {
return;
}
CHECK_EQ(reply1.type(), reply2.type());
switch (reply1.type()) {
case brpc::REDIS_REPLY_ARRAY:
case brpc::REDIS_MESSAGE_ARRAY:
ASSERT_EQ(reply1.size(), reply2.size());
for (size_t j = 0; j < reply1.size(); ++j) {
ASSERT_NE(&reply1[j], &reply2[j]); // from different arena
AssertReplyEqual(reply1[j], reply2[j]);
}
break;
case brpc::REDIS_REPLY_INTEGER:
case brpc::REDIS_MESSAGE_INTEGER:
ASSERT_EQ(reply1.integer(), reply2.integer());
break;
case brpc::REDIS_REPLY_NIL:
case brpc::REDIS_MESSAGE_NIL:
break;
case brpc::REDIS_REPLY_STRING:
case brpc::REDIS_MESSAGE_STRING:
// fall through
case brpc::REDIS_REPLY_STATUS:
case brpc::REDIS_MESSAGE_STATUS:
ASSERT_NE(reply1.c_str(), reply2.c_str()); // from different arena
ASSERT_STREQ(reply1.c_str(), reply2.c_str());
break;
case brpc::REDIS_REPLY_ERROR:
case brpc::REDIS_MESSAGE_ERROR:
ASSERT_NE(reply1.error_message(), reply2.error_message()); // from different arena
ASSERT_STREQ(reply1.error_message(), reply2.error_message());
break;
......@@ -168,7 +168,7 @@ TEST_F(RedisTest, sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type())
ASSERT_EQ(brpc::REDIS_MESSAGE_NIL, response.reply(0).type())
<< response;
cntl.Reset();
......@@ -178,7 +178,7 @@ TEST_F(RedisTest, sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(0).type());
ASSERT_EQ("OK", response.reply(0).data());
cntl.Reset();
......@@ -188,7 +188,7 @@ TEST_F(RedisTest, sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(0).type());
ASSERT_EQ("world", response.reply(0).data());
cntl.Reset();
......@@ -198,7 +198,7 @@ TEST_F(RedisTest, sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(0).type());
ASSERT_EQ("OK", response.reply(0).data());
cntl.Reset();
......@@ -208,7 +208,7 @@ TEST_F(RedisTest, sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(0).type());
ASSERT_EQ("world2", response.reply(0).data());
cntl.Reset();
......@@ -217,7 +217,7 @@ TEST_F(RedisTest, sanity) {
ASSERT_TRUE(request.AddCommand("del hello"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(0).type());
ASSERT_EQ(1, response.reply(0).integer());
cntl.Reset();
......@@ -227,7 +227,7 @@ TEST_F(RedisTest, sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_NIL, response.reply(0).type());
}
TEST_F(RedisTest, keys_with_spaces) {
......@@ -257,19 +257,19 @@ TEST_F(RedisTest, keys_with_spaces) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(7, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(0).type());
ASSERT_EQ("OK", response.reply(0).data());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(1).type());
ASSERT_EQ("OK", response.reply(1).data());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(2).type());
ASSERT_EQ("OK", response.reply(2).data());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(3).type());
ASSERT_EQ("he1 he1 da1", response.reply(3).data());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(4).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(4).type());
ASSERT_EQ("he1 he1 da1", response.reply(4).data());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(5).type());
ASSERT_EQ("he2 he2 da2", response.reply(5).data());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(6).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(6).type());
ASSERT_EQ("he3 he3 da3", response.reply(6).data());
brpc::RedisResponse response2 = response;
......@@ -298,13 +298,13 @@ TEST_F(RedisTest, incr_and_decr) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(4, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(0).type());
ASSERT_EQ(1, response.reply(0).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(1).type());
ASSERT_EQ(0, response.reply(1).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(2).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(2).type());
ASSERT_EQ(10, response.reply(2).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(3).type());
ASSERT_EQ(-10, response.reply(3).integer());
brpc::RedisResponse response2 = response;
......@@ -339,13 +339,13 @@ TEST_F(RedisTest, by_components) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(4, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(0).type());
ASSERT_EQ(1, response.reply(0).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(1).type());
ASSERT_EQ(0, response.reply(1).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(2).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(2).type());
ASSERT_EQ(10, response.reply(2).integer());
ASSERT_EQ(brpc::REDIS_REPLY_INTEGER, response.reply(3).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_INTEGER, response.reply(3).type());
ASSERT_EQ(-10, response.reply(3).integer());
brpc::RedisResponse response2 = response;
......@@ -382,13 +382,13 @@ TEST_F(RedisTest, auth) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(4, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(0).type());
ASSERT_STREQ("OK", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(1).type());
ASSERT_STREQ("OK", response.reply(1).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(2).type());
ASSERT_STREQ("OK", response.reply(2).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(3).type());
ASSERT_STREQ("my_redis", response.reply(3).c_str());
}
......@@ -409,7 +409,7 @@ TEST_F(RedisTest, auth) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_ERROR, response.reply(0).type());
}
// Auth with RedisAuthenticator && clear auth
......@@ -434,9 +434,9 @@ TEST_F(RedisTest, auth) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(0).type());
ASSERT_STREQ("my_redis", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(1).type());
ASSERT_STREQ("OK", response.reply(1).c_str());
}
......@@ -457,7 +457,7 @@ TEST_F(RedisTest, auth) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(0).type());
ASSERT_STREQ("my_redis", response.reply(0).c_str());
}
}
......@@ -552,7 +552,7 @@ TEST_F(RedisTest, codec) {
butil::Arena arena;
// status
{
brpc::RedisReply r;
brpc::RedisMessage r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_status("OK", &arena));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
......@@ -565,7 +565,7 @@ TEST_F(RedisTest, codec) {
}
// error
{
brpc::RedisReply r;
brpc::RedisMessage r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_error("not exist \'key\'", &arena));
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
......@@ -578,7 +578,7 @@ TEST_F(RedisTest, codec) {
}
// string
{
brpc::RedisReply r;
brpc::RedisMessage r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_nil_string());
ASSERT_TRUE(r.SerializeToIOBuf(&buf));
......@@ -600,7 +600,7 @@ TEST_F(RedisTest, codec) {
}
// integer
{
brpc::RedisReply r;
brpc::RedisMessage r;
butil::IOBuf buf;
int t = 2;
int input[] = { -1, 1234567 };
......@@ -619,10 +619,10 @@ TEST_F(RedisTest, codec) {
}
// array
{
brpc::RedisReply r;
brpc::RedisMessage r;
butil::IOBuf buf;
ASSERT_TRUE(r.set_array(3, &arena));
brpc::RedisReply& sub_reply = r[0];
brpc::RedisMessage& sub_reply = r[0];
sub_reply.set_array(2, &arena);
sub_reply[0].set_bulk_string("hello, it's me", &arena);
sub_reply[1].set_integer(422);
......@@ -668,12 +668,12 @@ public:
RedisConnectionImpl(RedisServiceImpl* rs)
: _rs(rs) { }
void OnRedisMessage(const brpc::RedisReply& message, brpc::RedisReply* output, butil::Arena* arena) {
void OnRedisMessage(const brpc::RedisMessage& message, brpc::RedisMessage* output, butil::Arena* arena) {
if (!message.is_array() || message.size() == 0) {
output->set_error("command not valid array", arena);
return;
}
const brpc::RedisReply& comm = message[0];
const brpc::RedisMessage& comm = message[0];
if (!comm.is_string()) {
output->set_error("command not string", arena);
return;
......@@ -752,17 +752,17 @@ TEST_F(RedisTest, server_sanity) {
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(7, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_REPLY_NIL, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_NIL, response.reply(0).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_NIL, response.reply(1).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(2).type());
ASSERT_STREQ("OK", response.reply(2).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(3).type());
ASSERT_STREQ("value1", response.reply(3).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(4).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STATUS, response.reply(4).type());
ASSERT_STREQ("OK", response.reply(4).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(5).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_STRING, response.reply(5).type());
ASSERT_STREQ("value2", response.reply(5).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(6).type());
ASSERT_EQ(brpc::REDIS_MESSAGE_ERROR, response.reply(6).type());
ASSERT_TRUE(butil::StringPiece(response.reply(6).error_message()).starts_with("ERR unknown command"));
ASSERT_EQ(rsimpl->call_count, 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment