Commit 02d8790f authored by gejun's avatar gejun

set default content-type by protocol-param

parent 972731c5
...@@ -382,6 +382,10 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -382,6 +382,10 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
} }
// HTTP needs this field to be set before any SetFailed() // HTTP needs this field to be set before any SetFailed()
cntl->_request_protocol = _options.protocol; cntl->_request_protocol = _options.protocol;
if (_options.protocol.has_param()) {
CHECK(cntl->protocol_param().empty());
cntl->protocol_param() = _options.protocol.param();
}
cntl->_preferred_index = _preferred_index; cntl->_preferred_index = _preferred_index;
cntl->_retry_policy = _options.retry_policy; cntl->_retry_policy = _options.retry_policy;
if (_options.enable_circuit_breaker) { if (_options.enable_circuit_breaker) {
......
...@@ -45,14 +45,6 @@ public: ...@@ -45,14 +45,6 @@ public:
_cntl->OnVersionedRPCReturned(info, false, saved_error); _cntl->OnVersionedRPCReturned(info, false, saved_error);
} }
ConnectionType connection_type() const {
return _cntl->_connection_type;
}
int current_retry_count() const {
return _cntl->_current_call.nretry;
}
ControllerPrivateAccessor &set_peer_id(SocketId peer_id) { ControllerPrivateAccessor &set_peer_id(SocketId peer_id) {
_cntl->_current_call.peer_id = peer_id; _cntl->_current_call.peer_id = peer_id;
return *this; return *this;
...@@ -133,6 +125,9 @@ public: ...@@ -133,6 +125,9 @@ public:
_cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH); _cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH);
} }
std::string& protocol_param() { return _cntl->protocol_param(); }
const std::string& protocol_param() const { return _cntl->protocol_param(); }
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
...@@ -69,6 +69,7 @@ public: ...@@ -69,6 +69,7 @@ public:
const std::string& content_type() const { return _content_type; } const std::string& content_type() const { return _content_type; }
void set_content_type(const std::string& type) { _content_type = type; } void set_content_type(const std::string& type) { _content_type = type; }
void set_content_type(const char* type) { _content_type = type; } void set_content_type(const char* type) { _content_type = type; }
std::string& mutable_content_type() { return _content_type; }
// Get value of a header which is case-insensitive according to: // Get value of a header which is case-insensitive according to:
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 // https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
......
...@@ -450,34 +450,47 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -450,34 +450,47 @@ void ProcessHttpResponse(InputMessageBase* msg) {
void SerializeHttpRequest(butil::IOBuf* /*not used*/, void SerializeHttpRequest(butil::IOBuf* /*not used*/,
Controller* cntl, Controller* cntl,
const google::protobuf::Message* request) { const google::protobuf::Message* pbreq) {
HttpHeader& hreq = cntl->http_request();
const bool is_http2 = (cntl->request_protocol() == PROTOCOL_HTTP2); const bool is_http2 = (cntl->request_protocol() == PROTOCOL_HTTP2);
bool is_grpc = false; bool is_grpc = false;
if (request != NULL) { ControllerPrivateAccessor accessor(cntl);
if (!accessor.protocol_param().empty() && hreq.content_type().empty()) {
const std::string& param = accessor.protocol_param();
if (param.find('/') == std::string::npos) {
std::string& s = hreq.mutable_content_type();
s.reserve(12 + param.size());
s.append("application/");
s.append(param);
} else {
hreq.set_content_type(param);
}
}
if (pbreq != NULL) {
// If request is not NULL, message body will be serialized proto/json, // If request is not NULL, message body will be serialized proto/json,
if (!request->IsInitialized()) { if (!pbreq->IsInitialized()) {
return cntl->SetFailed( return cntl->SetFailed(
EREQUEST, "Missing required fields in request: %s", EREQUEST, "Missing required fields in request: %s",
request->InitializationErrorString().c_str()); pbreq->InitializationErrorString().c_str());
} }
if (!cntl->request_attachment().empty()) { if (!cntl->request_attachment().empty()) {
return cntl->SetFailed(EREQUEST, "request_attachment must be empty " return cntl->SetFailed(EREQUEST, "request_attachment must be empty "
"when request is not NULL"); "when request is not NULL");
} }
HttpContentType content_type = HTTP_CONTENT_OTHERS; HttpContentType content_type = HTTP_CONTENT_OTHERS;
if (cntl->http_request().content_type().empty()) { if (hreq.content_type().empty()) {
// Set content-type if user did not. // Set content-type if user did not.
// Note that http1.x defaults to json and h2 defaults to pb. // Note that http1.x defaults to json and h2 defaults to pb.
if (is_http2) { if (is_http2) {
content_type = HTTP_CONTENT_PROTO; content_type = HTTP_CONTENT_PROTO;
cntl->http_request().set_content_type(common->CONTENT_TYPE_PROTO); hreq.set_content_type(common->CONTENT_TYPE_PROTO);
} else { } else {
content_type = HTTP_CONTENT_JSON; content_type = HTTP_CONTENT_JSON;
cntl->http_request().set_content_type(common->CONTENT_TYPE_JSON); hreq.set_content_type(common->CONTENT_TYPE_JSON);
} }
} else { } else {
bool is_grpc_ct = false; bool is_grpc_ct = false;
content_type = ParseContentType(cntl->http_request().content_type(), content_type = ParseContentType(hreq.content_type(),
&is_grpc_ct); &is_grpc_ct);
is_grpc = (is_http2 && is_grpc_ct); is_grpc = (is_http2 && is_grpc_ct);
} }
...@@ -485,10 +498,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -485,10 +498,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->request_attachment()); butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->request_attachment());
if (content_type == HTTP_CONTENT_PROTO) { if (content_type == HTTP_CONTENT_PROTO) {
// Serialize content as protobuf // Serialize content as protobuf
if (!request->SerializeToZeroCopyStream(&wrapper)) { if (!pbreq->SerializeToZeroCopyStream(&wrapper)) {
cntl->request_attachment().clear(); cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to serialize %s", return cntl->SetFailed(EREQUEST, "Fail to serialize %s",
request->GetTypeName().c_str()); pbreq->GetTypeName().c_str());
} }
} else if (content_type == HTTP_CONTENT_JSON) { } else if (content_type == HTTP_CONTENT_JSON) {
std::string err; std::string err;
...@@ -498,24 +511,25 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -498,24 +511,25 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
opt.enum_option = (FLAGS_pb_enum_as_number opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER ? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME); : json2pb::OUTPUT_ENUM_BY_NAME);
if (!json2pb::ProtoMessageToJson(*request, &wrapper, opt, &err)) { if (!json2pb::ProtoMessageToJson(*pbreq, &wrapper, opt, &err)) {
cntl->request_attachment().clear(); cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to convert request to json, %s", err.c_str()); return cntl->SetFailed(
EREQUEST, "Fail to convert request to json, %s", err.c_str());
} }
} else { } else {
return cntl->SetFailed(EREQUEST, "Unknown content_type=%s", return cntl->SetFailed(
cntl->http_request().content_type().c_str()); EREQUEST, "Cannot serialize pb request according to content_type=%s",
hreq.content_type().c_str());
} }
} else { } else {
// Use request_attachment. // Use request_attachment.
// TODO: Checking required fields of http header. // TODO: Checking required fields of http header.
} }
// Make RPC fail if uri() is not OK (previous SetHttpURL/operator= failed) // Make RPC fail if uri() is not OK (previous SetHttpURL/operator= failed)
if (!cntl->http_request().uri().status().ok()) { if (!hreq.uri().status().ok()) {
return cntl->SetFailed(EREQUEST, "%s", return cntl->SetFailed(EREQUEST, "%s",
cntl->http_request().uri().status().error_cstr()); hreq.uri().status().error_cstr());
} }
ControllerPrivateAccessor accessor(cntl);
bool grpc_compressed = false; bool grpc_compressed = false;
if (cntl->request_compress_type() != COMPRESS_TYPE_NONE) { if (cntl->request_compress_type() != COMPRESS_TYPE_NONE) {
if (cntl->request_compress_type() != COMPRESS_TYPE_GZIP) { if (cntl->request_compress_type() != COMPRESS_TYPE_GZIP) {
...@@ -530,9 +544,9 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -530,9 +544,9 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
cntl->request_attachment().swap(compressed); cntl->request_attachment().swap(compressed);
if (is_grpc) { if (is_grpc) {
grpc_compressed = true; grpc_compressed = true;
cntl->http_request().SetHeader(common->GRPC_ENCODING, common->GZIP); hreq.SetHeader(common->GRPC_ENCODING, common->GZIP);
} else { } else {
cntl->http_request().SetHeader(common->CONTENT_ENCODING, common->GZIP); hreq.SetHeader(common->CONTENT_ENCODING, common->GZIP);
} }
} else { } else {
cntl->SetFailed("Fail to gzip the request body, skip compressing"); cntl->SetFailed("Fail to gzip the request body, skip compressing");
...@@ -540,30 +554,29 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -540,30 +554,29 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
} }
} }
HttpHeader* header = &cntl->http_request();
// Fill log-id if user set it. // Fill log-id if user set it.
if (cntl->has_log_id()) { if (cntl->has_log_id()) {
header->SetHeader(common->LOG_ID, hreq.SetHeader(common->LOG_ID,
butil::string_printf( butil::string_printf(
"%llu", (unsigned long long)cntl->log_id())); "%llu", (unsigned long long)cntl->log_id()));
} }
if (!is_http2) { if (!is_http2) {
// HTTP before 1.1 needs to set keep-alive explicitly. // HTTP before 1.1 needs to set keep-alive explicitly.
if (header->before_http_1_1() && if (hreq.before_http_1_1() &&
cntl->connection_type() != CONNECTION_TYPE_SHORT && cntl->connection_type() != CONNECTION_TYPE_SHORT &&
header->GetHeader(common->CONNECTION) == NULL) { hreq.GetHeader(common->CONNECTION) == NULL) {
header->SetHeader(common->CONNECTION, common->KEEP_ALIVE); hreq.SetHeader(common->CONNECTION, common->KEEP_ALIVE);
} }
} else { } else {
cntl->set_stream_creator(get_h2_global_stream_creator()); cntl->set_stream_creator(get_h2_global_stream_creator());
if (is_grpc) { if (is_grpc) {
/* /*
header->SetHeader(common->GRPC_ACCEPT_ENCODING, hreq.SetHeader(common->GRPC_ACCEPT_ENCODING,
common->GRPC_ACCEPT_ENCODING_VALUE); common->GRPC_ACCEPT_ENCODING_VALUE);
*/ */
// TODO: do we need this? // TODO: do we need this?
header->SetHeader(common->TE, common->TRAILERS); hreq.SetHeader(common->TE, common->TRAILERS);
// Append compressed and length before body // Append compressed and length before body
AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed); AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed);
...@@ -574,7 +587,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -574,7 +587,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
// services (indicated by non-NULL method). // services (indicated by non-NULL method).
const google::protobuf::MethodDescriptor* method = cntl->method(); const google::protobuf::MethodDescriptor* method = cntl->method();
if (method != NULL) { if (method != NULL) {
header->set_method(HTTP_METHOD_POST); hreq.set_method(HTTP_METHOD_POST);
std::string path; std::string path;
path.reserve(2 + method->service()->full_name().size() path.reserve(2 + method->service()->full_name().size()
+ method->name().size()); + method->name().size());
...@@ -582,16 +595,16 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -582,16 +595,16 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
path.append(method->service()->full_name()); path.append(method->service()->full_name());
path.push_back('/'); path.push_back('/');
path.append(method->name()); path.append(method->name());
header->uri().set_path(path); hreq.uri().set_path(path);
} }
Span* span = accessor.span(); Span* span = accessor.span();
if (span) { if (span) {
header->SetHeader("x-bd-trace-id", butil::string_printf( hreq.SetHeader("x-bd-trace-id", butil::string_printf(
"%llu", (unsigned long long)span->trace_id())); "%llu", (unsigned long long)span->trace_id()));
header->SetHeader("x-bd-span-id", butil::string_printf( hreq.SetHeader("x-bd-span-id", butil::string_printf(
"%llu", (unsigned long long)span->span_id())); "%llu", (unsigned long long)span->span_id()));
header->SetHeader("x-bd-parent-span-id", butil::string_printf( hreq.SetHeader("x-bd-parent-span-id", butil::string_printf(
"%llu", (unsigned long long)span->parent_span_id())); "%llu", (unsigned long long)span->parent_span_id()));
} }
} }
......
...@@ -42,7 +42,7 @@ const std::string g_server_addr = "127.0.0.1:8011"; ...@@ -42,7 +42,7 @@ const std::string g_server_addr = "127.0.0.1:8011";
const std::string g_prefix = "Hello, "; const std::string g_prefix = "Hello, ";
const std::string g_req = "wyt"; const std::string g_req = "wyt";
const int64_t g_timeout_ms = 1000; const int64_t g_timeout_ms = 1000;
const std::string g_protocol = "grpc"; const std::string g_protocol = "h2c:grpc";
class MyGrpcService : public ::test::GrpcService { class MyGrpcService : public ::test::GrpcService {
public: public:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment