Commit 38e5f8bc authored by jamesge's avatar jamesge

fix redis related issues

parent b4c70ac2
...@@ -28,9 +28,11 @@ SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS)) ...@@ -28,9 +28,11 @@ SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))
DYNAMIC_LINKINGS += -lreadline -lncurses DYNAMIC_LINKINGS += -lreadline -lncurses
PRESS_SOURCES = redis_press.cpp PRESS_SOURCES = redis_press.cpp
CLI_SOURCES = redis_cli.cpp CLI_SOURCES = redis_cli.cpp
SERVER_SOURCES = redis_server.cpp
PRESS_OBJS = $(addsuffix .o, $(basename $(PRESS_SOURCES))) PRESS_OBJS = $(addsuffix .o, $(basename $(PRESS_SOURCES)))
CLI_OBJS = $(addsuffix .o, $(basename $(CLI_SOURCES))) CLI_OBJS = $(addsuffix .o, $(basename $(CLI_SOURCES)))
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
ifeq ($(SYSTEM),Darwin) ifeq ($(SYSTEM),Darwin)
ifneq ("$(LINK_SO)", "") ifneq ("$(LINK_SO)", "")
...@@ -48,12 +50,12 @@ else ifeq ($(SYSTEM),Linux) ...@@ -48,12 +50,12 @@ else ifeq ($(SYSTEM),Linux)
endif endif
.PHONY:all .PHONY:all
all: redis_press redis_cli all: redis_press redis_cli redis_server
.PHONY:clean .PHONY:clean
clean: clean:
@echo "Cleaning" @echo "Cleaning"
@rm -rf redis_press redis_cli $(PRESS_OBJS) $(CLI_OBJS) @rm -rf redis_press redis_cli $(PRESS_OBJS) $(CLI_OBJS) $(SERVER_OBJS)
redis_press:$(PRESS_OBJS) redis_press:$(PRESS_OBJS)
@echo "Linking $@" @echo "Linking $@"
...@@ -71,6 +73,14 @@ else ...@@ -71,6 +73,14 @@ else
@$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@ @$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
endif endif
redis_server:$(SERVER_OBJS)
@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
@$(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
else
@$(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
endif
%.o:%.cpp %.o:%.cpp
@echo "Compiling $@" @echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@ @$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
......
...@@ -59,12 +59,12 @@ private: ...@@ -59,12 +59,12 @@ private:
class GetCommandHandler : public brpc::RedisCommandHandler { class GetCommandHandler : public brpc::RedisCommandHandler {
public: public:
GetCommandHandler(RedisServiceImpl* rsimpl) explicit GetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {} : _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args, brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool flush_batched) override { bool /*flush_batched*/) override {
if (args.size() <= 1) { if (args.size() <= 1) {
output->SetError("ERR wrong number of arguments for 'get' command"); output->SetError("ERR wrong number of arguments for 'get' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -85,12 +85,12 @@ private: ...@@ -85,12 +85,12 @@ private:
class SetCommandHandler : public brpc::RedisCommandHandler { class SetCommandHandler : public brpc::RedisCommandHandler {
public: public:
SetCommandHandler(RedisServiceImpl* rsimpl) explicit SetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {} : _rsimpl(rsimpl) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args, brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output, brpc::RedisReply* output,
bool flush_batched) override { bool /*flush_batched*/) override {
if (args.size() <= 2) { if (args.size() <= 2) {
output->SetError("ERR wrong number of arguments for 'set' command"); output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
......
...@@ -56,16 +56,15 @@ struct InputResponse : public InputMessageBase { ...@@ -56,16 +56,15 @@ struct InputResponse : public InputMessageBase {
// This class is as parsing_context in socket. // This class is as parsing_context in socket.
class RedisConnContext : public Destroyable { class RedisConnContext : public Destroyable {
public: public:
RedisConnContext() explicit RedisConnContext(const RedisService* rs)
: redis_service(NULL) : redis_service(rs)
, batched_size(0) {} , batched_size(0) {}
~RedisConnContext(); ~RedisConnContext();
// @Destroyable // @Destroyable
void Destroy() override; void Destroy() override;
SocketId socket_id; const RedisService* redis_service;
RedisService* redis_service;
// If user starts a transaction, transaction_handler indicates the // If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command. // handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> transaction_handler; std::unique_ptr<RedisCommandHandler> transaction_handler;
...@@ -151,15 +150,13 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -151,15 +150,13 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
} }
const Server* server = static_cast<const Server*>(arg); const Server* server = static_cast<const Server*>(arg);
if (server) { if (server) {
RedisService* rs = server->options().redis_service; const RedisService* const rs = server->options().redis_service;
if (!rs) { if (!rs) {
return MakeParseError(PARSE_ERROR_TRY_OTHERS); return MakeParseError(PARSE_ERROR_TRY_OTHERS);
} }
RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context()); RedisConnContext* ctx = static_cast<RedisConnContext*>(socket->parsing_context());
if (ctx == NULL) { if (ctx == NULL) {
ctx = new RedisConnContext; ctx = new RedisConnContext(rs);
ctx->socket_id = socket->id();
ctx->redis_service = rs;
socket->reset_parsing_context(ctx); socket->reset_parsing_context(ctx);
} }
std::vector<const char*> current_commands; std::vector<const char*> current_commands;
......
...@@ -447,7 +447,7 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle ...@@ -447,7 +447,7 @@ bool RedisService::AddCommandHandler(const std::string& name, RedisCommandHandle
return true; return true;
} }
RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) { RedisCommandHandler* RedisService::FindCommandHandler(const std::string& name) const {
std::string lcname = StringToLowerASCII(name); std::string lcname = StringToLowerASCII(name);
auto it = _command_map.find(lcname); auto it = _command_map.find(lcname);
if (it != _command_map.end()) { if (it != _command_map.end()) {
......
...@@ -214,19 +214,20 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&); ...@@ -214,19 +214,20 @@ std::ostream& operator<<(std::ostream& os, const RedisResponse&);
class RedisCommandHandler; class RedisCommandHandler;
// Implement this class and assign an instance to ServerOption.redis_service // Container of CommandHandlers.
// to enable redis support. // Assign an instance to ServerOption.redis_service to enable redis support.
class RedisService { class RedisService {
public: public:
typedef std::unordered_map<std::string, RedisCommandHandler*> CommandMap;
virtual ~RedisService() {} virtual ~RedisService() {}
// Call this function to register `handler` that can handle command `name`. // Call this function to register `handler` that can handle command `name`.
bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler); bool AddCommandHandler(const std::string& name, RedisCommandHandler* handler);
// This function should not be touched by user and used by brpc deverloper only. // This function should not be touched by user and used by brpc deverloper only.
RedisCommandHandler* FindCommandHandler(const std::string& name); RedisCommandHandler* FindCommandHandler(const std::string& name) const;
private: private:
typedef std::unordered_map<std::string, RedisCommandHandler*> CommandMap;
CommandMap _command_map; CommandMap _command_map;
}; };
......
...@@ -433,6 +433,9 @@ Server::~Server() { ...@@ -433,6 +433,9 @@ Server::~Server() {
delete _options.auth; delete _options.auth;
_options.auth = NULL; _options.auth = NULL;
} }
delete _options.redis_service;
_options.redis_service = NULL;
} }
int Server::AddBuiltinServices() { int Server::AddBuiltinServices() {
......
...@@ -68,12 +68,12 @@ struct ServerOptions { ...@@ -68,12 +68,12 @@ struct ServerOptions {
std::string pid_file; std::string pid_file;
// Process requests in format of nshead_t + blob. // Process requests in format of nshead_t + blob.
// Owned by Server and deleted in server's destructor // Owned by Server and deleted in server's destructor.
// Default: NULL // Default: NULL
NsheadService* nshead_service; NsheadService* nshead_service;
// Process requests in format of thrift_binary_head_t + blob. // Process requests in format of thrift_binary_head_t + blob.
// Owned by Server and deleted in server's destructor // Owned by Server and deleted in server's destructor.
// Default: NULL // Default: NULL
ThriftService* thrift_service; ThriftService* thrift_service;
...@@ -215,7 +215,8 @@ struct ServerOptions { ...@@ -215,7 +215,8 @@ struct ServerOptions {
// including accesses to builtin services and pb services. // including accesses to builtin services and pb services.
// The service must have a method named "default_method" and the request // The service must have a method named "default_method" and the request
// and response must have no fields. // and response must have no fields.
// This service is owned by server and deleted in server's destructor //
// Owned by Server and deleted in server's destructor
google::protobuf::Service* http_master_service; google::protobuf::Service* http_master_service;
// If this field is on, contents on /health page is generated by calling // If this field is on, contents on /health page is generated by calling
...@@ -236,6 +237,7 @@ struct ServerOptions { ...@@ -236,6 +237,7 @@ struct ServerOptions {
H2Settings h2_settings; H2Settings h2_settings;
// For processing Redis connections. Read src/brpc/redis.h for details. // For processing Redis connections. Read src/brpc/redis.h for details.
// Owned by Server and deleted in server's destructor.
// Default: NULL (disabled) // Default: NULL (disabled)
RedisService* redis_service; RedisService* redis_service;
......
...@@ -850,8 +850,8 @@ public: ...@@ -850,8 +850,8 @@ public:
class SetCommandHandler : public brpc::RedisCommandHandler { class SetCommandHandler : public brpc::RedisCommandHandler {
public: public:
SetCommandHandler(bool batch_process = false) SetCommandHandler(RedisServiceImpl* rs, bool batch_process = false)
: rs(NULL) : _rs(rs)
, _batch_process(batch_process) {} , _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args, brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
...@@ -862,7 +862,7 @@ public: ...@@ -862,7 +862,7 @@ public:
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) { if (_batch_process) {
return rs->OnBatched(args, output, flush_batched); return _rs->OnBatched(args, output, flush_batched);
} else { } else {
DoSet(args[1], args[2], output); DoSet(args[1], args[2], output);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -874,15 +874,15 @@ public: ...@@ -874,15 +874,15 @@ public:
output->SetStatus("OK"); output->SetStatus("OK");
} }
RedisServiceImpl* rs;
private: private:
RedisServiceImpl* _rs;
bool _batch_process; bool _batch_process;
}; };
class GetCommandHandler : public brpc::RedisCommandHandler { class GetCommandHandler : public brpc::RedisCommandHandler {
public: public:
GetCommandHandler(bool batch_process = false) GetCommandHandler(RedisServiceImpl* rs, bool batch_process = false)
: rs(NULL) : _rs(rs)
, _batch_process(batch_process) {} , _batch_process(batch_process) {}
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args, brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
...@@ -893,7 +893,7 @@ public: ...@@ -893,7 +893,7 @@ public:
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
} }
if (_batch_process) { if (_batch_process) {
return rs->OnBatched(args, output, flush_batched); return _rs->OnBatched(args, output, flush_batched);
} else { } else {
DoGet(args[1], output); DoGet(args[1], output);
return brpc::RedisCommandHandler::OK; return brpc::RedisCommandHandler::OK;
...@@ -909,8 +909,8 @@ public: ...@@ -909,8 +909,8 @@ public:
} }
} }
RedisServiceImpl* rs;
private: private:
RedisServiceImpl* _rs;
bool _batch_process; bool _batch_process;
}; };
...@@ -939,8 +939,8 @@ TEST_F(RedisTest, server_sanity) { ...@@ -939,8 +939,8 @@ TEST_F(RedisTest, server_sanity) {
brpc::Server server; brpc::Server server;
brpc::ServerOptions server_options; brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl; RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler *gh = new GetCommandHandler; GetCommandHandler *gh = new GetCommandHandler(rsimpl);
SetCommandHandler *sh = new SetCommandHandler; SetCommandHandler *sh = new SetCommandHandler(rsimpl);
IncrCommandHandler *ih = new IncrCommandHandler; IncrCommandHandler *ih = new IncrCommandHandler;
rsimpl->AddCommandHandler("get", gh); rsimpl->AddCommandHandler("get", gh);
rsimpl->AddCommandHandler("set", sh); rsimpl->AddCommandHandler("set", sh);
...@@ -1084,8 +1084,8 @@ TEST_F(RedisTest, server_command_continue) { ...@@ -1084,8 +1084,8 @@ TEST_F(RedisTest, server_command_continue) {
brpc::Server server; brpc::Server server;
brpc::ServerOptions server_options; brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl; RedisServiceImpl* rsimpl = new RedisServiceImpl;
rsimpl->AddCommandHandler("get", new GetCommandHandler); rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
rsimpl->AddCommandHandler("set", new SetCommandHandler); rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
rsimpl->AddCommandHandler("incr", new IncrCommandHandler); rsimpl->AddCommandHandler("incr", new IncrCommandHandler);
rsimpl->AddCommandHandler("multi", new MultiCommandHandler); rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
server_options.redis_service = rsimpl; server_options.redis_service = rsimpl;
...@@ -1159,10 +1159,8 @@ TEST_F(RedisTest, server_handle_pipeline) { ...@@ -1159,10 +1159,8 @@ TEST_F(RedisTest, server_handle_pipeline) {
brpc::Server server; brpc::Server server;
brpc::ServerOptions server_options; brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl; RedisServiceImpl* rsimpl = new RedisServiceImpl;
GetCommandHandler* getch = new GetCommandHandler(true); GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
SetCommandHandler* setch = new SetCommandHandler(true); SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
getch->rs = rsimpl;
setch->rs = rsimpl;
rsimpl->AddCommandHandler("get", getch); rsimpl->AddCommandHandler("get", getch);
rsimpl->AddCommandHandler("set", setch); rsimpl->AddCommandHandler("set", setch);
rsimpl->AddCommandHandler("multi", new MultiCommandHandler); rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment