Commit 5f4815ab authored by zhujiashun's avatar zhujiashun

redis_server_protocol: add set_xxx in RedisReply

parent dacc5236
...@@ -56,6 +56,7 @@ struct InputResponse : public InputMessageBase { ...@@ -56,6 +56,7 @@ struct InputResponse : public InputMessageBase {
struct ExecutionQueueContext { struct ExecutionQueueContext {
RedisReply message; RedisReply message;
SocketId socket_id; SocketId socket_id;
butil::Arena arena;
}; };
int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) { int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) {
...@@ -72,7 +73,7 @@ int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) { ...@@ -72,7 +73,7 @@ int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) {
continue; continue;
} }
RedisReply output; RedisReply output;
conn->OnRedisMessage(ctx->message, &output); conn->OnRedisMessage(ctx->message, &output, &ctx->arena);
butil::IOBuf sendbuf; butil::IOBuf sendbuf;
sendbuf.append("+OK\r\n"); sendbuf.append("+OK\r\n");
Socket::WriteOptions wopt; Socket::WriteOptions wopt;
...@@ -103,7 +104,6 @@ public: ...@@ -103,7 +104,6 @@ public:
} }
bthread::ExecutionQueueId<ExecutionQueueContext*> queue; bthread::ExecutionQueueId<ExecutionQueueContext*> queue;
butil::Arena arena;
}; };
// "Message" = "Response" as we only implement the client for redis. // "Message" = "Response" as we only implement the client for redis.
...@@ -129,18 +129,19 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -129,18 +129,19 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
} }
socket->initialize_parsing_context(&ctx); socket->initialize_parsing_context(&ctx);
} }
std::unique_ptr<ExecutionQueueContext> task(new ExecutionQueueContext);
RedisReply message; RedisReply message;
ParseError err = message.ConsumePartialIOBuf(*source, &ctx->arena); ParseError err = message.ConsumePartialIOBuf(*source, &task->arena);
if (err != PARSE_OK) { if (err != PARSE_OK) {
return MakeParseError(err); return MakeParseError(err);
} }
ExecutionQueueContext* task = new ExecutionQueueContext;
task->message.Swap(message); task->message.Swap(message);
task->socket_id = socket->id(); task->socket_id = socket->id();
if (bthread::execution_queue_execute(ctx->queue, task) != 0) { if (bthread::execution_queue_execute(ctx->queue, task.get()) != 0) {
LOG(ERROR) << "Fail to push execution queue"; LOG(ERROR) << "Fail to push execution queue";
return MakeParseError(PARSE_ERROR_NO_RESOURCE); return MakeParseError(PARSE_ERROR_NO_RESOURCE);
} }
task.release();
return MakeMessage(NULL); return MakeMessage(NULL);
} else { } else {
// NOTE(gejun): PopPipelinedInfo() is actually more contended than what // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
......
...@@ -212,7 +212,8 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&); ...@@ -212,7 +212,8 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&);
class RedisConnection { class RedisConnection {
public: public:
virtual ~RedisConnection() {} virtual ~RedisConnection() {}
virtual void OnRedisMessage(const RedisReply& message, RedisReply* output) = 0; virtual void OnRedisMessage(const RedisReply& message,
RedisReply* output, butil::Arena* arena) = 0;
}; };
class RedisService { class RedisService {
......
...@@ -37,6 +37,12 @@ const char* RedisReplyTypeToString(RedisReplyType type) { ...@@ -37,6 +37,12 @@ const char* RedisReplyTypeToString(RedisReplyType type) {
} }
} }
bool RedisReply::SerializeToIOBuf(butil::IOBuf* buf) {
//TODO
return true;
}
ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) { if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
// The parsing was suspended while parsing sub replies, // The parsing was suspended while parsing sub replies,
......
...@@ -56,6 +56,13 @@ public: ...@@ -56,6 +56,13 @@ public:
bool is_string() const; // True if the reply is a string. bool is_string() const; // True if the reply is a string.
bool is_array() const; // True if the reply is an array. bool is_array() const; // True if the reply is an array.
bool set_nil_string(); // "$-1\r\n"
bool set_array(int size, butil::Arena* arena); // size == -1 means nil array("*-1\r\n")
bool set_simple_string(const std::string& str, butil::Arena* arena);
bool set_error(const std::string& str, butil::Arena* arena);
bool set_integer(int64_t value);
bool set_bulk_string(const std::string& str, butil::Arena* arena);
// Convert the reply into a signed 64-bit integer(according to // Convert the reply into a signed 64-bit integer(according to
// http://redis.io/topics/protocol). If the reply is not an integer, // http://redis.io/topics/protocol). If the reply is not an integer,
// call stacks are logged and 0 is returned. // call stacks are logged and 0 is returned.
...@@ -93,6 +100,8 @@ public: ...@@ -93,6 +100,8 @@ public:
// the complexity in worst case may be O(N^2). // the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed. // Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena); ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena);
//
bool SerializeToIOBuf(butil::IOBuf* buf);
// Swap internal fields with another reply. // Swap internal fields with another reply.
void Swap(RedisReply& other); void Swap(RedisReply& other);
...@@ -115,6 +124,8 @@ private: ...@@ -115,6 +124,8 @@ private:
// RedisReply does not own the memory of fields, copying must be done // RedisReply does not own the memory of fields, copying must be done
// by calling CopyFrom[Different|Same]Arena. // by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN(RedisReply); DISALLOW_COPY_AND_ASSIGN(RedisReply);
bool set_basic_string(const std::string& str, butil::Arena* arena, RedisReplyType type);
RedisReplyType _type; RedisReplyType _type;
uint32_t _length; // length of short_str/long_str, count of replies uint32_t _length; // length of short_str/long_str, count of replies
...@@ -160,6 +171,71 @@ inline int64_t RedisReply::integer() const { ...@@ -160,6 +171,71 @@ inline int64_t RedisReply::integer() const {
return 0; return 0;
} }
inline bool RedisReply::set_nil_string() {
_type = REDIS_REPLY_STRING;
_length = -1;
return true;
}
inline bool RedisReply::set_array(int size, butil::Arena* arena) {
_type = REDIS_REPLY_ARRAY;
if (size < 0) {
_length = -1;
return true;
} else if (size == 0) {
_length = 0;
return true;
}
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * size);
if (!subs) {
LOG(FATAL) << "Fail to allocate RedisReply[" << size << "]";
return false;
}
for (int i = 0; i < size; ++i) {
new (&subs[i]) RedisReply;
}
_length = size;
_data.array.replies = subs;
return true;
}
inline bool RedisReply::set_basic_string(const std::string& str, butil::Arena* arena, RedisReplyType type) {
size_t size = str.size();
if (size < sizeof(_data.short_str)) {
memcpy(_data.short_str, str.c_str(), size);
} else {
char* d = (char*)arena->allocate((_length/8 + 1) * 8);
if (!d) {
LOG(FATAL) << "Fail to allocate string[" << size << "]";
return false;
}
memcpy(d, str.c_str(), size);
_data.long_str = d;
}
_type = type;
_length = size;
return true;
}
inline bool RedisReply::set_simple_string(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_STATUS);
}
inline bool RedisReply::set_error(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_ERROR);
}
inline bool RedisReply::set_integer(int64_t value) {
_type = REDIS_REPLY_INTEGER;
_length = 0;
_data.integer = value;
return true;
}
inline bool RedisReply::set_bulk_string(const std::string& str, butil::Arena* arena) {
return set_basic_string(str, arena, REDIS_REPLY_STRING);
}
inline const char* RedisReply::c_str() const { inline const char* RedisReply::c_str() const {
if (is_string()) { if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO if (_length < sizeof(_data.short_str)) { // SSO
......
...@@ -548,20 +548,30 @@ TEST_F(RedisTest, quote_and_escape) { ...@@ -548,20 +548,30 @@ TEST_F(RedisTest, quote_and_escape) {
request.Clear(); request.Clear();
} }
class RedisServiceImpl;
class RedisConnectionImpl : public brpc::RedisConnection { class RedisConnectionImpl : public brpc::RedisConnection {
public: public:
void OnRedisMessage(const brpc::RedisReply& message, brpc::RedisReply* output) { RedisConnectionImpl(RedisServiceImpl* rs)
: _rs(rs) { }
void OnRedisMessage(const brpc::RedisReply& message, brpc::RedisReply* output, butil::Arena* arena) {
LOG(INFO) << "OnRedisMessage, m=" << message; LOG(INFO) << "OnRedisMessage, m=" << message;
return; return;
} }
public:
RedisServiceImpl* _rs;
}; };
class RedisServiceImpl : public brpc::RedisService { class RedisServiceImpl : public brpc::RedisService {
public: public:
// @RedisService // @RedisService
brpc::RedisConnection* NewConnection() { brpc::RedisConnection* NewConnection() {
return new RedisConnectionImpl; return new RedisConnectionImpl(this);
} }
std::map<std::string, std::string> m;
}; };
TEST_F(RedisTest, server) { TEST_F(RedisTest, server) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment