Commit 8dbe7b38 authored by gejun's avatar gejun

Patch svn r35243 r35258 r35263

parent 969bfb79
...@@ -200,6 +200,9 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -200,6 +200,9 @@ void Controller::InternalReset(bool in_constructor) {
// defined in header. Better for cpu cache and faster for lookup. // defined in header. Better for cpu cache and faster for lookup.
_span = NULL; _span = NULL;
_flags = 0; _flags = 0;
#ifndef BAIDU_INTERNAL
set_pb_bytes_to_base64(true);
#endif
_error_code = 0; _error_code = 0;
_remote_side = butil::EndPoint(); _remote_side = butil::EndPoint();
_local_side = butil::EndPoint(); _local_side = butil::EndPoint();
......
...@@ -115,6 +115,7 @@ friend void policy::ProcessMongoRequest(InputMessageBase*); ...@@ -115,6 +115,7 @@ friend void policy::ProcessMongoRequest(InputMessageBase*);
static const uint32_t FLAGS_CLOSE_CONNECTION = (1 << 8); static const uint32_t FLAGS_CLOSE_CONNECTION = (1 << 8);
static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set
static const uint32_t FLAGS_REQUEST_CODE = (1 << 10); static const uint32_t FLAGS_REQUEST_CODE = (1 << 10);
static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11);
public: public:
Controller(); Controller();
...@@ -257,6 +258,11 @@ public: ...@@ -257,6 +258,11 @@ public:
// may wish to suppress the error completely. To do this, call this // may wish to suppress the error completely. To do this, call this
// method before doing the RPC. // method before doing the RPC.
void ignore_eovercrowded() { add_flag(FLAGS_IGNORE_EOVERCROWDED); } void ignore_eovercrowded() { add_flag(FLAGS_IGNORE_EOVERCROWDED); }
// Set if the field of bytes in protobuf message should be encoded
// to base64 string in HTTP request.
void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); }
bool has_pb_bytes_to_base64() { return has_flag(FLAGS_PB_BYTES_TO_BASE64); }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Server-side methods. // Server-side methods.
......
...@@ -349,7 +349,9 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -349,7 +349,9 @@ void ProcessHttpResponse(InputMessageBase* msg) {
} else { } else {
butil::IOBufAsZeroCopyInputStream wrapper(res_body); butil::IOBufAsZeroCopyInputStream wrapper(res_body);
std::string err; std::string err;
if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(), &err)) { json2pb::Json2PbOptions options;
options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(), options, &err)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str()); cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str());
break; break;
} }
...@@ -401,6 +403,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -401,6 +403,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
// Serialize content as json // Serialize content as json
std::string err; std::string err;
json2pb::Pb2JsonOptions opt; json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.enum_option = (FLAGS_pb_enum_as_number opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER ? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME); : json2pb::OUTPUT_ENUM_BY_NAME);
...@@ -612,6 +615,7 @@ static void SendHttpResponse(Controller *cntl, ...@@ -612,6 +615,7 @@ static void SendHttpResponse(Controller *cntl,
} else { } else {
std::string err; std::string err;
json2pb::Pb2JsonOptions opt; json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.enum_option = (FLAGS_pb_enum_as_number opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER ? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME); : json2pb::OUTPUT_ENUM_BY_NAME);
...@@ -1195,7 +1199,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1195,7 +1199,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} }
// NOTE: accesses to builtin services are not counted as part of // NOTE: accesses to builtin services are not counted as part of
// concurrency, therefore are not limited by ServerOptions.max_concurrency. // concurrency, therefore are not limited by ServerOptions.max_concurrency.
if (!sp->is_builtin_service && !sp->is_tabbed) { if (!sp->is_builtin_service && !sp->params.is_tabbed) {
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
...@@ -1226,7 +1230,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1226,7 +1230,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->SetFailed("Fail to new req or res"); cntl->SetFailed("Fail to new req or res");
return SendHttpResponse(cntl.release(), server, method_status); return SendHttpResponse(cntl.release(), server, method_status);
} }
if (sp->allow_http_body_to_pb && if (sp->params.allow_http_body_to_pb &&
method->input_type()->field_count() > 0) { method->input_type()->field_count() > 0) {
// A protobuf service. No matter if Content-type is set to // A protobuf service. No matter if Content-type is set to
// applcation/json or body is empty, we have to treat body as a json // applcation/json or body is empty, we have to treat body as a json
...@@ -1263,7 +1267,10 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1263,7 +1267,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} else { } else {
butil::IOBufAsZeroCopyInputStream wrapper(req_body); butil::IOBufAsZeroCopyInputStream wrapper(req_body);
std::string err; std::string err;
if (!json2pb::JsonToProtoMessage(&wrapper, req.get(), &err)) { json2pb::Json2PbOptions options;
options.base64_to_bytes = sp->params.pb_bytes_to_base64;
cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
if (!json2pb::JsonToProtoMessage(&wrapper, req.get(), options, &err)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s", cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
req->GetDescriptor()->full_name().c_str(), err.c_str()); req->GetDescriptor()->full_name().c_str(), err.c_str());
return SendHttpResponse(cntl.release(), server, method_status); return SendHttpResponse(cntl.release(), server, method_status);
......
...@@ -257,8 +257,7 @@ RestfulMap::~RestfulMap() { ...@@ -257,8 +257,7 @@ RestfulMap::~RestfulMap() {
// This function inserts a mapping into _dedup_map. // This function inserts a mapping into _dedup_map.
bool RestfulMap::AddMethod(const RestfulMethodPath& path, bool RestfulMap::AddMethod(const RestfulMethodPath& path,
google::protobuf::Service* service, google::protobuf::Service* service,
bool is_tabbed, const Server::MethodProperty::OpaqueParams& params,
bool allow_http_body_to_pb,
const std::string& method_name, const std::string& method_name,
MethodStatus* status) { MethodStatus* status) {
if (service == NULL) { if (service == NULL) {
...@@ -289,8 +288,7 @@ bool RestfulMap::AddMethod(const RestfulMethodPath& path, ...@@ -289,8 +288,7 @@ bool RestfulMap::AddMethod(const RestfulMethodPath& path,
RestfulMethodProperty& info = _dedup_map[dedup_key]; RestfulMethodProperty& info = _dedup_map[dedup_key];
info.is_builtin_service = false; info.is_builtin_service = false;
info.own_method_status = false; info.own_method_status = false;
info.is_tabbed = is_tabbed; info.params = params;
info.allow_http_body_to_pb = allow_http_body_to_pb;
info.service = service; info.service = service;
info.method = md; info.method = md;
info.status = status; info.status = status;
......
...@@ -70,8 +70,7 @@ public: ...@@ -70,8 +70,7 @@ public:
// Returns MethodStatus of the method on success, NULL otherwise. // Returns MethodStatus of the method on success, NULL otherwise.
bool AddMethod(const RestfulMethodPath& path, bool AddMethod(const RestfulMethodPath& path,
google::protobuf::Service* service, google::protobuf::Service* service,
bool is_tabbed, const Server::MethodProperty::OpaqueParams& params,
bool allow_http_body_to_pb,
const std::string& method_name, const std::string& method_name,
MethodStatus* status); MethodStatus* status);
......
...@@ -148,11 +148,15 @@ ServerOptions::ServerOptions() ...@@ -148,11 +148,15 @@ ServerOptions::ServerOptions()
} }
} }
Server::MethodProperty::OpaqueParams::OpaqueParams()
: is_tabbed(false)
, allow_http_body_to_pb(true)
, pb_bytes_to_base64(false) {
}
Server::MethodProperty::MethodProperty() Server::MethodProperty::MethodProperty()
: is_builtin_service(false) : is_builtin_service(false)
, own_method_status(false) , own_method_status(false)
, is_tabbed(false)
, allow_http_body_to_pb(true)
, http_url(NULL) , http_url(NULL)
, service(NULL) , service(NULL)
, method(NULL) , method(NULL)
...@@ -388,7 +392,7 @@ Server::Server(ProfilerLinker) ...@@ -388,7 +392,7 @@ Server::Server(ProfilerLinker)
, _tab_info_list(NULL) , _tab_info_list(NULL)
, _global_restful_map(NULL) , _global_restful_map(NULL)
, _last_start_time(0) , _last_start_time(0)
, _derivative_thread(0) , _derivative_thread(INVALID_BTHREAD)
, _keytable_pool(NULL) , _keytable_pool(NULL)
, _concurrency(0) { , _concurrency(0) {
BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0, BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
...@@ -398,14 +402,6 @@ Server::Server(ProfilerLinker) ...@@ -398,14 +402,6 @@ Server::Server(ProfilerLinker)
Server::~Server() { Server::~Server() {
Stop(0); Stop(0);
Join(); Join();
// Notice that we don't do this in Stop()/Join() because we may need to
// check the derivative vars during the joining process (especially when
// the server is bugged and stuck);
if (_derivative_thread != 0) {
bthread_stop(_derivative_thread);
bthread_join(_derivative_thread, NULL);
_derivative_thread = 0;
}
ClearServices(); ClearServices();
FreeSSLContexts(); FreeSSLContexts();
...@@ -944,9 +940,9 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -944,9 +940,9 @@ int Server::StartInternal(const butil::ip_t& ip,
PutPidFileIfNeeded(); PutPidFileIfNeeded();
// Launch _derivative_thread if it's not started. // Launch _derivative_thread.
if (_derivative_thread == 0 && CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
bthread_start_background(&_derivative_thread, NULL, if (bthread_start_background(&_derivative_thread, NULL,
UpdateDerivedVars, this) != 0) { UpdateDerivedVars, this) != 0) {
LOG(ERROR) << "Fail to create _derivative_thread"; LOG(ERROR) << "Fail to create _derivative_thread";
return -1; return -1;
...@@ -1044,7 +1040,7 @@ int Server::Join() { ...@@ -1044,7 +1040,7 @@ int Server::Join() {
if (_session_local_data_pool) { if (_session_local_data_pool) {
// We can't delete the pool right here because there's a bvar watching // We can't delete the pool right here because there's a bvar watching
// this pool in _derivative_thread which will not quit until server's dtor // this pool in _derivative_thread which does not quit yet.
_session_local_data_pool->Reset(NULL); _session_local_data_pool->Reset(NULL);
} }
...@@ -1066,6 +1062,16 @@ int Server::Join() { ...@@ -1066,6 +1062,16 @@ int Server::Join() {
CHECK_EQ(0, bthread_key_delete(_tl_options.tls_key)); CHECK_EQ(0, bthread_key_delete(_tl_options.tls_key));
_tl_options.tls_key = INVALID_BTHREAD_KEY; _tl_options.tls_key = INVALID_BTHREAD_KEY;
} }
// Have to join _derivative_thread, which may assume that server is running
// and services in server are not mutated, otherwise data race happens
// between Add/RemoveService after Join() and the thread.
if (_derivative_thread != INVALID_BTHREAD) {
bthread_stop(_derivative_thread);
bthread_join(_derivative_thread, NULL);
_derivative_thread = INVALID_BTHREAD;
}
g_running_server_count.fetch_sub(1, butil::memory_order_relaxed); g_running_server_count.fetch_sub(1, butil::memory_order_relaxed);
_status = READY; _status = READY;
return 0; return 0;
...@@ -1117,8 +1123,9 @@ int Server::AddServiceInternal(google::protobuf::Service* service, ...@@ -1117,8 +1123,9 @@ int Server::AddServiceInternal(google::protobuf::Service* service,
MethodProperty mp; MethodProperty mp;
mp.is_builtin_service = is_builtin_service; mp.is_builtin_service = is_builtin_service;
mp.own_method_status = true; mp.own_method_status = true;
mp.is_tabbed = !!tabbed; mp.params.is_tabbed = !!tabbed;
mp.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb; mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
mp.service = service; mp.service = service;
mp.method = md; mp.method = md;
mp.status = new MethodStatus; mp.status = new MethodStatus;
...@@ -1201,9 +1208,12 @@ int Server::AddServiceInternal(google::protobuf::Service* service, ...@@ -1201,9 +1208,12 @@ int Server::AddServiceInternal(google::protobuf::Service* service,
if (_global_restful_map == NULL) { if (_global_restful_map == NULL) {
_global_restful_map = new RestfulMap(""); _global_restful_map = new RestfulMap("");
} }
MethodProperty::OpaqueParams params;
params.is_tabbed = !!tabbed;
params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
if (!_global_restful_map->AddMethod( if (!_global_restful_map->AddMethod(
mappings[i].path, service, !!tabbed, mappings[i].path, service, params,
svc_opt.allow_http_body_to_pb,
mappings[i].method_name, mp->status)) { mappings[i].method_name, mp->status)) {
LOG(ERROR) << "Fail to map `" << mappings[i].path LOG(ERROR) << "Fail to map `" << mappings[i].path
<< "' to `" << full_method_name << '\''; << "' to `" << full_method_name << '\'';
...@@ -1235,8 +1245,11 @@ int Server::AddServiceInternal(google::protobuf::Service* service, ...@@ -1235,8 +1245,11 @@ int Server::AddServiceInternal(google::protobuf::Service* service,
} else { } else {
m = sp->restful_map; m = sp->restful_map;
} }
if (!m->AddMethod(mappings[i].path, service, !!tabbed, MethodProperty::OpaqueParams params;
svc_opt.allow_http_body_to_pb, params.is_tabbed = !!tabbed;
params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
if (!m->AddMethod(mappings[i].path, service, params,
mappings[i].method_name, mp->status)) { mappings[i].method_name, mp->status)) {
LOG(ERROR) << "Fail to map `" << mappings[i].path << "' to `" LOG(ERROR) << "Fail to map `" << mappings[i].path << "' to `"
<< sd->full_name() << '.' << mappings[i].method_name << sd->full_name() << '.' << mappings[i].method_name
...@@ -1288,8 +1301,13 @@ int Server::AddServiceInternal(google::protobuf::Service* service, ...@@ -1288,8 +1301,13 @@ int Server::AddServiceInternal(google::protobuf::Service* service,
ServiceOptions::ServiceOptions() ServiceOptions::ServiceOptions()
: ownership(SERVER_DOESNT_OWN_SERVICE) : ownership(SERVER_DOESNT_OWN_SERVICE)
, allow_http_body_to_pb(true) { , allow_http_body_to_pb(true)
} #ifdef BAIDU_INTERNAL
, pb_bytes_to_base64(false)
#else
, pb_bytes_to_base64(true)
#endif
{}
int Server::AddService(google::protobuf::Service* service, int Server::AddService(google::protobuf::Service* service,
ServiceOwnership ownership) { ServiceOwnership ownership) {
......
...@@ -331,6 +331,11 @@ struct ServiceOptions { ...@@ -331,6 +331,11 @@ struct ServiceOptions {
// with existing clients. // with existing clients.
// Default: true // Default: true
bool allow_http_body_to_pb; bool allow_http_body_to_pb;
// decode json string to protobuf bytes using base64 decoding when this
// option is turned on.
// Default: false if BAIDU_INTERNAL is defined, otherwise true
bool pb_bytes_to_base64;
}; };
// Represent ports inside [min_port, max_port] // Represent ports inside [min_port, max_port]
...@@ -373,8 +378,15 @@ public: ...@@ -373,8 +378,15 @@ public:
struct MethodProperty { struct MethodProperty {
bool is_builtin_service; bool is_builtin_service;
bool own_method_status; bool own_method_status;
bool is_tabbed; // Parameters which have nothing to with management of services, but
bool allow_http_body_to_pb; // will be used when the service is queried.
struct OpaqueParams {
bool is_tabbed;
bool allow_http_body_to_pb;
bool pb_bytes_to_base64;
OpaqueParams();
};
OpaqueParams params;
// NULL if service of the method was never added as restful. // NULL if service of the method was never added as restful.
// "@path1 @path2 ..." if the method was mapped from paths. // "@path1 @path2 ..." if the method was mapped from paths.
std::string* http_url; std::string* http_url;
......
...@@ -77,6 +77,7 @@ public: ...@@ -77,6 +77,7 @@ public:
bool g_delete = false; bool g_delete = false;
const std::string EXP_REQUEST = "hello"; const std::string EXP_REQUEST = "hello";
const std::string EXP_RESPONSE = "world"; const std::string EXP_RESPONSE = "world";
const std::string EXP_REQUEST_BASE64 = "aGVsbG8=";
class EchoServiceImpl : public test::EchoService { class EchoServiceImpl : public test::EchoService {
public: public:
...@@ -100,6 +101,24 @@ public: ...@@ -100,6 +101,24 @@ public:
} }
} }
virtual void BytesEcho1(google::protobuf::RpcController*,
const test::BytesRequest* request,
test::BytesResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
EXPECT_EQ(EXP_REQUEST, request->databytes());
response->set_databytes(request->databytes());
}
virtual void BytesEcho2(google::protobuf::RpcController*,
const test::BytesRequest* request,
test::BytesResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
EXPECT_EQ(EXP_REQUEST_BASE64, request->databytes());
response->set_databytes(request->databytes());
}
butil::atomic<int64_t> count; butil::atomic<int64_t> count;
}; };
...@@ -1185,6 +1204,42 @@ TEST_F(ServerTest, add_builtin_service) { ...@@ -1185,6 +1204,42 @@ TEST_F(ServerTest, add_builtin_service) {
} }
} }
TEST_F(ServerTest, base64_to_string) {
// We test two cases as following. If these two tests can be passed, we
// can prove that the pb_bytes_to_base64 flag is working in both client side
// and server side.
// 1. Client sets pb_bytes_to_base64 and server also sets pb_bytes_to_base64
// 2. Client sets pb_bytes_to_base64, but server doesn't set pb_bytes_to_base64
for (int i = 0; i < 2; ++i) {
brpc::Server server;
EchoServiceImpl echo_svc;
brpc::ServiceOptions service_opt;
service_opt.pb_bytes_to_base64 = (i == 0);
ASSERT_EQ(0, server.AddService(&echo_svc,
service_opt));
ASSERT_EQ(0, server.Start(8613, NULL));
brpc::Channel chan;
brpc::ChannelOptions opt;
opt.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, chan.Init("localhost:8613", &opt));
brpc::Controller cntl;
cntl.http_request().uri() = "/EchoService/BytesEcho" +
butil::string_printf("%d", i + 1);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().set_content_type("application/json");
cntl.set_pb_bytes_to_base64(true);
test::BytesRequest req;
test::BytesResponse res;
req.set_databytes(EXP_REQUEST);
chan.CallMethod(NULL, &cntl, &req, &res, NULL);
EXPECT_FALSE(cntl.Failed());
EXPECT_EQ(EXP_REQUEST, res.databytes());
server.Stop(0);
server.Join();
}
}
TEST_F(ServerTest, too_big_message) { TEST_F(ServerTest, too_big_message) {
EchoServiceImpl echo_svc; EchoServiceImpl echo_svc;
brpc::Server server; brpc::Server server;
......
...@@ -21,6 +21,14 @@ message ComboRequest { ...@@ -21,6 +21,14 @@ message ComboRequest {
repeated EchoRequest requests = 1; repeated EchoRequest requests = 1;
}; };
message BytesRequest {
required bytes databytes = 1;
};
message BytesResponse {
required bytes databytes = 1;
};
message ComboResponse { message ComboResponse {
repeated EchoResponse responses = 1; repeated EchoResponse responses = 1;
}; };
...@@ -28,6 +36,8 @@ message ComboResponse { ...@@ -28,6 +36,8 @@ message ComboResponse {
service EchoService { service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse); rpc Echo(EchoRequest) returns (EchoResponse);
rpc ComboEcho(ComboRequest) returns (ComboResponse); rpc ComboEcho(ComboRequest) returns (ComboResponse);
rpc BytesEcho1(BytesRequest) returns (BytesResponse);
rpc BytesEcho2(BytesRequest) returns (BytesResponse);
} }
message HttpRequest {} message HttpRequest {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment