Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
5a1b7235
Commit
5a1b7235
authored
May 03, 2018
by
wangxuefeng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Update as comments
parent
98d6584d
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
165 additions
and
152 deletions
+165
-152
client.cpp
example/thrift_extension_c++/client.cpp
+4
-6
server.cpp
example/thrift_extension_c++/server.cpp
+24
-11
thrift_utils.h
src/brpc/details/thrift_utils.h
+2
-0
global.cpp
src/brpc/global.cpp
+3
-3
thrift_protocol.cpp
src/brpc/policy/thrift_protocol.cpp
+41
-41
thrift_protocol.h
src/brpc/policy/thrift_protocol.h
+6
-6
server.h
src/brpc/server.h
+2
-2
thrift_message.cpp
src/brpc/thrift_message.cpp
+36
-36
thrift_message.h
src/brpc/thrift_message.h
+18
-18
thrift_service.cpp
src/brpc/thrift_service.cpp
+5
-5
thrift_service.h
src/brpc/thrift_service.h
+24
-24
No files found.
example/thrift_extension_c++/client.cpp
View file @
5a1b7235
...
...
@@ -21,10 +21,10 @@
#include <butil/logging.h>
#include <butil/time.h>
#include <butil/thrift_utils.h>
#include <butil/strings/string_piece.h>
#include <brpc/channel.h>
#include <brpc/thrift_framed_message.h>
#include <brpc/details/thrift_utils.h>
#include <brpc/thrift_message.h>
#include <bvar/bvar.h>
#include <thrift/transport/TBufferTransports.h>
...
...
@@ -68,8 +68,8 @@ int main(int argc, char* argv[]) {
cntl
.
set_log_id
(
log_id
++
);
// set by user
// wrapper thrift raw request into ThriftMessage
brpc
::
ThriftMessage
<
example
::
EchoRequest
>
req
;
brpc
::
ThriftMessage
<
example
::
EchoResponse
>
res
;
brpc
::
Thrift
Template
Message
<
example
::
EchoRequest
>
req
;
brpc
::
Thrift
Template
Message
<
example
::
EchoResponse
>
res
;
req
.
raw
().
data
=
"hello"
;
...
...
@@ -98,6 +98,4 @@ int main(int argc, char* argv[]) {
return
0
;
}
template
class
brpc
::
ThriftMessage
<
example
::
EchoRequest
>
;
template
class
brpc
::
ThriftMessage
<
example
::
EchoResponse
>
;
example/thrift_extension_c++/server.cpp
View file @
5a1b7235
...
...
@@ -45,13 +45,15 @@ public:
};
// Adapt your own thrift-based protocol to use brpc
class
MyThriftProtocol
:
public
brpc
::
Thrift
Framed
Service
{
class
MyThriftProtocol
:
public
brpc
::
ThriftService
{
public
:
MyThriftProtocol
(
EchoServiceHandler
*
handler
)
:
_handler
(
handler
)
{
}
void
ProcessThriftFramedRequest
(
const
brpc
::
Server
&
,
brpc
::
Controller
*
cntl
,
brpc
::
Thrift
Framed
Message
*
request
,
brpc
::
Thrift
FramedMessage
*
response
,
brpc
::
Thrift
Framed
Closure
*
done
)
{
brpc
::
ThriftMessage
*
request
,
brpc
::
Thrift
Message
*
response
,
brpc
::
ThriftClosure
*
done
)
{
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc
::
ClosureGuard
done_guard
(
done
);
...
...
@@ -63,27 +65,35 @@ public:
return
;
}
auto
handler
=
new
EchoServiceHandler
();
example
::
EchoRequest
*
req
=
request
->
cast
<
example
::
EchoRequest
>
();
example
::
EchoResponse
*
res
=
response
->
cast
<
example
::
EchoResponse
>
();
// process with req and res
handler
->
Echo
(
*
res
,
*
req
);
if
(
_handler
)
{
_handler
->
Echo
(
*
res
,
*
req
);
}
else
{
cntl
->
CloseConnection
(
"Close connection due to no valid handler"
);
LOG
(
ERROR
)
<<
"Fail to process thrift request due to no valid handler"
;
return
;
}
LOG
(
INFO
)
<<
"success to process thrift request in brpc with handler"
;
}
private
:
EchoServiceHandler
*
_handler
;
};
// Adapt your own thrift-based protocol to use brpc
class
MyThriftProtocolPbManner
:
public
brpc
::
Thrift
Framed
Service
{
class
MyThriftProtocolPbManner
:
public
brpc
::
ThriftService
{
public
:
void
ProcessThriftFramedRequest
(
const
brpc
::
Server
&
,
brpc
::
Controller
*
cntl
,
brpc
::
Thrift
Framed
Message
*
request
,
brpc
::
Thrift
FramedMessage
*
response
,
brpc
::
Thrift
Framed
Closure
*
done
)
{
brpc
::
ThriftMessage
*
request
,
brpc
::
Thrift
Message
*
response
,
brpc
::
ThriftClosure
*
done
)
{
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc
::
ClosureGuard
done_guard
(
done
);
...
...
@@ -114,7 +124,10 @@ int main(int argc, char* argv[]) {
brpc
::
Server
server
;
brpc
::
ServerOptions
options
;
options
.
thrift_service
=
new
MyThriftProtocol
;
auto
thrift_service_handler
=
new
EchoServiceHandler
();
options
.
thrift_service
=
new
MyThriftProtocol
(
thrift_service_handler
);
options
.
idle_timeout_sec
=
FLAGS_idle_timeout_s
;
options
.
max_concurrency
=
FLAGS_max_concurrency
;
...
...
src/brpc/details/thrift_utils.h
View file @
5a1b7235
...
...
@@ -21,6 +21,8 @@
#include <boost/make_shared.hpp>
#include "butil/iobuf.h"
#include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
...
...
src/brpc/global.cpp
View file @
5a1b7235
...
...
@@ -97,7 +97,7 @@ using namespace policy;
const
char
*
const
DUMMY_SERVER_PORT_FILE
=
"dummy_server.port"
;
void
__attribute__
((
weak
))
RegisterThrift
Framed
Protocol
();
void
__attribute__
((
weak
))
RegisterThriftProtocol
();
struct
GlobalExtensions
{
GlobalExtensions
()
...
...
@@ -469,8 +469,8 @@ static void GlobalInitializeOrDieImpl() {
// Register Thrift framed protocol if linked
if
(
brpc
::
RegisterThrift
Framed
Protocol
)
{
brpc
::
RegisterThrift
Framed
Protocol
();
if
(
brpc
::
RegisterThriftProtocol
)
{
brpc
::
RegisterThriftProtocol
();
}
// Only valid at client side
...
...
src/brpc/policy/thrift_protocol.cpp
View file @
5a1b7235
...
...
@@ -46,7 +46,7 @@ void bthread_assign_data(void* data) __THROW;
namespace
brpc
{
Thrift
FramedClosure
::
ThriftFramed
Closure
(
void
*
additional_space
)
Thrift
Closure
::
Thrift
Closure
(
void
*
additional_space
)
:
_socket_ptr
(
NULL
)
,
_server
(
NULL
)
,
_start_parse_us
(
0
)
...
...
@@ -54,25 +54,25 @@ ThriftFramedClosure::ThriftFramedClosure(void* additional_space)
,
_additional_space
(
additional_space
)
{
}
Thrift
FramedClosure
::~
ThriftFramed
Closure
()
{
Thrift
Closure
::~
Thrift
Closure
()
{
LogErrorTextAndDelete
(
false
)(
&
_controller
);
}
void
Thrift
Framed
Closure
::
DoNotRespond
()
{
void
ThriftClosure
::
DoNotRespond
()
{
_do_respond
=
false
;
}
class
DeleteThrift
Framed
Closure
{
class
DeleteThriftClosure
{
public
:
void
operator
()(
Thrift
Framed
Closure
*
done
)
const
{
done
->~
Thrift
Framed
Closure
();
void
operator
()(
ThriftClosure
*
done
)
const
{
done
->~
ThriftClosure
();
free
(
done
);
}
};
void
Thrift
Framed
Closure
::
Run
()
{
void
ThriftClosure
::
Run
()
{
// Recycle itself after `Run'
std
::
unique_ptr
<
Thrift
FramedClosure
,
DeleteThriftFramed
Closure
>
recycle_ctx
(
this
);
std
::
unique_ptr
<
Thrift
Closure
,
DeleteThrift
Closure
>
recycle_ctx
(
this
);
SocketUniquePtr
sock
(
_socket_ptr
);
ScopedRemoveConcurrency
remove_concurrency_dummy
(
_server
,
&
_controller
);
...
...
@@ -187,7 +187,7 @@ void ThriftFramedClosure::Run() {
}
}
void
Thrift
Framed
Closure
::
SetMethodName
(
const
std
::
string
&
full_method_name
)
{
void
ThriftClosure
::
SetMethodName
(
const
std
::
string
&
full_method_name
)
{
ControllerPrivateAccessor
accessor
(
&
_controller
);
Span
*
span
=
accessor
.
span
();
if
(
span
)
{
...
...
@@ -197,7 +197,7 @@ void ThriftFramedClosure::SetMethodName(const std::string& full_method_name) {
namespace
policy
{
ParseResult
ParseThrift
Framed
Message
(
butil
::
IOBuf
*
source
,
ParseResult
ParseThriftMessage
(
butil
::
IOBuf
*
source
,
Socket
*
,
bool
/*read_eof*/
,
const
void
*
/*arg*/
)
{
char
header_buf
[
sizeof
(
thrift_binary_head_t
)
+
3
];
...
...
@@ -232,12 +232,12 @@ ParseResult ParseThriftFramedMessage(butil::IOBuf* source,
}
struct
CallMethodInBackupThreadArgs
{
Thrift
Framed
Service
*
service
;
ThriftService
*
service
;
const
Server
*
server
;
Controller
*
controller
;
Thrift
Framed
Message
*
request
;
Thrift
Framed
Message
*
response
;
Thrift
Framed
Closure
*
done
;
ThriftMessage
*
request
;
ThriftMessage
*
response
;
ThriftClosure
*
done
;
};
static
void
CallMethodInBackupThread
(
void
*
void_args
)
{
...
...
@@ -248,12 +248,12 @@ static void CallMethodInBackupThread(void* void_args) {
delete
args
;
}
static
void
EndRunningCallMethodInPool
(
Thrift
Framed
Service
*
service
,
static
void
EndRunningCallMethodInPool
(
ThriftService
*
service
,
const
Server
&
server
,
Controller
*
controller
,
Thrift
Framed
Message
*
request
,
Thrift
Framed
Message
*
response
,
Thrift
Framed
Closure
*
done
)
{
ThriftMessage
*
request
,
ThriftMessage
*
response
,
ThriftClosure
*
done
)
{
CallMethodInBackupThreadArgs
*
args
=
new
CallMethodInBackupThreadArgs
;
args
->
service
=
service
;
args
->
server
=
&
server
;
...
...
@@ -264,7 +264,7 @@ static void EndRunningCallMethodInPool(ThriftFramedService* service,
return
EndRunningUserCodeInPool
(
CallMethodInBackupThread
,
args
);
};
void
ProcessThrift
Framed
Request
(
InputMessageBase
*
msg_base
)
{
void
ProcessThriftRequest
(
InputMessageBase
*
msg_base
)
{
const
int64_t
start_parse_us
=
butil
::
cpuwide_time_us
();
...
...
@@ -278,7 +278,7 @@ void ProcessThriftFramedRequest(InputMessageBase* msg_base) {
thrift_binary_head_t
*
req_head
=
(
thrift_binary_head_t
*
)
p
;
req_head
->
body_len
=
ntohl
(
req_head
->
body_len
);
Thrift
Framed
Service
*
service
=
server
->
options
().
thrift_service
;
ThriftService
*
service
=
server
->
options
().
thrift_service
;
if
(
service
==
NULL
)
{
LOG_EVERY_SECOND
(
WARNING
)
<<
"Received thrift request however the server does not set"
...
...
@@ -287,9 +287,9 @@ void ProcessThriftFramedRequest(InputMessageBase* msg_base) {
return
;
}
void
*
space
=
malloc
(
sizeof
(
Thrift
Framed
Closure
)
+
service
->
_additional_space
);
void
*
space
=
malloc
(
sizeof
(
ThriftClosure
)
+
service
->
_additional_space
);
if
(
!
space
)
{
LOG
(
FATAL
)
<<
"Fail to new Thrift
Framed
Closure"
;
LOG
(
FATAL
)
<<
"Fail to new ThriftClosure"
;
socket
->
SetFailed
();
return
;
}
...
...
@@ -303,12 +303,12 @@ void ProcessThriftFramedRequest(InputMessageBase* msg_base) {
void
*
sub_space
=
NULL
;
if
(
service
->
_additional_space
)
{
sub_space
=
(
char
*
)
space
+
sizeof
(
Thrift
Framed
Closure
);
sub_space
=
(
char
*
)
space
+
sizeof
(
ThriftClosure
);
}
Thrift
FramedClosure
*
thrift_done
=
new
(
space
)
ThriftFramed
Closure
(
sub_space
);
Thrift
Closure
*
thrift_done
=
new
(
space
)
Thrift
Closure
(
sub_space
);
Controller
*
cntl
=
&
(
thrift_done
->
_controller
);
Thrift
Framed
Message
*
req
=
&
(
thrift_done
->
_request
);
Thrift
Framed
Message
*
res
=
&
(
thrift_done
->
_response
);
ThriftMessage
*
req
=
&
(
thrift_done
->
_request
);
ThriftMessage
*
res
=
&
(
thrift_done
->
_response
);
req
->
head
=
*
req_head
;
msg
->
payload
.
swap
(
req
->
body
);
...
...
@@ -321,7 +321,7 @@ void ProcessThriftFramedRequest(InputMessageBase* msg_base) {
const
bool
security_mode
=
server
->
options
().
security_mode
()
&&
socket
->
user
()
==
server_accessor
.
acceptor
();
// Initialize log_id with the log_id in thrift. Notice that the protocols
// on top of Thrift
Framed
Service may pack log_id in meta or user messages and
// on top of ThriftService may pack log_id in meta or user messages and
// overwrite the value.
//cntl->set_log_id(req_head->log_id);
accessor
.
set_server
(
server
)
...
...
@@ -394,11 +394,11 @@ void ProcessThriftFramedRequest(InputMessageBase* msg_base) {
}
void
ProcessThrift
Framed
Response
(
InputMessageBase
*
msg_base
)
{
void
ProcessThriftResponse
(
InputMessageBase
*
msg_base
)
{
const
int64_t
start_parse_us
=
butil
::
cpuwide_time_us
();
DestroyingPtr
<
MostCommonMessage
>
msg
(
static_cast
<
MostCommonMessage
*>
(
msg_base
));
// Fetch correlation id that we saved before in `PacThrift
Framed
Request'
// Fetch correlation id that we saved before in `PacThriftRequest'
const
CallId
cid
=
{
static_cast
<
uint64_t
>
(
msg
->
socket
()
->
correlation_id
())
};
Controller
*
cntl
=
NULL
;
const
int
rc
=
bthread_id_lock
(
cid
,
(
void
**
)
&
cntl
);
...
...
@@ -417,8 +417,8 @@ void ProcessThriftFramedResponse(InputMessageBase* msg_base) {
span
->
set_start_parse_us
(
start_parse_us
);
}
// MUST be Thrift
FramedMessage (checked in SerializeThriftFramed
Request)
Thrift
FramedMessage
*
response
=
(
ThriftFramed
Message
*
)
cntl
->
response
();
// MUST be Thrift
Message (checked in SerializeThrift
Request)
Thrift
Message
*
response
=
(
Thrift
Message
*
)
cntl
->
response
();
const
int
saved_error
=
cntl
->
ErrorCode
();
if
(
response
!=
NULL
)
{
msg
->
meta
.
copy_to
(
&
response
->
head
,
sizeof
(
thrift_binary_head_t
));
...
...
@@ -516,7 +516,7 @@ void ProcessThriftFramedResponse(InputMessageBase* msg_base) {
accessor
.
OnResponse
(
cid
,
saved_error
);
}
bool
VerifyThrift
Framed
Request
(
const
InputMessageBase
*
msg_base
)
{
bool
VerifyThriftRequest
(
const
InputMessageBase
*
msg_base
)
{
Server
*
server
=
(
Server
*
)
msg_base
->
arg
();
if
(
server
->
options
().
auth
)
{
LOG
(
WARNING
)
<<
"thrift does not support authentication"
;
...
...
@@ -525,14 +525,14 @@ bool VerifyThriftFramedRequest(const InputMessageBase* msg_base) {
return
true
;
}
void
SerializeThrift
Framed
Request
(
butil
::
IOBuf
*
request_buf
,
Controller
*
cntl
,
void
SerializeThriftRequest
(
butil
::
IOBuf
*
request_buf
,
Controller
*
cntl
,
const
google
::
protobuf
::
Message
*
req_base
)
{
if
(
req_base
==
NULL
)
{
return
cntl
->
SetFailed
(
EREQUEST
,
"request is NULL"
);
}
ControllerPrivateAccessor
accessor
(
cntl
);
const
Thrift
FramedMessage
*
req
=
(
const
ThriftFramed
Message
*
)
req_base
;
const
Thrift
Message
*
req
=
(
const
Thrift
Message
*
)
req_base
;
thrift_binary_head_t
head
=
req
->
head
;
...
...
@@ -566,7 +566,7 @@ void SerializeThriftFramedRequest(butil::IOBuf* request_buf, Controller* cntl,
xfer
+=
out_portocol
->
writeFieldBegin
(
"request"
,
::
apache
::
thrift
::
protocol
::
T_STRUCT
,
1
);
// request's write
Thrift
FramedMessage
*
r
=
const_cast
<
ThriftFramed
Message
*>
(
req
);
Thrift
Message
*
r
=
const_cast
<
Thrift
Message
*>
(
req
);
xfer
+=
r
->
write
(
out_portocol
.
get
());
// end request's write
...
...
@@ -594,7 +594,7 @@ void SerializeThriftFramedRequest(butil::IOBuf* request_buf, Controller* cntl,
}
void
PackThrift
Framed
Request
(
void
PackThriftRequest
(
butil
::
IOBuf
*
packet_buf
,
SocketMessage
**
,
uint64_t
correlation_id
,
...
...
@@ -624,12 +624,12 @@ void PackThriftFramedRequest(
}
// namespace policy
void
RegisterThrift
Framed
Protocol
()
{
void
RegisterThriftProtocol
()
{
Protocol
thrift_binary_protocol
=
{
policy
::
ParseThrift
Framed
Message
,
policy
::
SerializeThrift
FramedRequest
,
policy
::
PackThriftFramed
Request
,
policy
::
ProcessThrift
FramedRequest
,
policy
::
ProcessThriftFramed
Response
,
policy
::
VerifyThrift
Framed
Request
,
NULL
,
NULL
,
Protocol
thrift_binary_protocol
=
{
policy
::
ParseThriftMessage
,
policy
::
SerializeThrift
Request
,
policy
::
PackThrift
Request
,
policy
::
ProcessThrift
Request
,
policy
::
ProcessThrift
Response
,
policy
::
VerifyThriftRequest
,
NULL
,
NULL
,
CONNECTION_TYPE_POOLED_AND_SHORT
,
"thrift"
};
if
(
RegisterProtocol
(
PROTOCOL_THRIFT
,
thrift_binary_protocol
)
!=
0
)
{
exit
(
1
);
...
...
src/brpc/policy/thrift_protocol.h
View file @
5a1b7235
...
...
@@ -25,18 +25,18 @@ namespace brpc {
namespace
policy
{
// Parse binary protocol format of thrift framed
ParseResult
ParseThrift
Framed
Message
(
butil
::
IOBuf
*
source
,
Socket
*
socket
,
bool
read_eof
,
const
void
*
arg
);
ParseResult
ParseThriftMessage
(
butil
::
IOBuf
*
source
,
Socket
*
socket
,
bool
read_eof
,
const
void
*
arg
);
// Actions to a (client) request in thrift binary framed format
void
ProcessThrift
Framed
Request
(
InputMessageBase
*
msg
);
void
ProcessThriftRequest
(
InputMessageBase
*
msg
);
// Actions to a (server) response in thrift binary framed format
void
ProcessThrift
Framed
Response
(
InputMessageBase
*
msg
);
void
ProcessThriftResponse
(
InputMessageBase
*
msg
);
void
SerializeThrift
Framed
Request
(
butil
::
IOBuf
*
request_buf
,
Controller
*
controller
,
void
SerializeThriftRequest
(
butil
::
IOBuf
*
request_buf
,
Controller
*
controller
,
const
google
::
protobuf
::
Message
*
request
);
void
PackThrift
Framed
Request
(
void
PackThriftRequest
(
butil
::
IOBuf
*
packet_buf
,
SocketMessage
**
,
uint64_t
correlation_id
,
...
...
@@ -46,7 +46,7 @@ void PackThriftFramedRequest(
const
Authenticator
*
);
// Verify authentication information in thrift binary format
bool
VerifyThrift
Framed
Request
(
const
InputMessageBase
*
msg
);
bool
VerifyThriftRequest
(
const
InputMessageBase
*
msg
);
}
// namespace policy
}
// namespace brpc
...
...
src/brpc/server.h
View file @
5a1b7235
...
...
@@ -46,7 +46,7 @@ namespace brpc {
class
Acceptor
;
class
MethodStatus
;
class
NsheadService
;
class
Thrift
Framed
Service
;
class
ThriftService
;
class
SimpleDataPool
;
class
MongoServiceAdaptor
;
class
RestfulMap
;
...
...
@@ -73,7 +73,7 @@ struct ServerOptions {
// Process requests in format of thrift_binary_head_t + blob.
// Owned by Server and deleted in server's destructor
// Default: NULL
Thrift
Framed
Service
*
thrift_service
;
ThriftService
*
thrift_service
;
// Adaptor for Mongo protocol, check src/brpc/mongo_service_adaptor.h for details
// The adaptor will not be deleted by server
...
...
src/brpc/thrift_
framed_
message.cpp
→
src/brpc/thrift_message.cpp
View file @
5a1b7235
...
...
@@ -17,7 +17,7 @@
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
#include "brpc/thrift_
framed_
message.h"
#include "brpc/thrift_message.h"
#include <algorithm>
#include "butil/logging.h"
...
...
@@ -36,7 +36,7 @@
namespace
brpc
{
namespace
{
const
::
google
::
protobuf
::
Descriptor
*
Thrift
Framed
Message_descriptor_
=
NULL
;
const
::
google
::
protobuf
::
Descriptor
*
ThriftMessage_descriptor_
=
NULL
;
}
// namespace
...
...
@@ -46,7 +46,7 @@ void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
::
google
::
protobuf
::
DescriptorPool
::
generated_pool
()
->
FindFileByName
(
"baidu/rpc/thrift_framed_message.proto"
);
GOOGLE_CHECK
(
file
!=
NULL
);
Thrift
Framed
Message_descriptor_
=
file
->
message_type
(
0
);
ThriftMessage_descriptor_
=
file
->
message_type
(
0
);
}
namespace
{
...
...
@@ -60,13 +60,13 @@ inline void protobuf_AssignDescriptorsOnce() {
void
protobuf_RegisterTypes
(
const
::
std
::
string
&
)
{
protobuf_AssignDescriptorsOnce
();
::
google
::
protobuf
::
MessageFactory
::
InternalRegisterGeneratedMessage
(
Thrift
FramedMessage_descriptor_
,
&
ThriftFramed
Message
::
default_instance
());
Thrift
Message_descriptor_
,
&
Thrift
Message
::
default_instance
());
}
}
// namespace
void
protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto
()
{
delete
Thrift
Framed
Message
::
default_instance_
;
delete
ThriftMessage
::
default_instance_
;
}
void
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl
()
{
...
...
@@ -82,8 +82,8 @@ void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl() {
"hriftBinaryMessage"
,
58
);
::
google
::
protobuf
::
MessageFactory
::
InternalRegisterGeneratedFile
(
"thrift_framed_message.proto"
,
&
protobuf_RegisterTypes
);
Thrift
FramedMessage
::
default_instance_
=
new
ThriftFramed
Message
();
Thrift
Framed
Message
::
default_instance_
->
InitAsDefaultInstance
();
Thrift
Message
::
default_instance_
=
new
Thrift
Message
();
ThriftMessage
::
default_instance_
->
InitAsDefaultInstance
();
::
google
::
protobuf
::
internal
::
OnShutdown
(
&
protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto
);
}
...
...
@@ -108,64 +108,64 @@ struct StaticDescriptorInitializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto
#ifndef _MSC_VER
#endif // !_MSC_VER
Thrift
FramedMessage
::
ThriftFramed
Message
()
Thrift
Message
::
Thrift
Message
()
:
::
google
::
protobuf
::
Message
()
{
SharedCtor
();
}
void
Thrift
Framed
Message
::
InitAsDefaultInstance
()
{
void
ThriftMessage
::
InitAsDefaultInstance
()
{
}
Thrift
FramedMessage
::
ThriftFramedMessage
(
const
ThriftFramed
Message
&
from
)
Thrift
Message
::
ThriftMessage
(
const
Thrift
Message
&
from
)
:
::
google
::
protobuf
::
Message
()
{
SharedCtor
();
MergeFrom
(
from
);
}
void
Thrift
Framed
Message
::
SharedCtor
()
{
void
ThriftMessage
::
SharedCtor
()
{
memset
(
&
head
,
0
,
sizeof
(
head
));
thrift_raw_instance_deleter
=
nullptr
;
thrift_raw_instance
=
nullptr
;
thrift_message_seq_id
=
0
;
method_name
=
""
;
//RegisterThrift
Framed
ProtocolDummy dummy;
//RegisterThriftProtocolDummy dummy;
}
Thrift
FramedMessage
::~
ThriftFramed
Message
()
{
Thrift
Message
::~
Thrift
Message
()
{
SharedDtor
();
if
(
thrift_raw_instance
&&
thrift_raw_instance_deleter
)
{
thrift_raw_instance_deleter
(
thrift_raw_instance
);
}
}
void
Thrift
Framed
Message
::
SharedDtor
()
{
void
ThriftMessage
::
SharedDtor
()
{
if
(
this
!=
default_instance_
)
{
}
}
const
::
google
::
protobuf
::
Descriptor
*
Thrift
Framed
Message
::
descriptor
()
{
const
::
google
::
protobuf
::
Descriptor
*
ThriftMessage
::
descriptor
()
{
protobuf_AssignDescriptorsOnce
();
return
Thrift
Framed
Message_descriptor_
;
return
ThriftMessage_descriptor_
;
}
const
Thrift
FramedMessage
&
ThriftFramed
Message
::
default_instance
()
{
const
Thrift
Message
&
Thrift
Message
::
default_instance
()
{
if
(
default_instance_
==
NULL
)
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto
();
return
*
default_instance_
;
}
Thrift
FramedMessage
*
ThriftFramed
Message
::
default_instance_
=
NULL
;
Thrift
Message
*
Thrift
Message
::
default_instance_
=
NULL
;
Thrift
FramedMessage
*
ThriftFramed
Message
::
New
()
const
{
return
new
Thrift
Framed
Message
;
Thrift
Message
*
Thrift
Message
::
New
()
const
{
return
new
ThriftMessage
;
}
void
Thrift
Framed
Message
::
Clear
()
{
void
ThriftMessage
::
Clear
()
{
memset
(
&
head
,
0
,
sizeof
(
head
));
body
.
clear
();
}
bool
Thrift
Framed
Message
::
MergePartialFromCodedStream
(
bool
ThriftMessage
::
MergePartialFromCodedStream
(
::
google
::
protobuf
::
io
::
CodedInputStream
*
input
)
{
#define DO_(EXPRESSION) if (!(EXPRESSION)) return false
::
google
::
protobuf
::
uint32
tag
;
...
...
@@ -179,55 +179,55 @@ bool ThriftFramedMessage::MergePartialFromCodedStream(
#undef DO_
}
void
Thrift
Framed
Message
::
SerializeWithCachedSizes
(
void
ThriftMessage
::
SerializeWithCachedSizes
(
::
google
::
protobuf
::
io
::
CodedOutputStream
*
)
const
{
}
::
google
::
protobuf
::
uint8
*
Thrift
Framed
Message
::
SerializeWithCachedSizesToArray
(
::
google
::
protobuf
::
uint8
*
ThriftMessage
::
SerializeWithCachedSizesToArray
(
::
google
::
protobuf
::
uint8
*
target
)
const
{
return
target
;
}
int
Thrift
Framed
Message
::
ByteSize
()
const
{
int
ThriftMessage
::
ByteSize
()
const
{
return
sizeof
(
thrift_binary_head_t
)
+
body
.
size
();
}
void
Thrift
Framed
Message
::
MergeFrom
(
const
::
google
::
protobuf
::
Message
&
from
)
{
void
ThriftMessage
::
MergeFrom
(
const
::
google
::
protobuf
::
Message
&
from
)
{
GOOGLE_CHECK_NE
(
&
from
,
this
);
const
Thrift
Framed
Message
*
source
=
::
google
::
protobuf
::
internal
::
dynamic_cast_if_available
<
const
Thrift
Framed
Message
*>
(
const
ThriftMessage
*
source
=
::
google
::
protobuf
::
internal
::
dynamic_cast_if_available
<
const
ThriftMessage
*>
(
&
from
);
if
(
source
==
NULL
)
{
LOG
(
ERROR
)
<<
"Can only merge from Thrift
Framed
Message"
;
LOG
(
ERROR
)
<<
"Can only merge from ThriftMessage"
;
return
;
}
else
{
MergeFrom
(
*
source
);
}
}
void
Thrift
FramedMessage
::
MergeFrom
(
const
ThriftFramed
Message
&
from
)
{
void
Thrift
Message
::
MergeFrom
(
const
Thrift
Message
&
from
)
{
GOOGLE_CHECK_NE
(
&
from
,
this
);
head
=
from
.
head
;
body
=
from
.
body
;
}
void
Thrift
Framed
Message
::
CopyFrom
(
const
::
google
::
protobuf
::
Message
&
from
)
{
void
ThriftMessage
::
CopyFrom
(
const
::
google
::
protobuf
::
Message
&
from
)
{
if
(
&
from
==
this
)
return
;
Clear
();
MergeFrom
(
from
);
}
void
Thrift
FramedMessage
::
CopyFrom
(
const
ThriftFramed
Message
&
from
)
{
void
Thrift
Message
::
CopyFrom
(
const
Thrift
Message
&
from
)
{
if
(
&
from
==
this
)
return
;
Clear
();
MergeFrom
(
from
);
}
bool
Thrift
Framed
Message
::
IsInitialized
()
const
{
bool
ThriftMessage
::
IsInitialized
()
const
{
return
true
;
}
void
Thrift
FramedMessage
::
Swap
(
ThriftFramed
Message
*
other
)
{
void
Thrift
Message
::
Swap
(
Thrift
Message
*
other
)
{
if
(
other
!=
this
)
{
const
thrift_binary_head_t
tmp
=
other
->
head
;
other
->
head
=
head
;
...
...
@@ -236,10 +236,10 @@ void ThriftFramedMessage::Swap(ThriftFramedMessage* other) {
}
}
::
google
::
protobuf
::
Metadata
Thrift
Framed
Message
::
GetMetadata
()
const
{
::
google
::
protobuf
::
Metadata
ThriftMessage
::
GetMetadata
()
const
{
protobuf_AssignDescriptorsOnce
();
::
google
::
protobuf
::
Metadata
metadata
;
metadata
.
descriptor
=
Thrift
Framed
Message_descriptor_
;
metadata
.
descriptor
=
ThriftMessage_descriptor_
;
metadata
.
reflection
=
NULL
;
return
metadata
;
}
...
...
src/brpc/thrift_
framed_
message.h
→
src/brpc/thrift_message.h
View file @
5a1b7235
...
...
@@ -16,8 +16,8 @@
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_THRIFT_
FRAMED_
MESSAGE_H
#define BRPC_THRIFT_
FRAMED_
MESSAGE_H
#ifndef BRPC_THRIFT_MESSAGE_H
#define BRPC_THRIFT_MESSAGE_H
#include <functional>
#include <string>
...
...
@@ -43,7 +43,7 @@ void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void
protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto
();
// Representing a thrift_binary request or response.
class
Thrift
Framed
Message
:
public
::
google
::
protobuf
::
Message
{
class
ThriftMessage
:
public
::
google
::
protobuf
::
Message
{
public
:
thrift_binary_head_t
head
;
butil
::
IOBuf
body
;
...
...
@@ -55,28 +55,28 @@ public:
std
::
string
method_name
;
public
:
Thrift
Framed
Message
();
virtual
~
Thrift
Framed
Message
();
ThriftMessage
();
virtual
~
ThriftMessage
();
Thrift
FramedMessage
(
const
ThriftFramed
Message
&
from
);
Thrift
Message
(
const
Thrift
Message
&
from
);
inline
Thrift
FramedMessage
&
operator
=
(
const
ThriftFramed
Message
&
from
)
{
inline
Thrift
Message
&
operator
=
(
const
Thrift
Message
&
from
)
{
CopyFrom
(
from
);
return
*
this
;
}
static
const
::
google
::
protobuf
::
Descriptor
*
descriptor
();
static
const
Thrift
Framed
Message
&
default_instance
();
static
const
ThriftMessage
&
default_instance
();
void
Swap
(
Thrift
Framed
Message
*
other
);
void
Swap
(
ThriftMessage
*
other
);
// implements Message ----------------------------------------------
Thrift
Framed
Message
*
New
()
const
;
ThriftMessage
*
New
()
const
;
void
CopyFrom
(
const
::
google
::
protobuf
::
Message
&
from
);
void
MergeFrom
(
const
::
google
::
protobuf
::
Message
&
from
);
void
CopyFrom
(
const
Thrift
Framed
Message
&
from
);
void
MergeFrom
(
const
Thrift
Framed
Message
&
from
);
void
CopyFrom
(
const
ThriftMessage
&
from
);
void
MergeFrom
(
const
ThriftMessage
&
from
);
void
Clear
();
bool
IsInitialized
()
const
;
...
...
@@ -124,21 +124,21 @@ friend void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend
void
protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto
();
void
InitAsDefaultInstance
();
static
Thrift
Framed
Message
*
default_instance_
;
static
ThriftMessage
*
default_instance_
;
};
template
<
typename
T
>
class
Thrift
Message
:
public
ThriftFramed
Message
{
class
Thrift
TemplateMessage
:
public
Thrift
Message
{
public
:
ThriftMessage
()
{
Thrift
Template
Message
()
{
thrift_message_
=
new
T
;
assert
(
thrift_message_
!=
nullptr
);
}
virtual
~
ThriftMessage
()
{
delete
thrift_message_
;
}
virtual
~
Thrift
Template
Message
()
{
delete
thrift_message_
;
}
Thrift
Message
<
T
>&
operator
=
(
const
Thrift
Message
<
T
>&
other
)
{
Thrift
TemplateMessage
<
T
>&
operator
=
(
const
ThriftTemplate
Message
<
T
>&
other
)
{
*
thrift_message_
=
*
(
other
.
thrift_message_
);
return
*
this
;
}
...
...
@@ -161,6 +161,6 @@ private:
}
// namespace brpc
#endif // BRPC_THRIFT_
FRAMED_
MESSAGE_H
#endif // BRPC_THRIFT_MESSAGE_H
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
src/brpc/thrift_service.cpp
View file @
5a1b7235
...
...
@@ -23,12 +23,12 @@ namespace brpc {
BAIDU_CASSERT
(
sizeof
(
thrift_binary_head_t
)
==
4
,
sizeof_thrift_must_be_4
);
Thrift
FramedService
::
ThriftFramed
Service
()
:
_additional_space
(
0
)
{
Thrift
Service
::
Thrift
Service
()
:
_additional_space
(
0
)
{
_status
=
new
(
std
::
nothrow
)
MethodStatus
;
LOG_IF
(
FATAL
,
_status
==
NULL
)
<<
"Fail to new MethodStatus"
;
}
Thrift
FramedService
::
ThriftFramedService
(
const
ThriftFramed
ServiceOptions
&
options
)
Thrift
Service
::
ThriftService
(
const
Thrift
ServiceOptions
&
options
)
:
_status
(
NULL
),
_additional_space
(
options
.
additional_space
)
{
if
(
options
.
generate_status
)
{
_status
=
new
(
std
::
nothrow
)
MethodStatus
;
...
...
@@ -36,16 +36,16 @@ ThriftFramedService::ThriftFramedService(const ThriftFramedServiceOptions& optio
}
}
Thrift
FramedService
::~
ThriftFramed
Service
()
{
Thrift
Service
::~
Thrift
Service
()
{
delete
_status
;
_status
=
NULL
;
}
void
Thrift
Framed
Service
::
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
)
const
{
void
ThriftService
::
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
)
const
{
os
<<
butil
::
class_name_str
(
*
this
);
}
void
Thrift
Framed
Service
::
Expose
(
const
butil
::
StringPiece
&
prefix
)
{
void
ThriftService
::
Expose
(
const
butil
::
StringPiece
&
prefix
)
{
_cached_name
=
butil
::
class_name_str
(
*
this
);
if
(
_status
==
NULL
)
{
return
;
...
...
src/brpc/thrift_service.h
View file @
5a1b7235
...
...
@@ -20,7 +20,7 @@
#define BRPC_THRIFT_SERVICE_H
#include "brpc/controller.h" // Controller
#include "brpc/thrift_
framed_message.h" // ThriftFramed
Message
#include "brpc/thrift_
message.h" // Thrift
Message
#include "brpc/describable.h"
...
...
@@ -31,14 +31,14 @@ class Server;
class
MethodStatus
;
class
StatusService
;
namespace
policy
{
void
ProcessThrift
Framed
Request
(
InputMessageBase
*
msg_base
);
void
ProcessThriftRequest
(
InputMessageBase
*
msg_base
);
}
// The continuation of request processing. Namely send response back to client.
// NOTE: you DON'T need to inherit this class or create instance of this class.
class
Thrift
Framed
Closure
:
public
google
::
protobuf
::
Closure
{
class
ThriftClosure
:
public
google
::
protobuf
::
Closure
{
public
:
explicit
Thrift
Framed
Closure
(
void
*
additional_space
);
explicit
ThriftClosure
(
void
*
additional_space
);
// [Required] Call this to send response back to the client.
void
Run
();
...
...
@@ -46,7 +46,7 @@ public:
// [Optional] Set the full method name. If unset, use name of the service.
void
SetMethodName
(
const
std
::
string
&
full_method_name
);
// The space required by subclass at Thrift
Framed
ServiceOptions. subclass may
// The space required by subclass at ThriftServiceOptions. subclass may
// utilizes this feature to save the cost of allocating closure separately.
// If subclass does not require space, this return value is NULL.
void
*
additional_space
()
{
return
_additional_space
;
}
...
...
@@ -58,24 +58,24 @@ public:
void
DoNotRespond
();
private
:
friend
void
policy
::
ProcessThrift
Framed
Request
(
InputMessageBase
*
msg_base
);
friend
class
DeleteThrift
Framed
Closure
;
friend
void
policy
::
ProcessThriftRequest
(
InputMessageBase
*
msg_base
);
friend
class
DeleteThriftClosure
;
// Only callable by Run().
~
Thrift
Framed
Closure
();
~
ThriftClosure
();
Socket
*
_socket_ptr
;
const
Server
*
_server
;
int64_t
_start_parse_us
;
Thrift
Framed
Message
_request
;
Thrift
Framed
Message
_response
;
ThriftMessage
_request
;
ThriftMessage
_response
;
bool
_do_respond
;
void
*
_additional_space
;
Controller
_controller
;
};
struct
Thrift
Framed
ServiceOptions
{
Thrift
Framed
ServiceOptions
()
:
generate_status
(
true
),
additional_space
(
0
)
{}
Thrift
Framed
ServiceOptions
(
bool
generate_status2
,
size_t
additional_space2
)
struct
ThriftServiceOptions
{
ThriftServiceOptions
()
:
generate_status
(
true
),
additional_space
(
0
)
{}
ThriftServiceOptions
(
bool
generate_status2
,
size_t
additional_space2
)
:
generate_status
(
generate_status2
)
,
additional_space
(
additional_space2
)
{}
...
...
@@ -84,11 +84,11 @@ struct ThriftFramedServiceOptions {
};
// Inherit this class to let brpc server understands thrift_binary requests.
class
Thrift
Framed
Service
:
public
Describable
{
class
ThriftService
:
public
Describable
{
public
:
Thrift
Framed
Service
();
Thrift
FramedService
(
const
ThriftFramed
ServiceOptions
&
);
virtual
~
Thrift
Framed
Service
();
ThriftService
();
Thrift
Service
(
const
Thrift
ServiceOptions
&
);
virtual
~
ThriftService
();
// Implement this method to handle thrift_binary requests. Notice that this
// method can be called with a failed Controller(something wrong with the
...
...
@@ -102,24 +102,24 @@ public:
// done You must call done->Run() to end the processing.
virtual
void
ProcessThriftFramedRequest
(
const
Server
&
server
,
Controller
*
controller
,
Thrift
Framed
Message
*
request
,
Thrift
Framed
Message
*
response
,
Thrift
Framed
Closure
*
done
)
=
0
;
ThriftMessage
*
request
,
ThriftMessage
*
response
,
ThriftClosure
*
done
)
=
0
;
// Put descriptions into the stream.
void
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
)
const
;
private
:
DISALLOW_COPY_AND_ASSIGN
(
Thrift
Framed
Service
);
friend
class
Thrift
Framed
Closure
;
friend
void
policy
::
ProcessThrift
Framed
Request
(
InputMessageBase
*
msg_base
);
DISALLOW_COPY_AND_ASSIGN
(
ThriftService
);
friend
class
ThriftClosure
;
friend
void
policy
::
ProcessThriftRequest
(
InputMessageBase
*
msg_base
);
friend
class
StatusService
;
friend
class
Server
;
private
:
void
Expose
(
const
butil
::
StringPiece
&
prefix
);
// Tracking status of non Thrift
Framed
PbService
// Tracking status of non ThriftPbService
MethodStatus
*
_status
;
size_t
_additional_space
;
std
::
string
_cached_name
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment