Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
1b342f75
Commit
1b342f75
authored
Nov 29, 2019
by
zhujiashun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
redis_server_protocol: rename RedisMessage back to RedisReply
parent
64d0bcea
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
204 additions
and
204 deletions
+204
-204
redis_protocol.cpp
src/brpc/policy/redis_protocol.cpp
+5
-5
redis.cpp
src/brpc/redis.cpp
+7
-7
redis.h
src/brpc/redis.h
+8
-8
redis_reply.cpp
src/brpc/redis_reply.cpp
+50
-50
redis_reply.h
src/brpc/redis_reply.h
+73
-73
brpc_redis_unittest.cpp
test/brpc_redis_unittest.cpp
+61
-61
No files found.
src/brpc/policy/redis_protocol.cpp
View file @
1b342f75
...
...
@@ -58,7 +58,7 @@ struct InputResponse : public InputMessageBase {
}
};
const
char
**
ParseArgs
(
const
Redis
Message
&
message
)
{
const
char
**
ParseArgs
(
const
Redis
Reply
&
message
)
{
const
char
**
args
=
(
const
char
**
)
malloc
(
sizeof
(
const
char
*
)
*
(
message
.
size
()
+
1
/* NULL */
));
for
(
size_t
i
=
0
;
i
<
message
.
size
();
++
i
)
{
...
...
@@ -122,8 +122,8 @@ private:
butil
::
atomic
<
bool
>
_ready
;
public
:
Redis
Message
input_message
;
Redis
Message
output_message
;
Redis
Reply
input_message
;
Redis
Reply
output_message
;
RedisConnContext
*
ctx
;
butil
::
IOBuf
sendbuf
;
butil
::
Arena
arena
;
...
...
@@ -133,7 +133,7 @@ int ConsumeTask(RedisConnContext* ctx, ConsumeTaskDone* done) {
ClosureGuard
done_guard
(
done
);
done
->
ctx
=
ctx
;
ctx
->
Push
(
done
);
Redis
Message
&
output
=
done
->
output_message
;
Redis
Reply
&
output
=
done
->
output_message
;
const
char
**
args
=
ParseArgs
(
done
->
input_message
);
if
(
!
args
)
{
...
...
@@ -339,7 +339,7 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if
(
pi
.
with_auth
)
{
if
(
msg
->
response
.
reply_size
()
!=
1
||
!
(
msg
->
response
.
reply
(
0
).
type
()
==
brpc
::
REDIS_
MESSAGE
_STATUS
&&
!
(
msg
->
response
.
reply
(
0
).
type
()
==
brpc
::
REDIS_
REPLY
_STATUS
&&
msg
->
response
.
reply
(
0
).
data
().
compare
(
"OK"
)
==
0
))
{
LOG
(
ERROR
)
<<
"Redis Auth failed: "
<<
msg
->
response
;
return
MakeParseError
(
PARSE_ERROR_NO_RESOURCE
,
...
...
src/brpc/redis.cpp
View file @
1b342f75
...
...
@@ -322,10 +322,10 @@ void RedisResponse::MergeFrom(const RedisResponse& from) {
_nreply
=
new_nreply
;
return
;
}
Redis
Message
*
new_others
=
(
Redis
Message
*
)
_arena
.
allocate
(
sizeof
(
RedisMessage
)
*
(
new_nreply
-
1
));
Redis
Reply
*
new_others
=
(
Redis
Reply
*
)
_arena
.
allocate
(
sizeof
(
RedisReply
)
*
(
new_nreply
-
1
));
for
(
int
i
=
0
;
i
<
new_nreply
-
1
;
++
i
)
{
new
(
new_others
+
i
)
Redis
Message
(
NULL
);
new
(
new_others
+
i
)
Redis
Reply
(
NULL
);
}
int
new_other_index
=
0
;
for
(
int
i
=
1
;
i
<
_nreply
;
++
i
)
{
...
...
@@ -394,14 +394,14 @@ ParseError RedisResponse::ConsumePartialIOBuf(butil::IOBuf& buf, int reply_count
}
if
(
reply_count
>
1
)
{
if
(
_other_replies
==
NULL
)
{
_other_replies
=
(
Redis
Message
*
)
_arena
.
allocate
(
sizeof
(
Redis
Message
)
*
(
reply_count
-
1
));
_other_replies
=
(
Redis
Reply
*
)
_arena
.
allocate
(
sizeof
(
Redis
Reply
)
*
(
reply_count
-
1
));
if
(
_other_replies
==
NULL
)
{
LOG
(
ERROR
)
<<
"Fail to allocate Redis
Message
["
<<
reply_count
-
1
<<
"]"
;
LOG
(
ERROR
)
<<
"Fail to allocate Redis
Reply
["
<<
reply_count
-
1
<<
"]"
;
return
PARSE_ERROR_ABSOLUTELY_WRONG
;
}
for
(
int
i
=
0
;
i
<
reply_count
-
1
;
++
i
)
{
new
(
&
_other_replies
[
i
])
Redis
Message
(
NULL
);
new
(
&
_other_replies
[
i
])
Redis
Reply
(
NULL
);
}
}
for
(
int
i
=
reply_size
();
i
<
reply_count
;
++
i
)
{
...
...
src/brpc/redis.h
View file @
1b342f75
...
...
@@ -27,7 +27,7 @@
#include "butil/strings/string_piece.h"
#include "butil/arena.h"
#include "brpc/proto_base.pb.h"
#include "brpc/redis_
message
.h"
#include "brpc/redis_
reply
.h"
#include "brpc/parse_result.h"
#include "brpc/callback.h"
#include "brpc/socket.h"
...
...
@@ -161,11 +161,11 @@ public:
int
reply_size
()
const
{
return
_nreply
;
}
// Get index-th reply. If index is out-of-bound, nil reply is returned.
const
Redis
Message
&
reply
(
int
index
)
const
{
const
Redis
Reply
&
reply
(
int
index
)
const
{
if
(
index
<
reply_size
())
{
return
(
index
==
0
?
_first_reply
:
_other_replies
[
index
-
1
]);
}
static
Redis
Message
redis_nil
;
static
Redis
Reply
redis_nil
;
return
redis_nil
;
}
...
...
@@ -203,8 +203,8 @@ private:
void
SharedDtor
();
void
SetCachedSize
(
int
size
)
const
;
Redis
Message
_first_reply
;
Redis
Message
*
_other_replies
;
Redis
Reply
_first_reply
;
Redis
Reply
*
_other_replies
;
butil
::
Arena
_arena
;
int
_nreply
;
mutable
int
_cached_size_
;
...
...
@@ -226,7 +226,7 @@ public:
private
:
typedef
std
::
unordered_map
<
std
::
string
,
std
::
shared_ptr
<
RedisCommandHandler
>>
CommandMap
;
friend
ParseResult
ParseRedis
Message
(
butil
::
IOBuf
*
,
Socket
*
,
bool
,
const
void
*
);
friend
ParseResult
ParseRedis
Reply
(
butil
::
IOBuf
*
,
Socket
*
,
bool
,
const
void
*
);
void
CloneCommandMap
(
CommandMap
*
map
);
CommandMap
_command_map
;
};
...
...
@@ -247,7 +247,7 @@ public:
// command "set foo bar" corresponds to args[0] == "set", args[1] == "foo",
// args[2] == "bar" and args[3] == nullptr.
// `output`, which should be filled by user, is the content that sent to client side.
// Read brpc/src/redis_
message
.h for more usage.
// Read brpc/src/redis_
reply
.h for more usage.
// Remember to call `done->Run()` when everything is set up into `output`. The return
// value should be RedisCommandHandler::OK for normal cases. If you want to implement
// transaction, return RedisCommandHandler::CONTINUE until server receives an ending
...
...
@@ -259,7 +259,7 @@ public:
// marker that ends the transaction. User may queue the commands and execute them
// all once an ending marker is received.
virtual
RedisCommandHandler
::
Result
Run
(
const
char
*
args
[],
Redis
Message
*
output
,
Redis
Reply
*
output
,
google
::
protobuf
::
Closure
*
done
)
=
0
;
// Whenever a tcp connection is established, a bunch of new handlers would be created
...
...
src/brpc/redis_
message
.cpp
→
src/brpc/redis_
reply
.cpp
View file @
1b342f75
...
...
@@ -19,32 +19,32 @@
#include <limits>
#include "butil/logging.h"
#include "brpc/redis_
message
.h"
#include "brpc/redis_
reply
.h"
namespace
brpc
{
//BAIDU_CASSERT(sizeof(Redis
Message
) == 24, size_match);
const
uint32_t
Redis
Message
::
npos
=
(
uint32_t
)
-
1
;
//BAIDU_CASSERT(sizeof(Redis
Reply
) == 24, size_match);
const
uint32_t
Redis
Reply
::
npos
=
(
uint32_t
)
-
1
;
const
char
*
Redis
MessageTypeToString
(
RedisMessage
Type
type
)
{
const
char
*
Redis
ReplyTypeToString
(
RedisReply
Type
type
)
{
switch
(
type
)
{
case
REDIS_
MESSAGE
_STRING
:
return
"string"
;
case
REDIS_
MESSAGE
_ARRAY
:
return
"array"
;
case
REDIS_
MESSAGE
_INTEGER
:
return
"integer"
;
case
REDIS_
MESSAGE
_NIL
:
return
"nil"
;
case
REDIS_
MESSAGE
_STATUS
:
return
"status"
;
case
REDIS_
MESSAGE
_ERROR
:
return
"error"
;
case
REDIS_
REPLY
_STRING
:
return
"string"
;
case
REDIS_
REPLY
_ARRAY
:
return
"array"
;
case
REDIS_
REPLY
_INTEGER
:
return
"integer"
;
case
REDIS_
REPLY
_NIL
:
return
"nil"
;
case
REDIS_
REPLY
_STATUS
:
return
"status"
;
case
REDIS_
REPLY
_ERROR
:
return
"error"
;
default:
return
"unknown redis type"
;
}
}
bool
Redis
Message
::
SerializeToIOBuf
(
butil
::
IOBuf
*
buf
)
{
bool
Redis
Reply
::
SerializeToIOBuf
(
butil
::
IOBuf
*
buf
)
{
butil
::
IOBufBuilder
builder
;
switch
(
_type
)
{
case
REDIS_
MESSAGE
_ERROR
:
case
REDIS_
REPLY
_ERROR
:
// fall through
case
REDIS_
MESSAGE
_STATUS
:
buf
->
push_back
((
_type
==
REDIS_
MESSAGE
_ERROR
)
?
'-'
:
'+'
);
case
REDIS_
REPLY
_STATUS
:
buf
->
push_back
((
_type
==
REDIS_
REPLY
_ERROR
)
?
'-'
:
'+'
);
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
buf
->
append
(
_data
.
short_str
,
_length
);
}
else
{
...
...
@@ -52,11 +52,11 @@ bool RedisMessage::SerializeToIOBuf(butil::IOBuf* buf) {
}
buf
->
append
(
"
\r\n
"
);
break
;
case
REDIS_
MESSAGE
_INTEGER
:
case
REDIS_
REPLY
_INTEGER
:
builder
<<
':'
<<
_data
.
integer
<<
"
\r\n
"
;
buf
->
append
(
builder
.
buf
());
break
;
case
REDIS_
MESSAGE
_STRING
:
case
REDIS_
REPLY
_STRING
:
// Since _length is unsigned, we have to int casting _length to
// represent nil string
builder
<<
'$'
<<
(
int
)
_length
<<
"
\r\n
"
;
...
...
@@ -71,7 +71,7 @@ bool RedisMessage::SerializeToIOBuf(butil::IOBuf* buf) {
}
buf
->
append
(
"
\r\n
"
);
break
;
case
REDIS_
MESSAGE
_ARRAY
:
case
REDIS_
REPLY
_ARRAY
:
builder
<<
'*'
<<
(
int
)
_length
<<
"
\r\n
"
;
buf
->
append
(
builder
.
buf
());
if
(
_length
==
npos
)
{
...
...
@@ -83,7 +83,7 @@ bool RedisMessage::SerializeToIOBuf(butil::IOBuf* buf) {
}
}
break
;
case
REDIS_
MESSAGE
_NIL
:
case
REDIS_
REPLY
_NIL
:
buf
->
append
(
"$-1
\r\n
"
);
break
;
default
:
...
...
@@ -93,11 +93,11 @@ bool RedisMessage::SerializeToIOBuf(butil::IOBuf* buf) {
return
true
;
}
ParseError
Redis
Message
::
ConsumePartialIOBuf
(
butil
::
IOBuf
&
buf
,
butil
::
Arena
*
arena
)
{
if
(
_type
==
REDIS_
MESSAGE
_ARRAY
&&
_data
.
array
.
last_index
>=
0
)
{
ParseError
Redis
Reply
::
ConsumePartialIOBuf
(
butil
::
IOBuf
&
buf
,
butil
::
Arena
*
arena
)
{
if
(
_type
==
REDIS_
REPLY
_ARRAY
&&
_data
.
array
.
last_index
>=
0
)
{
// The parsing was suspended while parsing sub replies,
// continue the parsing.
Redis
Message
*
subs
=
(
RedisMessage
*
)
_data
.
array
.
replies
;
Redis
Reply
*
subs
=
(
RedisReply
*
)
_data
.
array
.
replies
;
for
(
uint32_t
i
=
_data
.
array
.
last_index
;
i
<
_length
;
++
i
)
{
ParseError
err
=
subs
[
i
].
ConsumePartialIOBuf
(
buf
,
arena
);
if
(
err
!=
PARSE_OK
)
{
...
...
@@ -132,7 +132,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
const
size_t
len
=
str
.
size
()
-
1
;
if
(
len
<
sizeof
(
_data
.
short_str
))
{
// SSO short strings, including empty string.
_type
=
(
fc
==
'-'
?
REDIS_
MESSAGE_ERROR
:
REDIS_MESSAGE
_STATUS
);
_type
=
(
fc
==
'-'
?
REDIS_
REPLY_ERROR
:
REDIS_REPLY
_STATUS
);
_length
=
len
;
str
.
copy_to_cstr
(
_data
.
short_str
,
(
size_t
)
-
1L
,
1
/*skip fc*/
);
return
PARSE_OK
;
...
...
@@ -143,7 +143,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
return
PARSE_ERROR_ABSOLUTELY_WRONG
;
}
CHECK_EQ
(
len
,
str
.
copy_to_cstr
(
d
,
(
size_t
)
-
1L
,
1
/*skip fc*/
));
_type
=
(
fc
==
'-'
?
REDIS_
MESSAGE_ERROR
:
REDIS_MESSAGE
_STATUS
);
_type
=
(
fc
==
'-'
?
REDIS_
REPLY_ERROR
:
REDIS_REPLY
_STATUS
);
_length
=
len
;
_data
.
long_str
=
d
;
return
PARSE_OK
;
...
...
@@ -166,7 +166,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
}
if
(
fc
==
':'
)
{
buf
.
pop_front
(
crlf_pos
+
2
/*CRLF*/
);
_type
=
REDIS_
MESSAGE
_INTEGER
;
_type
=
REDIS_
REPLY
_INTEGER
;
_length
=
0
;
_data
.
integer
=
value
;
return
PARSE_OK
;
...
...
@@ -174,7 +174,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
const
int64_t
len
=
value
;
// `value' is length of the string
if
(
len
<
0
)
{
// redis nil
buf
.
pop_front
(
crlf_pos
+
2
/*CRLF*/
);
_type
=
REDIS_
MESSAGE
_NIL
;
_type
=
REDIS_
REPLY
_NIL
;
_length
=
0
;
_data
.
integer
=
0
;
return
PARSE_OK
;
...
...
@@ -191,7 +191,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
}
if
((
size_t
)
len
<
sizeof
(
_data
.
short_str
))
{
// SSO short strings, including empty string.
_type
=
REDIS_
MESSAGE
_STRING
;
_type
=
REDIS_
REPLY
_STRING
;
_length
=
len
;
buf
.
pop_front
(
crlf_pos
+
2
);
buf
.
cutn
(
_data
.
short_str
,
len
);
...
...
@@ -205,7 +205,7 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
buf
.
pop_front
(
crlf_pos
+
2
/*CRLF*/
);
buf
.
cutn
(
d
,
len
);
d
[
len
]
=
'\0'
;
_type
=
REDIS_
MESSAGE
_STRING
;
_type
=
REDIS_
REPLY
_STRING
;
_length
=
len
;
_data
.
long_str
=
d
;
}
...
...
@@ -220,14 +220,14 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
const
int64_t
count
=
value
;
// `value' is count of sub replies
if
(
count
<
0
)
{
// redis nil
buf
.
pop_front
(
crlf_pos
+
2
/*CRLF*/
);
_type
=
REDIS_
MESSAGE
_NIL
;
_type
=
REDIS_
REPLY
_NIL
;
_length
=
0
;
_data
.
integer
=
0
;
return
PARSE_OK
;
}
if
(
count
==
0
)
{
// empty array
buf
.
pop_front
(
crlf_pos
+
2
/*CRLF*/
);
_type
=
REDIS_
MESSAGE
_ARRAY
;
_type
=
REDIS_
REPLY
_ARRAY
;
_length
=
0
;
_data
.
array
.
last_index
=
-
1
;
_data
.
array
.
replies
=
NULL
;
...
...
@@ -239,16 +239,16 @@ ParseError RedisMessage::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* ar
return
PARSE_ERROR_ABSOLUTELY_WRONG
;
}
// FIXME(gejun): Call allocate_aligned instead.
Redis
Message
*
subs
=
(
RedisMessage
*
)
arena
->
allocate
(
sizeof
(
RedisMessage
)
*
count
);
Redis
Reply
*
subs
=
(
RedisReply
*
)
arena
->
allocate
(
sizeof
(
RedisReply
)
*
count
);
if
(
subs
==
NULL
)
{
LOG
(
FATAL
)
<<
"Fail to allocate Redis
Message
["
<<
count
<<
"]"
;
LOG
(
FATAL
)
<<
"Fail to allocate Redis
Reply
["
<<
count
<<
"]"
;
return
PARSE_ERROR_ABSOLUTELY_WRONG
;
}
for
(
int64_t
i
=
0
;
i
<
count
;
++
i
)
{
new
(
&
subs
[
i
])
Redis
Message
(
NULL
);
new
(
&
subs
[
i
])
Redis
Reply
(
NULL
);
}
buf
.
pop_front
(
crlf_pos
+
2
/*CRLF*/
);
_type
=
REDIS_
MESSAGE
_ARRAY
;
_type
=
REDIS_
REPLY
_ARRAY
;
_length
=
count
;
_data
.
array
.
replies
=
subs
;
...
...
@@ -317,9 +317,9 @@ void RedisStringPrinter::Print(std::ostream& os) const {
}
// Mimic how official redis-cli prints.
void
Redis
Message
::
Print
(
std
::
ostream
&
os
)
const
{
void
Redis
Reply
::
Print
(
std
::
ostream
&
os
)
const
{
switch
(
_type
)
{
case
REDIS_
MESSAGE
_STRING
:
case
REDIS_
REPLY
_STRING
:
os
<<
'"'
;
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
os
<<
RedisStringPrinter
(
_data
.
short_str
,
_length
);
...
...
@@ -328,7 +328,7 @@ void RedisMessage::Print(std::ostream& os) const {
}
os
<<
'"'
;
break
;
case
REDIS_
MESSAGE
_ARRAY
:
case
REDIS_
REPLY
_ARRAY
:
os
<<
'['
;
for
(
uint32_t
i
=
0
;
i
<
_length
;
++
i
)
{
if
(
i
!=
0
)
{
...
...
@@ -338,16 +338,16 @@ void RedisMessage::Print(std::ostream& os) const {
}
os
<<
']'
;
break
;
case
REDIS_
MESSAGE
_INTEGER
:
case
REDIS_
REPLY
_INTEGER
:
os
<<
"(integer) "
<<
_data
.
integer
;
break
;
case
REDIS_
MESSAGE
_NIL
:
case
REDIS_
REPLY
_NIL
:
os
<<
"(nil)"
;
break
;
case
REDIS_
MESSAGE
_ERROR
:
case
REDIS_
REPLY
_ERROR
:
os
<<
"(error) "
;
// fall through
case
REDIS_
MESSAGE
_STATUS
:
case
REDIS_
REPLY
_STATUS
:
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
os
<<
RedisStringPrinter
(
_data
.
short_str
,
_length
);
}
else
{
...
...
@@ -360,19 +360,19 @@ void RedisMessage::Print(std::ostream& os) const {
}
}
void
Redis
Message
::
CopyFromDifferentArena
(
const
RedisMessage
&
other
,
void
Redis
Reply
::
CopyFromDifferentArena
(
const
RedisReply
&
other
,
butil
::
Arena
*
arena
)
{
_type
=
other
.
_type
;
_length
=
other
.
_length
;
switch
(
_type
)
{
case
REDIS_
MESSAGE
_ARRAY
:
{
Redis
Message
*
subs
=
(
RedisMessage
*
)
arena
->
allocate
(
sizeof
(
RedisMessage
)
*
_length
);
case
REDIS_
REPLY
_ARRAY
:
{
Redis
Reply
*
subs
=
(
RedisReply
*
)
arena
->
allocate
(
sizeof
(
RedisReply
)
*
_length
);
if
(
subs
==
NULL
)
{
LOG
(
FATAL
)
<<
"Fail to allocate Redis
Message
["
<<
_length
<<
"]"
;
LOG
(
FATAL
)
<<
"Fail to allocate Redis
Reply
["
<<
_length
<<
"]"
;
return
;
}
for
(
uint32_t
i
=
0
;
i
<
_length
;
++
i
)
{
new
(
&
subs
[
i
])
Redis
Message
;
new
(
&
subs
[
i
])
Redis
Reply
;
}
_data
.
array
.
last_index
=
other
.
_data
.
array
.
last_index
;
if
(
_data
.
array
.
last_index
>
0
)
{
...
...
@@ -388,16 +388,16 @@ void RedisMessage::CopyFromDifferentArena(const RedisMessage& other,
_data
.
array
.
replies
=
subs
;
}
break
;
case
REDIS_
MESSAGE
_INTEGER
:
case
REDIS_
REPLY
_INTEGER
:
_data
.
integer
=
other
.
_data
.
integer
;
break
;
case
REDIS_
MESSAGE
_NIL
:
case
REDIS_
REPLY
_NIL
:
break
;
case
REDIS_
MESSAGE
_STRING
:
case
REDIS_
REPLY
_STRING
:
// fall through
case
REDIS_
MESSAGE
_ERROR
:
case
REDIS_
REPLY
_ERROR
:
// fall through
case
REDIS_
MESSAGE
_STATUS
:
case
REDIS_
REPLY
_STATUS
:
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
memcpy
(
_data
.
short_str
,
other
.
_data
.
short_str
,
_length
+
1
);
}
else
{
...
...
src/brpc/redis_
message
.h
→
src/brpc/redis_
reply
.h
View file @
1b342f75
...
...
@@ -17,8 +17,8 @@
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_REDIS_
MESSAGE
_H
#define BRPC_REDIS_
MESSAGE
_H
#ifndef BRPC_REDIS_
REPLY
_H
#define BRPC_REDIS_
REPLY
_H
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/strings/string_piece.h" // butil::StringPiece
...
...
@@ -30,28 +30,28 @@
namespace
brpc
{
// Different types of replies.
enum
Redis
Message
Type
{
REDIS_
MESSAGE
_STRING
=
1
,
// Bulk String
REDIS_
MESSAGE
_ARRAY
=
2
,
REDIS_
MESSAGE
_INTEGER
=
3
,
REDIS_
MESSAGE
_NIL
=
4
,
REDIS_
MESSAGE
_STATUS
=
5
,
// Simple String
REDIS_
MESSAGE
_ERROR
=
6
enum
Redis
Reply
Type
{
REDIS_
REPLY
_STRING
=
1
,
// Bulk String
REDIS_
REPLY
_ARRAY
=
2
,
REDIS_
REPLY
_INTEGER
=
3
,
REDIS_
REPLY
_NIL
=
4
,
REDIS_
REPLY
_STATUS
=
5
,
// Simple String
REDIS_
REPLY
_ERROR
=
6
};
const
char
*
Redis
MessageTypeToString
(
RedisMessage
Type
);
const
char
*
Redis
ReplyTypeToString
(
RedisReply
Type
);
// A reply from redis-server.
class
Redis
Message
{
class
Redis
Reply
{
public
:
// A default reply is a nil.
Redis
Message
();
Redis
Reply
();
// All SetXXX Method would allocate memory from *arena.
Redis
Message
(
butil
::
Arena
*
arena
);
Redis
Reply
(
butil
::
Arena
*
arena
);
// Type of the reply.
Redis
Message
Type
type
()
const
{
return
_type
;
}
Redis
Reply
Type
type
()
const
{
return
_type
;
}
bool
is_nil
()
const
;
// True if the reply is a (redis) nil.
bool
is_integer
()
const
;
// True if the reply is an integer.
...
...
@@ -89,17 +89,17 @@ public:
size_t
size
()
const
;
// Get the index-th sub reply. If this reply is not an array, a nil reply
// is returned (call stacks are not logged)
const
Redis
Message
&
operator
[](
size_t
index
)
const
;
Redis
Message
&
operator
[](
size_t
index
);
const
Redis
Reply
&
operator
[](
size_t
index
)
const
;
Redis
Reply
&
operator
[](
size_t
index
);
// Parse from `buf' which may be incomplete and allocate needed memory
// on `arena'.
// Returns PARSE_OK when an intact reply is parsed and cut off from `buf'.
// Returns PARSE_ERROR_NOT_ENOUGH_DATA if data in `buf' is not enough to parse,
// and `buf' is guaranteed to be UNCHANGED so that you can call this
// function on a Redis
Message
object with the same buf again and again until
// function on a Redis
Reply
object with the same buf again and again until
// the function returns PARSE_OK. This property makes sure the parsing of
// Redis
Message
in the worst case is O(N) where N is size of the on-wire
// Redis
Reply
in the worst case is O(N) where N is size of the on-wire
// reply. As a contrast, if the parsing needs `buf' to be intact,
// the complexity in worst case may be O(N^2).
// Returns PARSE_ERROR_ABSOLUTELY_WRONG if the parsing failed.
...
...
@@ -109,7 +109,7 @@ public:
bool
SerializeToIOBuf
(
butil
::
IOBuf
*
buf
);
// Swap internal fields with another reply.
void
Swap
(
Redis
Message
&
other
);
void
Swap
(
Redis
Reply
&
other
);
// Reset to the state that this reply was just constructed.
void
Clear
();
...
...
@@ -119,22 +119,22 @@ public:
// Copy from another reply allocating on a different Arena, and allocate
// required memory with `self_arena'.
void
CopyFromDifferentArena
(
const
Redis
Message
&
other
,
void
CopyFromDifferentArena
(
const
Redis
Reply
&
other
,
butil
::
Arena
*
self_arena
);
// Copy from another reply allocating on a same Arena.
void
CopyFromSameArena
(
const
Redis
Message
&
other
);
void
CopyFromSameArena
(
const
Redis
Reply
&
other
);
private
:
static
const
uint32_t
npos
;
// Redis
Message
does not own the memory of fields, copying must be done
// Redis
Reply
does not own the memory of fields, copying must be done
// by calling CopyFrom[Different|Same]Arena.
DISALLOW_COPY_AND_ASSIGN
(
Redis
Message
);
DISALLOW_COPY_AND_ASSIGN
(
Redis
Reply
);
bool
SetBasicString
(
const
std
::
string
&
str
,
Redis
Message
Type
type
);
bool
SetBasicString
(
const
std
::
string
&
str
,
Redis
Reply
Type
type
);
Redis
Message
Type
_type
;
Redis
Reply
Type
_type
;
uint32_t
_length
;
// length of short_str/long_str, count of replies
union
{
int64_t
integer
;
...
...
@@ -142,7 +142,7 @@ private:
const
char
*
long_str
;
struct
{
int32_t
last_index
;
// >= 0 if previous parsing suspends on replies.
Redis
Message
*
replies
;
Redis
Reply
*
replies
;
}
array
;
uint64_t
padding
[
2
];
// For swapping, must cover all bytes.
}
_data
;
...
...
@@ -151,56 +151,56 @@ private:
// =========== inline impl. ==============
inline
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
const
Redis
Message
&
r
)
{
inline
std
::
ostream
&
operator
<<
(
std
::
ostream
&
os
,
const
Redis
Reply
&
r
)
{
r
.
Print
(
os
);
return
os
;
}
inline
Redis
Message
::
RedisMessage
(
butil
::
Arena
*
arena
)
:
Redis
Message
()
{
inline
Redis
Reply
::
RedisReply
(
butil
::
Arena
*
arena
)
:
Redis
Reply
()
{
_arena
=
arena
;
}
inline
Redis
Message
::
RedisMessage
()
:
_type
(
REDIS_
MESSAGE
_NIL
)
inline
Redis
Reply
::
RedisReply
()
:
_type
(
REDIS_
REPLY
_NIL
)
,
_length
(
0
)
,
_arena
(
NULL
)
{
_data
.
array
.
last_index
=
-
1
;
_data
.
array
.
replies
=
NULL
;
}
inline
bool
Redis
Message
::
is_nil
()
const
{
return
(
_type
==
REDIS_
MESSAGE
_NIL
)
||
((
_type
==
REDIS_
MESSAGE_STRING
||
_type
==
REDIS_MESSAGE
_ARRAY
)
&&
inline
bool
Redis
Reply
::
is_nil
()
const
{
return
(
_type
==
REDIS_
REPLY
_NIL
)
||
((
_type
==
REDIS_
REPLY_STRING
||
_type
==
REDIS_REPLY
_ARRAY
)
&&
_length
==
npos
);
}
inline
bool
Redis
Message
::
is_error
()
const
{
return
_type
==
REDIS_MESSAGE
_ERROR
;
}
inline
bool
Redis
Message
::
is_integer
()
const
{
return
_type
==
REDIS_MESSAGE
_INTEGER
;
}
inline
bool
Redis
Message
::
is_string
()
const
{
return
_type
==
REDIS_
MESSAGE_STRING
||
_type
==
REDIS_MESSAGE
_STATUS
;
}
inline
bool
Redis
Message
::
is_array
()
const
{
return
_type
==
REDIS_MESSAGE
_ARRAY
;
}
inline
bool
Redis
Reply
::
is_error
()
const
{
return
_type
==
REDIS_REPLY
_ERROR
;
}
inline
bool
Redis
Reply
::
is_integer
()
const
{
return
_type
==
REDIS_REPLY
_INTEGER
;
}
inline
bool
Redis
Reply
::
is_string
()
const
{
return
_type
==
REDIS_
REPLY_STRING
||
_type
==
REDIS_REPLY
_STATUS
;
}
inline
bool
Redis
Reply
::
is_array
()
const
{
return
_type
==
REDIS_REPLY
_ARRAY
;
}
inline
int64_t
Redis
Message
::
integer
()
const
{
inline
int64_t
Redis
Reply
::
integer
()
const
{
if
(
is_integer
())
{
return
_data
.
integer
;
}
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Message
TypeToString
(
_type
)
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Reply
TypeToString
(
_type
)
<<
", not an integer"
;
return
0
;
}
inline
bool
Redis
Message
::
SetNilString
()
{
inline
bool
Redis
Reply
::
SetNilString
()
{
if
(
!
_arena
)
return
false
;
_type
=
REDIS_
MESSAGE
_STRING
;
_type
=
REDIS_
REPLY
_STRING
;
_length
=
npos
;
return
true
;
}
inline
bool
Redis
Message
::
SetArray
(
int
size
)
{
inline
bool
Redis
Reply
::
SetArray
(
int
size
)
{
if
(
!
_arena
)
{
return
false
;
}
_type
=
REDIS_
MESSAGE
_ARRAY
;
_type
=
REDIS_
REPLY
_ARRAY
;
if
(
size
<
0
)
{
_length
=
npos
;
return
true
;
...
...
@@ -208,20 +208,20 @@ inline bool RedisMessage::SetArray(int size) {
_length
=
0
;
return
true
;
}
Redis
Message
*
subs
=
(
RedisMessage
*
)
_arena
->
allocate
(
sizeof
(
RedisMessage
)
*
size
);
Redis
Reply
*
subs
=
(
RedisReply
*
)
_arena
->
allocate
(
sizeof
(
RedisReply
)
*
size
);
if
(
!
subs
)
{
LOG
(
FATAL
)
<<
"Fail to allocate Redis
Message
["
<<
size
<<
"]"
;
LOG
(
FATAL
)
<<
"Fail to allocate Redis
Reply
["
<<
size
<<
"]"
;
return
false
;
}
for
(
int
i
=
0
;
i
<
size
;
++
i
)
{
new
(
&
subs
[
i
])
Redis
Message
(
_arena
);
new
(
&
subs
[
i
])
Redis
Reply
(
_arena
);
}
_length
=
size
;
_data
.
array
.
replies
=
subs
;
return
true
;
}
inline
bool
Redis
Message
::
SetBasicString
(
const
std
::
string
&
str
,
RedisMessage
Type
type
)
{
inline
bool
Redis
Reply
::
SetBasicString
(
const
std
::
string
&
str
,
RedisReply
Type
type
)
{
if
(
!
_arena
)
{
return
false
;
}
...
...
@@ -244,26 +244,26 @@ inline bool RedisMessage::SetBasicString(const std::string& str, RedisMessageTyp
return
true
;
}
inline
bool
Redis
Message
::
SetStatus
(
const
std
::
string
&
str
)
{
return
SetBasicString
(
str
,
REDIS_
MESSAGE
_STATUS
);
inline
bool
Redis
Reply
::
SetStatus
(
const
std
::
string
&
str
)
{
return
SetBasicString
(
str
,
REDIS_
REPLY
_STATUS
);
}
inline
bool
Redis
Message
::
SetError
(
const
std
::
string
&
str
)
{
return
SetBasicString
(
str
,
REDIS_
MESSAGE
_ERROR
);
inline
bool
Redis
Reply
::
SetError
(
const
std
::
string
&
str
)
{
return
SetBasicString
(
str
,
REDIS_
REPLY
_ERROR
);
}
inline
bool
Redis
Message
::
SetInteger
(
int64_t
value
)
{
_type
=
REDIS_
MESSAGE
_INTEGER
;
inline
bool
Redis
Reply
::
SetInteger
(
int64_t
value
)
{
_type
=
REDIS_
REPLY
_INTEGER
;
_length
=
0
;
_data
.
integer
=
value
;
return
true
;
}
inline
bool
Redis
Message
::
SetBulkString
(
const
std
::
string
&
str
)
{
return
SetBasicString
(
str
,
REDIS_
MESSAGE
_STRING
);
inline
bool
Redis
Reply
::
SetBulkString
(
const
std
::
string
&
str
)
{
return
SetBasicString
(
str
,
REDIS_
REPLY
_STRING
);
}
inline
const
char
*
Redis
Message
::
c_str
()
const
{
inline
const
char
*
Redis
Reply
::
c_str
()
const
{
if
(
is_string
())
{
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
// SSO
return
_data
.
short_str
;
...
...
@@ -271,12 +271,12 @@ inline const char* RedisMessage::c_str() const {
return
_data
.
long_str
;
}
}
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Message
TypeToString
(
_type
)
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Reply
TypeToString
(
_type
)
<<
", not a string"
;
return
""
;
}
inline
butil
::
StringPiece
Redis
Message
::
data
()
const
{
inline
butil
::
StringPiece
Redis
Reply
::
data
()
const
{
if
(
is_string
())
{
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
// SSO
return
butil
::
StringPiece
(
_data
.
short_str
,
_length
);
...
...
@@ -284,12 +284,12 @@ inline butil::StringPiece RedisMessage::data() const {
return
butil
::
StringPiece
(
_data
.
long_str
,
_length
);
}
}
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Message
TypeToString
(
_type
)
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Reply
TypeToString
(
_type
)
<<
", not a string"
;
return
butil
::
StringPiece
();
}
inline
const
char
*
Redis
Message
::
error_message
()
const
{
inline
const
char
*
Redis
Reply
::
error_message
()
const
{
if
(
is_error
())
{
if
(
_length
<
sizeof
(
_data
.
short_str
))
{
// SSO
return
_data
.
short_str
;
...
...
@@ -297,43 +297,43 @@ inline const char* RedisMessage::error_message() const {
return
_data
.
long_str
;
}
}
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Message
TypeToString
(
_type
)
CHECK
(
false
)
<<
"The reply is "
<<
Redis
Reply
TypeToString
(
_type
)
<<
", not an error"
;
return
""
;
}
inline
size_t
Redis
Message
::
size
()
const
{
inline
size_t
Redis
Reply
::
size
()
const
{
return
(
is_array
()
?
_length
:
0
);
}
inline
Redis
Message
&
RedisMessage
::
operator
[](
size_t
index
)
{
return
const_cast
<
Redis
Message
&>
(
const_cast
<
const
Redis
Message
*>
(
this
)
->
operator
[](
index
));
inline
Redis
Reply
&
RedisReply
::
operator
[](
size_t
index
)
{
return
const_cast
<
Redis
Reply
&>
(
const_cast
<
const
Redis
Reply
*>
(
this
)
->
operator
[](
index
));
}
inline
const
Redis
Message
&
RedisMessage
::
operator
[](
size_t
index
)
const
{
inline
const
Redis
Reply
&
RedisReply
::
operator
[](
size_t
index
)
const
{
if
(
is_array
()
&&
index
<
_length
)
{
return
_data
.
array
.
replies
[
index
];
}
static
Redis
Message
redis_nil
;
static
Redis
Reply
redis_nil
;
return
redis_nil
;
}
inline
void
Redis
Message
::
Swap
(
RedisMessage
&
other
)
{
inline
void
Redis
Reply
::
Swap
(
RedisReply
&
other
)
{
std
::
swap
(
_type
,
other
.
_type
);
std
::
swap
(
_length
,
other
.
_length
);
std
::
swap
(
_data
.
padding
[
0
],
other
.
_data
.
padding
[
0
]);
std
::
swap
(
_data
.
padding
[
1
],
other
.
_data
.
padding
[
1
]);
}
inline
void
Redis
Message
::
Clear
()
{
_type
=
REDIS_
MESSAGE
_NIL
;
inline
void
Redis
Reply
::
Clear
()
{
_type
=
REDIS_
REPLY
_NIL
;
_length
=
0
;
_data
.
array
.
last_index
=
-
1
;
_data
.
array
.
replies
=
NULL
;
}
inline
void
Redis
Message
::
CopyFromSameArena
(
const
RedisMessage
&
other
)
{
inline
void
Redis
Reply
::
CopyFromSameArena
(
const
RedisReply
&
other
)
{
_type
=
other
.
_type
;
_length
=
other
.
_length
;
_data
.
padding
[
0
]
=
other
.
_data
.
padding
[
0
];
...
...
test/brpc_redis_unittest.cpp
View file @
1b342f75
...
...
@@ -104,32 +104,32 @@ protected:
void
TearDown
()
{}
};
void
AssertReplyEqual
(
const
brpc
::
Redis
Message
&
reply1
,
const
brpc
::
Redis
Message
&
reply2
)
{
void
AssertReplyEqual
(
const
brpc
::
Redis
Reply
&
reply1
,
const
brpc
::
Redis
Reply
&
reply2
)
{
if
(
&
reply1
==
&
reply2
)
{
return
;
}
CHECK_EQ
(
reply1
.
type
(),
reply2
.
type
());
switch
(
reply1
.
type
())
{
case
brpc
:
:
REDIS_
MESSAGE
_ARRAY
:
case
brpc
:
:
REDIS_
REPLY
_ARRAY
:
ASSERT_EQ
(
reply1
.
size
(),
reply2
.
size
());
for
(
size_t
j
=
0
;
j
<
reply1
.
size
();
++
j
)
{
ASSERT_NE
(
&
reply1
[
j
],
&
reply2
[
j
]);
// from different arena
AssertReplyEqual
(
reply1
[
j
],
reply2
[
j
]);
}
break
;
case
brpc
:
:
REDIS_
MESSAGE
_INTEGER
:
case
brpc
:
:
REDIS_
REPLY
_INTEGER
:
ASSERT_EQ
(
reply1
.
integer
(),
reply2
.
integer
());
break
;
case
brpc
:
:
REDIS_
MESSAGE
_NIL
:
case
brpc
:
:
REDIS_
REPLY
_NIL
:
break
;
case
brpc
:
:
REDIS_
MESSAGE
_STRING
:
case
brpc
:
:
REDIS_
REPLY
_STRING
:
// fall through
case
brpc
:
:
REDIS_
MESSAGE
_STATUS
:
case
brpc
:
:
REDIS_
REPLY
_STATUS
:
ASSERT_NE
(
reply1
.
c_str
(),
reply2
.
c_str
());
// from different arena
ASSERT_STREQ
(
reply1
.
c_str
(),
reply2
.
c_str
());
break
;
case
brpc
:
:
REDIS_
MESSAGE
_ERROR
:
case
brpc
:
:
REDIS_
REPLY
_ERROR
:
ASSERT_NE
(
reply1
.
error_message
(),
reply2
.
error_message
());
// from different arena
ASSERT_STREQ
(
reply1
.
error_message
(),
reply2
.
error_message
());
break
;
...
...
@@ -169,7 +169,7 @@ TEST_F(RedisTest, sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_NIL
,
response
.
reply
(
0
).
type
())
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_NIL
,
response
.
reply
(
0
).
type
())
<<
response
;
cntl
.
Reset
();
...
...
@@ -179,7 +179,7 @@ TEST_F(RedisTest, sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
"OK"
,
response
.
reply
(
0
).
data
());
cntl
.
Reset
();
...
...
@@ -189,7 +189,7 @@ TEST_F(RedisTest, sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
());
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
"world"
,
response
.
reply
(
0
).
data
());
cntl
.
Reset
();
...
...
@@ -199,7 +199,7 @@ TEST_F(RedisTest, sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
"OK"
,
response
.
reply
(
0
).
data
());
cntl
.
Reset
();
...
...
@@ -209,7 +209,7 @@ TEST_F(RedisTest, sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
());
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
"world2"
,
response
.
reply
(
0
).
data
());
cntl
.
Reset
();
...
...
@@ -218,7 +218,7 @@ TEST_F(RedisTest, sanity) {
ASSERT_TRUE
(
request
.
AddCommand
(
"del hello"
));
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
1
,
response
.
reply
(
0
).
integer
());
cntl
.
Reset
();
...
...
@@ -228,7 +228,7 @@ TEST_F(RedisTest, sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_NIL
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_NIL
,
response
.
reply
(
0
).
type
());
}
TEST_F
(
RedisTest
,
keys_with_spaces
)
{
...
...
@@ -258,19 +258,19 @@ TEST_F(RedisTest, keys_with_spaces) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
7
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
"OK"
,
response
.
reply
(
0
).
data
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
"OK"
,
response
.
reply
(
1
).
data
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
"OK"
,
response
.
reply
(
2
).
data
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
"he1 he1 da1"
,
response
.
reply
(
3
).
data
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
4
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
4
).
type
());
ASSERT_EQ
(
"he1 he1 da1"
,
response
.
reply
(
4
).
data
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
5
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
5
).
type
());
ASSERT_EQ
(
"he2 he2 da2"
,
response
.
reply
(
5
).
data
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
6
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
6
).
type
());
ASSERT_EQ
(
"he3 he3 da3"
,
response
.
reply
(
6
).
data
());
brpc
::
RedisResponse
response2
=
response
;
...
...
@@ -299,13 +299,13 @@ TEST_F(RedisTest, incr_and_decr) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
4
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
1
,
response
.
reply
(
0
).
integer
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
0
,
response
.
reply
(
1
).
integer
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
10
,
response
.
reply
(
2
).
integer
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
-
10
,
response
.
reply
(
3
).
integer
());
brpc
::
RedisResponse
response2
=
response
;
...
...
@@ -340,13 +340,13 @@ TEST_F(RedisTest, by_components) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
4
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
1
,
response
.
reply
(
0
).
integer
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
0
,
response
.
reply
(
1
).
integer
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
10
,
response
.
reply
(
2
).
integer
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_INTEGER
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_INTEGER
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
-
10
,
response
.
reply
(
3
).
integer
());
brpc
::
RedisResponse
response2
=
response
;
...
...
@@ -383,13 +383,13 @@ TEST_F(RedisTest, auth) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
4
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
0
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
1
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
1
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
2
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_STREQ
(
"my_redis"
,
response
.
reply
(
3
).
c_str
());
}
...
...
@@ -410,7 +410,7 @@ TEST_F(RedisTest, auth) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_ERROR
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_ERROR
,
response
.
reply
(
0
).
type
());
}
// Auth with RedisAuthenticator && clear auth
...
...
@@ -435,9 +435,9 @@ TEST_F(RedisTest, auth) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
2
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_STREQ
(
"my_redis"
,
response
.
reply
(
0
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
1
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
1
).
c_str
());
}
...
...
@@ -458,7 +458,7 @@ TEST_F(RedisTest, auth) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
1
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
0
).
type
());
ASSERT_STREQ
(
"my_redis"
,
response
.
reply
(
0
).
c_str
());
}
}
...
...
@@ -553,7 +553,7 @@ TEST_F(RedisTest, codec) {
butil
::
Arena
arena
;
// status
{
brpc
::
Redis
Message
r
(
&
arena
);
brpc
::
Redis
Reply
r
(
&
arena
);
butil
::
IOBuf
buf
;
ASSERT_TRUE
(
r
.
SetStatus
(
"OK"
));
ASSERT_TRUE
(
r
.
SerializeToIOBuf
(
&
buf
));
...
...
@@ -567,7 +567,7 @@ TEST_F(RedisTest, codec) {
}
// error
{
brpc
::
Redis
Message
r
(
&
arena
);
brpc
::
Redis
Reply
r
(
&
arena
);
butil
::
IOBuf
buf
;
ASSERT_TRUE
(
r
.
SetError
(
"not exist
\'
key
\'
"
));
ASSERT_TRUE
(
r
.
SerializeToIOBuf
(
&
buf
));
...
...
@@ -580,7 +580,7 @@ TEST_F(RedisTest, codec) {
}
// string
{
brpc
::
Redis
Message
r
(
&
arena
);
brpc
::
Redis
Reply
r
(
&
arena
);
butil
::
IOBuf
buf
;
ASSERT_TRUE
(
r
.
SetNilString
());
ASSERT_TRUE
(
r
.
SerializeToIOBuf
(
&
buf
));
...
...
@@ -603,7 +603,7 @@ TEST_F(RedisTest, codec) {
}
// integer
{
brpc
::
Redis
Message
r
(
&
arena
);
brpc
::
Redis
Reply
r
(
&
arena
);
butil
::
IOBuf
buf
;
int
t
=
2
;
int
input
[]
=
{
-
1
,
1234567
};
...
...
@@ -622,10 +622,10 @@ TEST_F(RedisTest, codec) {
}
// array
{
brpc
::
Redis
Message
r
(
&
arena
);
brpc
::
Redis
Reply
r
(
&
arena
);
butil
::
IOBuf
buf
;
ASSERT_TRUE
(
r
.
SetArray
(
3
));
brpc
::
Redis
Message
&
sub_reply
=
r
[
0
];
brpc
::
Redis
Reply
&
sub_reply
=
r
[
0
];
sub_reply
.
SetArray
(
2
);
sub_reply
[
0
].
SetBulkString
(
"hello, it's me"
);
sub_reply
[
1
].
SetInteger
(
422
);
...
...
@@ -684,7 +684,7 @@ public:
:
_sleep
(
sleep
)
{}
brpc
::
RedisCommandHandler
::
Result
Run
(
const
char
*
args
[],
brpc
::
Redis
Message
*
output
,
brpc
::
Redis
Reply
*
output
,
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
std
::
string
key
=
args
[
1
];
...
...
@@ -716,7 +716,7 @@ public:
:
_sleep
(
sleep
)
{}
brpc
::
RedisCommandHandler
::
Result
Run
(
const
char
*
args
[],
brpc
::
Redis
Message
*
output
,
brpc
::
Redis
Reply
*
output
,
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
std
::
string
key
=
args
[
1
];
...
...
@@ -751,7 +751,7 @@ public:
:
_sleep
(
sleep
)
{}
brpc
::
RedisCommandHandler
::
Result
Run
(
const
char
*
args
[],
brpc
::
Redis
Message
*
output
,
brpc
::
Redis
Reply
*
output
,
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
int64_t
value
;
...
...
@@ -812,17 +812,17 @@ TEST_F(RedisTest, server_sanity) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
7
,
response
.
reply_size
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_NIL
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_NIL
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_NIL
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_NIL
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
2
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_STREQ
(
"value1"
,
response
.
reply
(
3
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
4
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
4
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
4
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
5
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
5
).
type
());
ASSERT_STREQ
(
"value2"
,
response
.
reply
(
5
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_ERROR
,
response
.
reply
(
6
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_ERROR
,
response
.
reply
(
6
).
type
());
ASSERT_TRUE
(
butil
::
StringPiece
(
response
.
reply
(
6
).
error_message
()).
starts_with
(
"ERR unknown command"
));
ASSERT_EQ
(
gh
->
new_count
(),
1
);
...
...
@@ -884,7 +884,7 @@ public:
:
_started
(
false
)
{}
RedisCommandHandler
::
Result
Run
(
const
char
*
args
[],
brpc
::
Redis
Message
*
output
,
brpc
::
Redis
Reply
*
output
,
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
if
(
strcasecmp
(
args
[
0
],
"multi"
)
==
0
)
{
...
...
@@ -966,13 +966,13 @@ TEST_F(RedisTest, server_command_continue) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_EQ
(
12
,
response
.
reply_size
());
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
0
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
0
).
c_str
());
for
(
int
i
=
1
;
i
<
count
+
1
;
++
i
)
{
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
i
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
i
).
type
());
ASSERT_STREQ
(
"QUEUED"
,
response
.
reply
(
i
).
c_str
());
}
const
brpc
::
Redis
Message
&
m
=
response
.
reply
(
count
+
1
);
const
brpc
::
Redis
Reply
&
m
=
response
.
reply
(
count
+
1
);
ASSERT_EQ
(
count
,
(
int
)
m
.
size
());
for
(
int
i
=
0
;
i
<
count
;
++
i
)
{
ASSERT_EQ
(
i
+
1
,
m
[
i
].
integer
());
...
...
@@ -990,10 +990,10 @@ TEST_F(RedisTest, server_command_continue) {
channel
.
CallMethod
(
NULL
,
&
cntl
,
&
request
,
&
response
,
NULL
);
ASSERT_FALSE
(
cntl
.
Failed
())
<<
cntl
.
ErrorText
();
ASSERT_STREQ
(
"world"
,
response
.
reply
(
0
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_NIL
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_NIL
,
response
.
reply
(
1
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STATUS
,
response
.
reply
(
2
).
type
());
ASSERT_STREQ
(
"OK"
,
response
.
reply
(
2
).
c_str
());
ASSERT_EQ
(
brpc
::
REDIS_
MESSAGE
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_EQ
(
brpc
::
REDIS_
REPLY
_STRING
,
response
.
reply
(
3
).
type
());
ASSERT_STREQ
(
"value1"
,
response
.
reply
(
3
).
c_str
());
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment