Commit dacc5236 authored by zhujiashun

redis_server_protocol: complete server parsing

parent 4cb13348
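
This commit wires up the server half of the redis protocol: ProcessRedisRequest is registered in the protocol table, PROTOCOL_REDIS is renumbered and loses its client-side-only marker, and ParseRedisMessage now dispatches parsed commands to a per-connection bthread ExecutionQueue. A minimal sketch of how a server would opt in, based on the interfaces and the test added below (class names and the port are illustrative, not part of the commit):

```cpp
#include <butil/logging.h>
#include <brpc/redis.h>
#include <brpc/server.h>

// Illustrative subclasses; only RedisConnection/RedisService and
// ServerOptions::redis_service come from this commit.
class MyRedisConnection : public brpc::RedisConnection {
public:
    void OnRedisMessage(const brpc::RedisReply& message,
                        brpc::RedisReply* output) override {
        // Called once per parsed command; the server currently
        // answers "+OK" regardless of what `output` is set to.
        LOG(INFO) << "received: " << message;
    }
};

class MyRedisService : public brpc::RedisService {
public:
    brpc::RedisConnection* NewConnection() override {
        return new MyRedisConnection;  // one object per connection
    }
};

int main() {
    brpc::Server server;
    brpc::ServerOptions options;
    options.redis_service = new MyRedisService;  // NULL keeps redis disabled
    if (server.Start(6379, &options) != 0) {     // port is arbitrary here
        LOG(ERROR) << "Fail to start server";
        return -1;
    }
    server.RunUntilAskedToQuit();
    return 0;
}
```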
......@@ -489,7 +489,7 @@ static void GlobalInitializeOrDieImpl() {
     Protocol redis_protocol = { ParseRedisMessage,
                                 SerializeRedisRequest,
                                 PackRedisRequest,
-                                NULL, ProcessRedisResponse,
+                                ProcessRedisRequest, ProcessRedisResponse,
                                 NULL, NULL, GetRedisMethodName,
                                 CONNECTION_TYPE_ALL, "redis" };
     if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) {
......
......@@ -46,14 +46,14 @@ enum ProtocolType {
     PROTOCOL_HTTP = 7;
     PROTOCOL_PUBLIC_PBRPC = 8;
     PROTOCOL_NOVA_PBRPC = 9;
-    PROTOCOL_NSHEAD_CLIENT = 10;       // implemented in baidu-rpc-ub
-    PROTOCOL_NSHEAD = 11;
-    PROTOCOL_HADOOP_RPC = 12;
-    PROTOCOL_HADOOP_SERVER_RPC = 13;
-    PROTOCOL_MONGO = 14;               // server side only
-    PROTOCOL_UBRPC_COMPACK = 15;
-    PROTOCOL_DIDX_CLIENT = 16;         // Client side only
-    PROTOCOL_REDIS = 17;               // Client side only
+    PROTOCOL_REDIS = 10;
+    PROTOCOL_NSHEAD_CLIENT = 11;       // implemented in baidu-rpc-ub
+    PROTOCOL_NSHEAD = 12;
+    PROTOCOL_HADOOP_RPC = 13;
+    PROTOCOL_HADOOP_SERVER_RPC = 14;
+    PROTOCOL_MONGO = 15;               // server side only
+    PROTOCOL_UBRPC_COMPACK = 16;
+    PROTOCOL_DIDX_CLIENT = 17;         // Client side only
     PROTOCOL_MEMCACHE = 18;            // Client side only
     PROTOCOL_ITP = 19;
     PROTOCOL_NSHEAD_MCPACK = 20;
......
......@@ -31,11 +31,12 @@
#include "brpc/span.h"
#include "brpc/redis.h"
#include "brpc/policy/redis_protocol.h"
#include "bthread/execution_queue.h"
namespace brpc {
DECLARE_bool(enable_rpcz);
DECLARE_bool(usercode_in_pthread);
namespace policy {
......@@ -52,62 +53,147 @@ struct InputResponse : public InputMessageBase {
 }
 };

-// "Message" = "Response" as we only implement the client for redis.
-ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
-                              bool /*read_eof*/, const void* /*arg*/) {
-    if (source->empty()) {
-        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
-    }
-    // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
-    // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
-    // sending and popped when response comes back, being protected by a
-    // mutex. Previously the mutex is shared with Socket._id_wait_list. When
-    // 200 bthreads access one redis-server, ~1.5s in total is spent on
-    // contention in 10-second duration. If the mutex is separated, the time
-    // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
-    // GivebackPipelinedInfo() to lock only once(when receiving response)
-    // in most cases, and the time decreases to ~0.14s.
-    PipelinedInfo pi;
-    if (!socket->PopPipelinedInfo(&pi)) {
-        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
-        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
-    }
-    do {
-        InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
-        if (msg == NULL) {
-            msg = new InputResponse;
-            socket->reset_parsing_context(msg);
-        }
-        const int consume_count = (pi.with_auth ? 1 : pi.count);
-        ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count);
-        if (err != PARSE_OK) {
-            socket->GivebackPipelinedInfo(pi);
-            return MakeParseError(err);
-        }
-        if (pi.with_auth) {
-            if (msg->response.reply_size() != 1 ||
-                !(msg->response.reply(0).type() == brpc::REDIS_REPLY_STATUS &&
-                  msg->response.reply(0).data().compare("OK") == 0)) {
-                LOG(ERROR) << "Redis Auth failed: " << msg->response;
-                return MakeParseError(PARSE_ERROR_NO_RESOURCE,
-                                      "Fail to authenticate with Redis");
-            }
-            DestroyingPtr<InputResponse> auth_msg(
-                static_cast<InputResponse*>(socket->release_parsing_context()));
-            pi.with_auth = false;
-            continue;
-        }
-        CHECK_EQ((uint32_t)msg->response.reply_size(), pi.count);
-        msg->id_wait = pi.id_wait;
-        socket->release_parsing_context();
-        return MakeMessage(msg);
-    } while(true);
-}
+struct ExecutionQueueContext {
+    RedisReply message;
+    SocketId socket_id;
+};
+
+int Consume(void* meta, bthread::TaskIterator<ExecutionQueueContext*>& iter) {
+    RedisConnection* conn = static_cast<RedisConnection*>(meta);
+    if (iter.is_queue_stopped()) {
+        delete conn;
+        return 0;
+    }
+    for (; iter; ++iter) {
+        std::unique_ptr<ExecutionQueueContext> ctx(*iter);
+        SocketUniquePtr s;
+        if (Socket::Address(ctx->socket_id, &s) != 0) {
+            LOG(WARNING) << "Fail to address redis socket";
+            continue;
+        }
+        RedisReply output;
+        conn->OnRedisMessage(ctx->message, &output);
+        butil::IOBuf sendbuf;
+        sendbuf.append("+OK\r\n");
+        Socket::WriteOptions wopt;
+        wopt.ignore_eovercrowded = true;
+        s->Write(&sendbuf, &wopt);
+    }
+    return 0;
+}
+
+class ServerContext : public Destroyable {
+public:
+    ~ServerContext() {
+        bthread::execution_queue_stop(queue);
+    }
+
+    // @Destroyable
+    void Destroy() { delete this; }
+
+    int init(RedisConnection* conn) {
+        bthread::ExecutionQueueOptions q_opt;
+        q_opt.bthread_attr =
+            FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
+        if (bthread::execution_queue_start(&queue, &q_opt, Consume, conn) != 0) {
+            LOG(ERROR) << "Fail to start execution queue";
+            return -1;
+        }
+        return 0;
+    }
+
+    bthread::ExecutionQueueId<ExecutionQueueContext*> queue;
+    butil::Arena arena;
+};
+
+// "Message" = "Response" as we only implement the client for redis.
+ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
+                              bool read_eof, const void* arg) {
+    if (read_eof || source->empty()) {
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    const Server* server = static_cast<const Server*>(arg);
+    if (server) {
+        RedisConnection* conn = server->options().redis_service->NewConnection();
+        if (!conn) {
+            LOG(ERROR) << "Fail to new redis connection from redis service";
+            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+        }
+        ServerContext* ctx = static_cast<ServerContext*>(socket->parsing_context());
+        if (ctx == NULL) {
+            ctx = new ServerContext;
+            if (ctx->init(conn) != 0) {
+                delete ctx;
+                LOG(ERROR) << "Fail to init redis ServerContext";
+                return MakeParseError(PARSE_ERROR_NO_RESOURCE);
+            }
+            socket->initialize_parsing_context(&ctx);
+        }
+        RedisReply message;
+        ParseError err = message.ConsumePartialIOBuf(*source, &ctx->arena);
+        if (err != PARSE_OK) {
+            return MakeParseError(err);
+        }
+        ExecutionQueueContext* task = new ExecutionQueueContext;
+        task->message.Swap(message);
+        task->socket_id = socket->id();
+        if (bthread::execution_queue_execute(ctx->queue, task) != 0) {
+            LOG(ERROR) << "Fail to push execution queue";
+            return MakeParseError(PARSE_ERROR_NO_RESOURCE);
+        }
+        return MakeMessage(NULL);
+    } else {
+        // NOTE(gejun): PopPipelinedInfo() is actually more contended than what
+        // I thought before. The Socket._pipeline_q is a SPSC queue pushed before
+        // sending and popped when response comes back, being protected by a
+        // mutex. Previously the mutex is shared with Socket._id_wait_list. When
+        // 200 bthreads access one redis-server, ~1.5s in total is spent on
+        // contention in 10-second duration. If the mutex is separated, the time
+        // drops to ~0.25s. I further replaced PeekPipelinedInfo() with
+        // GivebackPipelinedInfo() to lock only once(when receiving response)
+        // in most cases, and the time decreases to ~0.14s.
+        PipelinedInfo pi;
+        if (!socket->PopPipelinedInfo(&pi)) {
+            LOG(WARNING) << "No corresponding PipelinedInfo in socket";
+            return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+        }
+        do {
+            InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
+            if (msg == NULL) {
+                msg = new InputResponse;
+                socket->reset_parsing_context(msg);
+            }
+            const int consume_count = (pi.with_auth ? 1 : pi.count);
+            ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count);
+            if (err != PARSE_OK) {
+                socket->GivebackPipelinedInfo(pi);
+                return MakeParseError(err);
+            }
+            if (pi.with_auth) {
+                if (msg->response.reply_size() != 1 ||
+                    !(msg->response.reply(0).type() == brpc::REDIS_REPLY_STATUS &&
+                      msg->response.reply(0).data().compare("OK") == 0)) {
+                    LOG(ERROR) << "Redis Auth failed: " << msg->response;
+                    return MakeParseError(PARSE_ERROR_NO_RESOURCE,
+                                          "Fail to authenticate with Redis");
+                }
+                DestroyingPtr<InputResponse> auth_msg(
+                    static_cast<InputResponse*>(socket->release_parsing_context()));
+                pi.with_auth = false;
+                continue;
+            }
+            CHECK_EQ((uint32_t)msg->response.reply_size(), pi.count);
+            msg->id_wait = pi.id_wait;
+            socket->release_parsing_context();
+            return MakeMessage(msg);
+        } while(true);
+    }
+    return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
+}
......@@ -158,6 +244,8 @@ void ProcessRedisResponse(InputMessageBase* msg_base) {
     accessor.OnResponse(cid, saved_error);
 }

+void ProcessRedisRequest(InputMessageBase* msg_base) { }
+
 void SerializeRedisRequest(butil::IOBuf* buf,
                            Controller* cntl,
                            const google::protobuf::Message* request) {
......
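The server path above deliberately keeps the IO thread non-blocking: ParseRedisMessage only parses one RedisReply into the per-socket arena and enqueues it, while Consume later runs on a bthread, re-addresses the socket by id, and writes the reply. A self-contained sketch of the bthread ExecutionQueue pattern this relies on (DemoConsume and the int payload are illustrative, not from this commit):

```cpp
#include <bthread/execution_queue.h>
#include <butil/logging.h>

// The consumer runs in a bthread; tasks are delivered in push order.
static int DemoConsume(void* meta, bthread::TaskIterator<int>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;  // final call after execution_queue_stop()
    }
    for (; iter; ++iter) {
        LOG(INFO) << "got task " << *iter;
    }
    return 0;
}

int main() {
    bthread::ExecutionQueueId<int> qid;
    bthread::ExecutionQueueOptions opt;
    if (bthread::execution_queue_start(&qid, &opt, DemoConsume, NULL) != 0) {
        return -1;
    }
    bthread::execution_queue_execute(qid, 1);  // non-blocking push
    bthread::execution_queue_execute(qid, 2);
    bthread::execution_queue_stop(qid);
    bthread::execution_queue_join(qid);        // wait until drained
    return 0;
}
```

Because each connection gets its own queue, commands from one connection are processed strictly in arrival order without extra locking.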
......@@ -33,6 +33,11 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket *socket, bool read_eo
 // Actions to a redis response.
 void ProcessRedisResponse(InputMessageBase* msg);

+// Actions to a redis request, deliberately left empty: all requests
+// are processed by the execution queue that parsed messages are
+// pushed into during parsing.
+void ProcessRedisRequest(InputMessageBase* msg);
+
 // Serialize a redis request.
 void SerializeRedisRequest(butil::IOBuf* buf,
                            Controller* cntl,
......
......@@ -209,6 +209,18 @@ private:
 std::ostream& operator<<(std::ostream& os, const RedisRequest&);
 std::ostream& operator<<(std::ostream& os, const RedisResponse&);

+class RedisConnection {
+public:
+    virtual ~RedisConnection() {}
+    virtual void OnRedisMessage(const RedisReply& message, RedisReply* output) = 0;
+};
+
+class RedisService {
+public:
+    virtual ~RedisService() {}
+    virtual RedisConnection* NewConnection() = 0;
+};
+
 } // namespace brpc
......
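Since NewConnection() is invoked once per accepted connection and each connection's messages are consumed serially by its own execution queue, a RedisConnection can hold mutable per-connection state without locking. A hypothetical example (CountingConnection is not part of the commit):

```cpp
#include <butil/logging.h>
#include <brpc/redis.h>

// Hypothetical: per-connection state needs no lock because this
// connection's messages are consumed serially by its own queue.
class CountingConnection : public brpc::RedisConnection {
public:
    void OnRedisMessage(const brpc::RedisReply& message,
                        brpc::RedisReply* output) override {
        ++_nmsg;
        LOG(INFO) << "message #" << _nmsg << ": " << message;
        // `output` is not serialized back yet; the server answers "+OK".
    }
private:
    int _nmsg = 0;
};
```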
......@@ -42,6 +42,7 @@
#include "brpc/health_reporter.h"
#include "brpc/adaptive_max_concurrency.h"
#include "brpc/http2.h"
#include "brpc/redis.h"
namespace brpc {
......@@ -53,6 +54,7 @@ class SimpleDataPool;
 class MongoServiceAdaptor;
 class RestfulMap;
 class RtmpService;
+class RedisService;
 struct SocketSSLContext;

 struct ServerOptions {
......@@ -235,6 +237,10 @@ struct ServerOptions {
     // Customize parameters of HTTP2, defined in http2.h
     H2Settings h2_settings;

+    // For processing Redis connections.
+    // Default: NULL (disabled)
+    RedisService* redis_service;
+
 private:
     // SSLOptions is large and not often used, allocate it on heap to
     // prevent ServerOptions from being bloated in most cases.
......
......@@ -17,11 +17,12 @@
 #include <iostream>
-#include "butil/time.h"
-#include "butil/logging.h"
+#include <butil/time.h>
+#include <butil/logging.h>
 #include <brpc/redis.h>
 #include <brpc/channel.h>
 #include <brpc/policy/redis_authenticator.h>
+#include <brpc/server.h>
 #include <gtest/gtest.h>

 namespace brpc {
......@@ -547,4 +548,43 @@ TEST_F(RedisTest, quote_and_escape) {
     request.Clear();
 }

+class RedisConnectionImpl : public brpc::RedisConnection {
+public:
+    void OnRedisMessage(const brpc::RedisReply& message, brpc::RedisReply* output) {
+        LOG(INFO) << "OnRedisMessage, m=" << message;
+        return;
+    }
+};
+
+class RedisServiceImpl : public brpc::RedisService {
+public:
+    // @RedisService
+    brpc::RedisConnection* NewConnection() {
+        return new RedisConnectionImpl;
+    }
+};
+
+TEST_F(RedisTest, server) {
+    brpc::Server server;
+    brpc::ServerOptions server_options;
+    server_options.redis_service = new RedisServiceImpl;
+    brpc::PortRange pr(8081, 8900);
+    ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
+
+    brpc::ChannelOptions options;
+    options.protocol = brpc::PROTOCOL_REDIS;
+    brpc::Channel channel;
+    ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
+
+    brpc::RedisRequest request;
+    brpc::RedisResponse response;
+    brpc::Controller cntl;
+    ASSERT_TRUE(request.AddCommand("get hello"));
+    channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    ASSERT_EQ(1, response.reply_size());
+    ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
+    ASSERT_EQ("OK", response.reply(0).data());
+}
+
 } //namespace