Commit d483a54c authored by zhujiashun's avatar zhujiashun

redis_server_protocol: command before transaction are regareded as last command

parent cab9db70
......@@ -78,6 +78,7 @@ public:
int ConsumeCommand(RedisConnContext* ctx,
const std::vector<const char*>& commands,
const std::string& next_command,
butil::Arena* arena,
bool is_last,
butil::IOBuf* sendbuf) {
......@@ -98,10 +99,16 @@ int ConsumeCommand(RedisConnContext* ctx,
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", commands[0]);
output.SetError(buf);
} else {
RedisCommandHandler* next_ch =
ctx->redis_service->FindCommandHandler(next_command);
if (next_ch && next_ch->TransactionMarker()) {
is_last = true;
}
result = ch->Run(commands, &output, is_last);
if (result == RedisCommandHandler::CONTINUE) {
if (ctx->batched_size) {
LOG(ERROR) << "CONTINUE should not be returned in redis batched process.";
if (ctx->batched_size != 0) {
LOG(ERROR) << "Do you forget to return OK "
"when is_last is true?";
return -1;
}
ctx->transaction_handler.reset(ch->NewTransactionHandler());
......@@ -139,6 +146,8 @@ int ConsumeCommand(RedisConnContext* ctx,
RedisConnContext::~RedisConnContext() { }
void RedisConnContext::Destroy() {
delete this;
}
// ========== impl of RedisConnContext ==========
......@@ -176,12 +185,16 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (err != PARSE_OK) {
break;
}
if (ConsumeCommand(ctx, current_commands, &arena, false, &sendbuf) != 0) {
std::string next_command_name;
if (!next_commands.empty()) {
next_command_name = next_commands[0];
}
if (ConsumeCommand(ctx, current_commands, next_command_name, &arena, false, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
current_commands.swap(next_commands);
}
if (ConsumeCommand(ctx, current_commands, &arena,
if (ConsumeCommand(ctx, current_commands, "", &arena,
true /* must be last message */, &sendbuf) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
......
......@@ -263,20 +263,20 @@ public:
brpc::RedisReply* output,
bool is_last) = 0;
// This function is called to new a transaction handler once Run() returns
// RedisCommandHandler::CONTINUE. All the following commands are sent to this
// handler until it return Result::OK. For example, for command "multi; set k1 v1;
// set k2 v2; set k3 v3; exec":
// 1) In Run(), command is "multi", so return RedisCommandHandler::CONTINUE, and
// brpc calls NewTransactionHandler() to new a handler tran_handler.
// 2) brpc calls tran_handler.Run() with command "set k1 v1", which should return
// RedisCommandHandler::CONTINUE and buffer the command.
// 3) brpc calls tran_handler.Run() with command "set k2 v2", which should return
// RedisCommandHandler::CONTINUE and buffer the command.
// 4) brpc calls tran_handler.Run() with command "set k3 v3", which should return
// RedisCommandHandler::CONTINUE and buffer the command.
// 5) An ending marker(exec) is found in tran_handler.Run(), user exeuctes all
// the command and return RedisCommandHandler::OK. This Transation is done.
// The Run() returns CONTINUE for "multi", which makes brpc call this method to
// create a transaction_handler to process following commands until transaction_handler
// returns OK. For example, for command "multi; set k1 v1; set k2 v2; set k3 v3;
// exec":
// 1) First command is "multi" and Run() return RedisCommandHandler::CONTINUE, then
// brpc calls NewTransactionHandler() to new a transaction_handler.
// 2) Call transaction_handler.Run() with command "set k1 v1", which should return
// CONTINUE.
// 3) Call transaction_handler.Run() with command "set k2 v2", which should return
// CONTINUE.
// 4) Call transaction_handler.Run() with command "set k3 v3", which should return
// CONTINUE.
// 5) An ending marker(exec) is found in transaction_handler.Run(), user exeuctes all
// the commands and return OK. This Transation is done.
virtual RedisCommandHandler* NewTransactionHandler();
// return true if a transaction is started when met this command.
......
......@@ -405,6 +405,12 @@ ParseError RedisCommandParser::Consume(butil::IOBuf& buf,
buf.cutn(d, len);
d[len] = '\0';
_commands[_index] = d;
if (_index == 0) {
// convert it to lowercase when it is command name
for (int i = 0; i < len; ++i) {
d[i] = ::tolower(d[i]);
}
}
char crlf[2];
buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') {
......
......@@ -25,7 +25,7 @@
namespace brpc {
//BAIDU_CASSERT(sizeof(RedisReply) == 24, size_match);
const uint32_t RedisReply::npos = (uint32_t)-1;
const int RedisReply::npos = -1;
const char* RedisReplyTypeToString(RedisReplyType type) {
switch (type) {
......@@ -47,7 +47,7 @@ bool RedisReply::SerializeTo(butil::IOBuf* buf) {
// fall through
case REDIS_REPLY_STATUS:
buf->push_back((_type == REDIS_REPLY_ERROR)? '-' : '+');
if (_length < sizeof(_data.short_str)) {
if (_length < (int)sizeof(_data.short_str)) {
buf->append(_data.short_str, _length);
} else {
buf->append(_data.long_str, _length);
......@@ -62,17 +62,15 @@ bool RedisReply::SerializeTo(butil::IOBuf* buf) {
buf->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */);
break;
case REDIS_REPLY_STRING:
// Since _length is unsigned, we have to casting _length to signed
// representing nil string
prefix_buf[0] = '$';
len = butil::AppendDecimal(&prefix_buf[1], (int)_length);
len = butil::AppendDecimal(&prefix_buf[1], _length);
prefix_buf[len + 1] = '\r';
prefix_buf[len + 2] = '\n';
buf->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */);
if (_length == npos) {
break;
}
if (_length < sizeof(_data.short_str)) {
if (_length < (int)sizeof(_data.short_str)) {
buf->append(_data.short_str, _length);
} else {
buf->append(_data.long_str, _length);
......@@ -80,17 +78,15 @@ bool RedisReply::SerializeTo(butil::IOBuf* buf) {
buf->append("\r\n");
break;
case REDIS_REPLY_ARRAY:
// Since _length is unsigned, we have to casting _length to signed
// representing nil string
prefix_buf[0] = '*';
len = butil::AppendDecimal(&prefix_buf[1], (int)_length);
len = butil::AppendDecimal(&prefix_buf[1], _length);
prefix_buf[len + 1] = '\r';
prefix_buf[len + 2] = '\n';
buf->append(prefix_buf, len + 3 /* 1 for ':', 2 for "\r\n" */);
if (_length == npos) {
break;
}
for (size_t i = 0; i < _length; ++i) {
for (int i = 0; i < _length; ++i) {
if (!_data.array.replies[i].SerializeTo(buf)) {
return false;
}
......@@ -111,7 +107,7 @@ ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* aren
// The parsing was suspended while parsing sub replies,
// continue the parsing.
RedisReply* subs = (RedisReply*)_data.array.replies;
for (uint32_t i = _data.array.last_index; i < _length; ++i) {
for (int i = _data.array.last_index; i < _length; ++i) {
ParseError err = subs[i].ConsumePartialIOBuf(buf, arena);
if (err != PARSE_OK) {
return err;
......@@ -334,7 +330,7 @@ void RedisReply::Print(std::ostream& os) const {
switch (_type) {
case REDIS_REPLY_STRING:
os << '"';
if (_length < sizeof(_data.short_str)) {
if (_length < (int)sizeof(_data.short_str)) {
os << RedisStringPrinter(_data.short_str, _length);
} else {
os << RedisStringPrinter(_data.long_str, _length);
......@@ -343,7 +339,7 @@ void RedisReply::Print(std::ostream& os) const {
break;
case REDIS_REPLY_ARRAY:
os << '[';
for (uint32_t i = 0; i < _length; ++i) {
for (int i = 0; i < _length; ++i) {
if (i != 0) {
os << ", ";
}
......@@ -361,7 +357,7 @@ void RedisReply::Print(std::ostream& os) const {
os << "(error) ";
// fall through
case REDIS_REPLY_STATUS:
if (_length < sizeof(_data.short_str)) {
if (_length < (int)sizeof(_data.short_str)) {
os << RedisStringPrinter(_data.short_str, _length);
} else {
os << RedisStringPrinter(_data.long_str, _length);
......@@ -384,7 +380,7 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
LOG(FATAL) << "Fail to allocate RedisReply[" << _length << "]";
return;
}
for (uint32_t i = 0; i < _length; ++i) {
for (int i = 0; i < _length; ++i) {
new (&subs[i]) RedisReply;
}
_data.array.last_index = other._data.array.last_index;
......@@ -394,7 +390,7 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
}
} else {
for (size_t i = 0; i < _length; ++i) {
for (int i = 0; i < _length; ++i) {
subs[i].CopyFromDifferentArena(other._data.array.replies[i], arena);
}
}
......@@ -411,7 +407,7 @@ void RedisReply::CopyFromDifferentArena(const RedisReply& other,
case REDIS_REPLY_ERROR:
// fall through
case REDIS_REPLY_STATUS:
if (_length < sizeof(_data.short_str)) {
if (_length < (int)sizeof(_data.short_str)) {
memcpy(_data.short_str, other._data.short_str, _length + 1);
} else {
char* d = (char*)arena->allocate((_length/8 + 1)*8);
......
......@@ -143,7 +143,7 @@ public:
void CopyFromSameArena(const RedisReply& other);
private:
static const uint32_t npos;
static const int npos;
// RedisReply does not own the memory of fields, copying must be done
// by calling CopyFrom[Different|Same]Arena.
......@@ -153,7 +153,7 @@ private:
void Reset();
RedisReplyType _type;
uint32_t _length; // length of short_str/long_str, count of replies
int _length; // length of short_str/long_str, count of replies
union {
int64_t integer;
char short_str[16];
......@@ -181,9 +181,9 @@ inline void RedisReply::Reset() {
_data.array.replies = NULL;
}
inline RedisReply::RedisReply(butil::Arena* arena) {
inline RedisReply::RedisReply(butil::Arena* arena)
: _arena(arena) {
Reset();
_arena = arena;
}
inline RedisReply::RedisReply() {
......@@ -248,7 +248,7 @@ inline void RedisReply::SetString(const std::string& str) {
inline const char* RedisReply::c_str() const {
if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO
if (_length < (int)sizeof(_data.short_str)) { // SSO
return _data.short_str;
} else {
return _data.long_str;
......@@ -261,7 +261,7 @@ inline const char* RedisReply::c_str() const {
inline butil::StringPiece RedisReply::data() const {
if (is_string()) {
if (_length < sizeof(_data.short_str)) { // SSO
if (_length < (int)sizeof(_data.short_str)) { // SSO
return butil::StringPiece(_data.short_str, _length);
} else {
return butil::StringPiece(_data.long_str, _length);
......@@ -274,7 +274,7 @@ inline butil::StringPiece RedisReply::data() const {
inline const char* RedisReply::error_message() const {
if (is_error()) {
if (_length < sizeof(_data.short_str)) { // SSO
if (_length < (int)sizeof(_data.short_str)) { // SSO
return _data.short_str;
} else {
return _data.long_str;
......@@ -286,7 +286,7 @@ inline const char* RedisReply::error_message() const {
}
inline size_t RedisReply::size() const {
return ((is_array() || is_string()) ? _length : 0);
return _length;
}
inline RedisReply& RedisReply::operator[](size_t index) {
......@@ -295,7 +295,7 @@ inline RedisReply& RedisReply::operator[](size_t index) {
}
inline const RedisReply& RedisReply::operator[](size_t index) const {
if (is_array() && index < _length) {
if (is_array() && (int)index < _length) {
return _data.array.replies[index];
}
static RedisReply redis_nil;
......
......@@ -788,15 +788,15 @@ public:
brpc::RedisCommandHandler::Result OnBatched(const std::vector<const char*> args,
brpc::RedisReply* output, bool is_last) {
if (_batched_command.empty() && is_last) {
if (strcasecmp(args[0], "set") == 0) {
if (strcmp(args[0], "set") == 0) {
DoSet(args[1], args[2], output);
} else if (strcasecmp(args[0], "get") == 0) {
} else if (strcmp(args[0], "get") == 0) {
DoGet(args[1], output);
}
return brpc::RedisCommandHandler::OK;
}
std::vector<std::string> comm;
for (int i = 0; args[i]; ++i) {
for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i]);
}
_batched_command.push_back(comm);
......@@ -1027,22 +1027,25 @@ public:
return brpc::RedisCommandHandler::CONTINUE;
}
RedisCommandHandler* NewTransactionHandler() {
RedisCommandHandler* NewTransactionHandler() override {
return new MultiTransactionHandler;
}
bool TransactionMarker() override {
return true;
}
class MultiTransactionHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandHandler::Result Run(const std::vector<const char*>& args,
brpc::RedisReply* output,
bool is_last) {
if (strcasecmp(args[0], "multi") == 0) {
if (strcmp(args[0], "multi") == 0) {
output->SetError("ERR duplicate multi");
return brpc::RedisCommandHandler::CONTINUE;
}
if (strcasecmp(args[0], "exec") != 0) {
if (strcmp(args[0], "exec") != 0) {
std::vector<std::string> comm;
for (int i = 0; args[i]; ++i) {
for (int i = 0; i < (int)args.size(); ++i) {
comm.push_back(args[i]);
}
_commands.push_back(comm);
......@@ -1057,7 +1060,7 @@ public:
value = ++int_map[_commands[i][1]];
(*output)[i].SetInteger(value);
} else {
(*output)[i].SetStatus("unknowne command");
(*output)[i].SetStatus("unknown command");
}
}
s_mutex.unlock();
......@@ -1153,6 +1156,7 @@ TEST_F(RedisTest, server_handle_pipeline) {
setch->rs = rsimpl;
rsimpl->AddCommandHandler("get", getch);
rsimpl->AddCommandHandler("set", setch);
rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
......@@ -1173,12 +1177,20 @@ TEST_F(RedisTest, server_handle_pipeline) {
ASSERT_TRUE(request.AddCommand("set key1 world"));
ASSERT_TRUE(request.AddCommand("set key2 world"));
ASSERT_TRUE(request.AddCommand("get key2"));
ASSERT_TRUE(request.AddCommand("multi"));
ASSERT_TRUE(request.AddCommand("incr key4"));
ASSERT_TRUE(request.AddCommand("exec"));
ASSERT_TRUE(request.AddCommand("set key1 v1"));
ASSERT_TRUE(request.AddCommand("set key2 v2"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(8, response.reply_size());
ASSERT_EQ(1, rsimpl->_batch_count);
ASSERT_EQ(13, response.reply_size());
ASSERT_EQ(2, rsimpl->_batch_count);
ASSERT_TRUE(response.reply(7).is_string());
ASSERT_STREQ(response.reply(7).c_str(), "world");
ASSERT_TRUE(response.reply(10).is_array());
ASSERT_TRUE(response.reply(10)[0].is_integer());
ASSERT_EQ(response.reply(10)[0].integer(), 1);
}
} //namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment