Commit 86c95f3e authored by LorinLee's avatar LorinLee

Fix redis parse

parent 6c9c0802
...@@ -79,9 +79,10 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -79,9 +79,10 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
const int consume_count = (pi.with_auth ? 1 : pi.count); const int consume_count = (pi.with_auth ? 1 : pi.count);
if (!msg->response.ConsumePartialIOBuf(*source, consume_count)) { ParseError err = msg->response.ConsumePartialIOBuf(*source, consume_count);
if (err != PARSE_OK) {
socket->GivebackPipelinedInfo(pi); socket->GivebackPipelinedInfo(pi);
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); return MakeParseError(err);
} }
if (pi.with_auth) { if (pi.with_auth) {
......
...@@ -515,11 +515,12 @@ void RedisResponse::Swap(RedisResponse* other) { ...@@ -515,11 +515,12 @@ void RedisResponse::Swap(RedisResponse* other) {
// =================================================================== // ===================================================================
bool RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) { ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) {
size_t oldsize = buf.size(); size_t oldsize = buf.size();
if (reply_size() == 0) { if (reply_size() == 0) {
if (!_first_reply.ConsumePartialIOBuf(buf, &_arena)) { ParseError err = _first_reply.ConsumePartialIOBuf(buf, &_arena);
return false; if (err != PARSE_OK) {
return err;
} }
const size_t newsize = buf.size(); const size_t newsize = buf.size();
_cached_size_ += oldsize - newsize; _cached_size_ += oldsize - newsize;
...@@ -532,15 +533,16 @@ bool RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) { ...@@ -532,15 +533,16 @@ bool RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) {
sizeof(RedisReply) * (reply_count - 1)); sizeof(RedisReply) * (reply_count - 1));
if (_other_replies == NULL) { if (_other_replies == NULL) {
LOG(ERROR) << "Fail to allocate RedisReply[" << reply_count -1 << "]"; LOG(ERROR) << "Fail to allocate RedisReply[" << reply_count -1 << "]";
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
for (int i = 0; i < reply_count - 1; ++i) { for (int i = 0; i < reply_count - 1; ++i) {
new (&_other_replies[i]) RedisReply; new (&_other_replies[i]) RedisReply;
} }
} }
for (int i = reply_size(); i < reply_count; ++i) { for (int i = reply_size(); i < reply_count; ++i) {
if (!_other_replies[i - 1].ConsumePartialIOBuf(buf, &_arena)) { ParseError err = _other_replies[i - 1].ConsumePartialIOBuf(buf, &_arena);
return false; if (err != PARSE_OK) {
return err;
} }
const size_t newsize = buf.size(); const size_t newsize = buf.size();
_cached_size_ += oldsize - newsize; _cached_size_ += oldsize - newsize;
...@@ -548,7 +550,7 @@ bool RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) { ...@@ -548,7 +550,7 @@ bool RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count) {
++_nreply; ++_nreply;
} }
} }
return true; return PARSE_OK;
} }
std::ostream& operator<<(std::ostream& os, const RedisResponse& response) { std::ostream& operator<<(std::ostream& os, const RedisResponse& response) {
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "butil/strings/string_piece.h" #include "butil/strings/string_piece.h"
#include "butil/arena.h" #include "butil/arena.h"
#include "redis_reply.h" #include "redis_reply.h"
#include "parse_result.h"
namespace brpc { namespace brpc {
...@@ -177,8 +178,10 @@ public: ...@@ -177,8 +178,10 @@ public:
} }
// Parse and consume intact replies from the buf. // Parse and consume intact replies from the buf.
// Returns true on success, false otherwise. // Returns PARSE_OK on success.
bool ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count); // Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse.
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count);
// implements Message ---------------------------------------------- // implements Message ----------------------------------------------
......
...@@ -34,26 +34,27 @@ const char* RedisReplyTypeToString(RedisReplyType type) { ...@@ -34,26 +34,27 @@ const char* RedisReplyTypeToString(RedisReplyType type) {
} }
} }
bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ParseError RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) { if (_type == REDIS_REPLY_ARRAY && _data.array.last_index >= 0) {
// The parsing was suspended while parsing sub replies, // The parsing was suspended while parsing sub replies,
// continue the parsing. // continue the parsing.
RedisReply* subs = (RedisReply*)_data.array.replies; RedisReply* subs = (RedisReply*)_data.array.replies;
for (uint32_t i = _data.array.last_index; i < _length; ++i) { for (uint32_t i = _data.array.last_index; i < _length; ++i) {
if (!subs[i].ConsumePartialIOBuf(buf, arena)) { ParseError err = subs[i].ConsumePartialIOBuf(buf, arena);
return false; if (err != PARSE_OK) {
return err;
} }
++_data.array.last_index; ++_data.array.last_index;
} }
// We've got an intact reply. reset the index. // We've got an intact reply. reset the index.
_data.array.last_index = -1; _data.array.last_index = -1;
return true; return PARSE_OK;
} }
// Notice that all branches returning false must not change `buf'. // Notice that all branches returning PARSE_ERROR_NOT_ENOUGH_DATA must not change `buf'.
const char* pfc = (const char*)buf.fetch1(); const char* pfc = (const char*)buf.fetch1();
if (pfc == NULL) { if (pfc == NULL) {
return false; return PARSE_ERROR_NOT_ENOUGH_DATA;
} }
const char fc = *pfc; // first character const char fc = *pfc; // first character
switch (fc) { switch (fc) {
...@@ -61,7 +62,13 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -61,7 +62,13 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
case '+': { // Simple String "+<string>\r\n" case '+': { // Simple String "+<string>\r\n"
butil::IOBuf str; butil::IOBuf str;
if (buf.cut_until(&str, "\r\n") != 0) { if (buf.cut_until(&str, "\r\n") != 0) {
return false; const size_t len = buf.size();
if (len > std::numeric_limits<uint32_t>::max()) {
LOG(ERROR) << "simple string is too long! max length=2^32-1,"
" actually=" << len;
return PARSE_ERROR_ABSOLUTELY_WRONG;
}
return PARSE_ERROR_NOT_ENOUGH_DATA;
} }
const size_t len = str.size() - 1; const size_t len = str.size() - 1;
if (len < sizeof(_data.short_str)) { if (len < sizeof(_data.short_str)) {
...@@ -69,18 +76,18 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -69,18 +76,18 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
_type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS); _type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
_length = len; _length = len;
str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/); str.copy_to_cstr(_data.short_str, (size_t)-1L, 1/*skip fc*/);
return true; return PARSE_OK;
} }
char* d = (char*)arena->allocate((len/8 + 1)*8); char* d = (char*)arena->allocate((len/8 + 1)*8);
if (d == NULL) { if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << len << "]"; LOG(FATAL) << "Fail to allocate string[" << len << "]";
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
CHECK_EQ(len, str.copy_to_cstr(d, (size_t)-1L, 1/*skip fc*/)); CHECK_EQ(len, str.copy_to_cstr(d, (size_t)-1L, 1/*skip fc*/));
_type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS); _type = (fc == '-' ? REDIS_REPLY_ERROR : REDIS_REPLY_STATUS);
_length = len; _length = len;
_data.long_str = d; _data.long_str = d;
return true; return PARSE_OK;
} }
case '$': // Bulk String "$<length>\r\n<string>\r\n" case '$': // Bulk String "$<length>\r\n<string>\r\n"
case '*': // Array "*<size>\r\n<sub-reply1><sub-reply2>..." case '*': // Array "*<size>\r\n<sub-reply1><sub-reply2>..."
...@@ -90,20 +97,20 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -90,20 +97,20 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
intbuf[ncopied] = '\0'; intbuf[ncopied] = '\0';
const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n"); const size_t crlf_pos = butil::StringPiece(intbuf, ncopied).find("\r\n");
if (crlf_pos == butil::StringPiece::npos) { // not enough data if (crlf_pos == butil::StringPiece::npos) { // not enough data
return false; return PARSE_ERROR_NOT_ENOUGH_DATA;
} }
char* endptr = NULL; char* endptr = NULL;
int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10); int64_t value = strtoll(intbuf + 1/*skip fc*/, &endptr, 10);
if (endptr != intbuf + crlf_pos) { if (endptr != intbuf + crlf_pos) {
LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal"; LOG(ERROR) << '`' << intbuf + 1 << "' is not a valid 64-bit decimal";
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
if (fc == ':') { if (fc == ':') {
buf.pop_front(crlf_pos + 2/*CRLF*/); buf.pop_front(crlf_pos + 2/*CRLF*/);
_type = REDIS_REPLY_INTEGER; _type = REDIS_REPLY_INTEGER;
_length = 0; _length = 0;
_data.integer = value; _data.integer = value;
return true; return PARSE_OK;
} else if (fc == '$') { } else if (fc == '$') {
const int64_t len = value; // `value' is length of the string const int64_t len = value; // `value' is length of the string
if (len < 0) { // redis nil if (len < 0) { // redis nil
...@@ -111,17 +118,17 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -111,17 +118,17 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
_type = REDIS_REPLY_NIL; _type = REDIS_REPLY_NIL;
_length = 0; _length = 0;
_data.integer = 0; _data.integer = 0;
return true; return PARSE_OK;
} }
if (len > (int64_t)std::numeric_limits<uint32_t>::max()) { if (len > (int64_t)std::numeric_limits<uint32_t>::max()) {
LOG(ERROR) << "bulk string is too long! max length=2^32-1," LOG(ERROR) << "bulk string is too long! max length=2^32-1,"
" actually=" << len; " actually=" << len;
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
// We provide c_str(), thus even if bulk string is started with // We provide c_str(), thus even if bulk string is started with
// length, we have to end it with \0. // length, we have to end it with \0.
if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) { if (buf.size() < crlf_pos + 2 + (size_t)len + 2/*CRLF*/) {
return false; return PARSE_ERROR_NOT_ENOUGH_DATA;
} }
if ((size_t)len < sizeof(_data.short_str)) { if ((size_t)len < sizeof(_data.short_str)) {
// SSO short strings, including empty string. // SSO short strings, including empty string.
...@@ -134,7 +141,7 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -134,7 +141,7 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
char* d = (char*)arena->allocate((len/8 + 1)*8); char* d = (char*)arena->allocate((len/8 + 1)*8);
if (d == NULL) { if (d == NULL) {
LOG(FATAL) << "Fail to allocate string[" << len << "]"; LOG(FATAL) << "Fail to allocate string[" << len << "]";
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
buf.pop_front(crlf_pos + 2/*CRLF*/); buf.pop_front(crlf_pos + 2/*CRLF*/);
buf.cutn(d, len); buf.cutn(d, len);
...@@ -147,8 +154,9 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -147,8 +154,9 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
buf.cutn(crlf, sizeof(crlf)); buf.cutn(crlf, sizeof(crlf));
if (crlf[0] != '\r' || crlf[1] != '\n') { if (crlf[0] != '\r' || crlf[1] != '\n') {
LOG(ERROR) << "Bulk string is not ended with CRLF"; LOG(ERROR) << "Bulk string is not ended with CRLF";
return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
return true; return PARSE_OK;
} else { } else {
const int64_t count = value; // `value' is count of sub replies const int64_t count = value; // `value' is count of sub replies
if (count < 0) { // redis nil if (count < 0) { // redis nil
...@@ -156,7 +164,7 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -156,7 +164,7 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
_type = REDIS_REPLY_NIL; _type = REDIS_REPLY_NIL;
_length = 0; _length = 0;
_data.integer = 0; _data.integer = 0;
return true; return PARSE_OK;
} }
if (count == 0) { // empty array if (count == 0) { // empty array
buf.pop_front(crlf_pos + 2/*CRLF*/); buf.pop_front(crlf_pos + 2/*CRLF*/);
...@@ -164,18 +172,18 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -164,18 +172,18 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
_length = 0; _length = 0;
_data.array.last_index = -1; _data.array.last_index = -1;
_data.array.replies = NULL; _data.array.replies = NULL;
return true; return PARSE_OK;
} }
if (count > (int64_t)std::numeric_limits<uint32_t>::max()) { if (count > (int64_t)std::numeric_limits<uint32_t>::max()) {
LOG(ERROR) << "Too many sub replies! max count=2^32-1," LOG(ERROR) << "Too many sub replies! max count=2^32-1,"
" actually=" << count; " actually=" << count;
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
// FIXME(gejun): Call allocate_aligned instead. // FIXME(gejun): Call allocate_aligned instead.
RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * count); RedisReply* subs = (RedisReply*)arena->allocate(sizeof(RedisReply) * count);
if (subs == NULL) { if (subs == NULL) {
LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]"; LOG(FATAL) << "Fail to allocate RedisReply[" << count << "]";
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
for (int64_t i = 0; i < count; ++i) { for (int64_t i = 0; i < count; ++i) {
new (&subs[i]) RedisReply; new (&subs[i]) RedisReply;
...@@ -185,24 +193,25 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) { ...@@ -185,24 +193,25 @@ bool RedisReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena) {
_length = count; _length = count;
_data.array.replies = subs; _data.array.replies = subs;
// Resursively parse sub replies. If any of them fails, it will // Recursively parse sub replies. If any of them fails, it will
// be continued in next calls by tracking _data.array.last_index. // be continued in next calls by tracking _data.array.last_index.
_data.array.last_index = 0; _data.array.last_index = 0;
for (int64_t i = 0; i < count; ++i) { for (int64_t i = 0; i < count; ++i) {
if (!subs[i].ConsumePartialIOBuf(buf, arena)) { ParseError err = subs[i].ConsumePartialIOBuf(buf, arena);
return false; if (err != PARSE_OK) {
return err;
} }
++_data.array.last_index; ++_data.array.last_index;
} }
_data.array.last_index = -1; _data.array.last_index = -1;
return true; return PARSE_OK;
} }
} }
default: default:
LOG(ERROR) << "Invalid first character=" << (int)fc; LOG(ERROR) << "Invalid first character=" << (int)fc;
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
return false; return PARSE_ERROR_ABSOLUTELY_WRONG;
} }
class RedisStringPrinter { class RedisStringPrinter {
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "butil/strings/string_piece.h" // butil::StringPiece #include "butil/strings/string_piece.h" // butil::StringPiece
#include "butil/arena.h" // butil::Arena #include "butil/arena.h" // butil::Arena
#include "butil/logging.h" // CHECK #include "butil/logging.h" // CHECK
#include "parse_result.h" // ParseError
namespace brpc { namespace brpc {
...@@ -79,14 +80,16 @@ public: ...@@ -79,14 +80,16 @@ public:
// Parse from `buf' which may be incomplete and allocate needed memory // Parse from `buf' which may be incomplete and allocate needed memory
// on `arena'. // on `arena'.
// Returns true when an intact reply is parsed and cut off from `buf', // Returns PARSE_OK when an intact reply is parsed and cut off from `buf'.
// false otherwise and `buf' is guaranteed to be UNCHANGED so that you // Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse,
// can call this function on a RedisReply object with the same buf again // and `buf' is guaranteed to be UNCHANGED so that you can call this
// and again until the function returns true. This property makes sure // function on a RedisReply object with the same buf again and again until
// the parsing of RedisReply in the worst case is O(N) where N is size // the function returns PARSE_OK. This property makes sure the parsing of
// of the on-wire reply. As a contrast, if the parsing needs `buf' to be // RedisReply in the worst case is O(N) where N is size of the on-wire
// intact, the complexity in worst case may be O(N^2). // reply. As a contrast, if the parsing needs `buf' to be intact,
bool ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena); // the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
ParseError ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena);
// Swap internal fields with another reply. // Swap internal fields with another reply.
void Swap(RedisReply& other); void Swap(RedisReply& other);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment