Commit 88670c12 authored by zyearn's avatar zyearn Committed by zhujiashun

add grpc server support

parent 65809450
...@@ -1206,6 +1206,8 @@ int H2StreamContext::ConsumeHeaders(butil::IOBufBytesIterator& it) { ...@@ -1206,6 +1206,8 @@ int H2StreamContext::ConsumeHeaders(butil::IOBufBytesIterator& it) {
if (rc == 0) { if (rc == 0) {
break; break;
} }
LOG(INFO) << "Header name: " << pair.name
<< ", header value: " << pair.value;
const char* const name = pair.name.c_str(); const char* const name = pair.name.c_str();
bool matched = false; bool matched = false;
if (name[0] == ':') { // reserved names if (name[0] == ':') { // reserved names
...@@ -1286,6 +1288,7 @@ const CommonStrings* get_common_strings(); ...@@ -1286,6 +1288,7 @@ const CommonStrings* get_common_strings();
static void PackH2Message(butil::IOBuf* out, static void PackH2Message(butil::IOBuf* out,
butil::IOBuf& headers, butil::IOBuf& headers,
butil::IOBuf& trailer_headers,
const butil::IOBuf& data, const butil::IOBuf& data,
int stream_id, int stream_id,
H2Context* conn_ctx) { H2Context* conn_ctx) {
...@@ -1293,7 +1296,7 @@ static void PackH2Message(butil::IOBuf* out, ...@@ -1293,7 +1296,7 @@ static void PackH2Message(butil::IOBuf* out,
char headbuf[FRAME_HEAD_SIZE]; char headbuf[FRAME_HEAD_SIZE];
H2FrameHead headers_head = { H2FrameHead headers_head = {
(uint32_t)headers.size(), H2_FRAME_HEADERS, 0, stream_id}; (uint32_t)headers.size(), H2_FRAME_HEADERS, 0, stream_id};
if (data.empty()) { if (data.empty() && trailer_headers.empty()) {
headers_head.flags |= H2_FLAGS_END_STREAM; headers_head.flags |= H2_FLAGS_END_STREAM;
} }
if (headers_head.payload_size <= remote_settings.max_frame_size) { if (headers_head.payload_size <= remote_settings.max_frame_size) {
...@@ -1325,8 +1328,10 @@ static void PackH2Message(butil::IOBuf* out, ...@@ -1325,8 +1328,10 @@ static void PackH2Message(butil::IOBuf* out,
butil::IOBufBytesIterator it(data); butil::IOBufBytesIterator it(data);
while (it.bytes_left()) { while (it.bytes_left()) {
if (it.bytes_left() <= remote_settings.max_frame_size) { if (it.bytes_left() <= remote_settings.max_frame_size) {
data_head.flags |= H2_FLAGS_END_STREAM;
data_head.payload_size = it.bytes_left(); data_head.payload_size = it.bytes_left();
if (trailer_headers.empty()) {
data_head.flags |= H2_FLAGS_END_STREAM;
}
} else { } else {
data_head.payload_size = remote_settings.max_frame_size; data_head.payload_size = remote_settings.max_frame_size;
} }
......
...@@ -223,6 +223,7 @@ private: ...@@ -223,6 +223,7 @@ private:
uint32_t _stream_id; uint32_t _stream_id;
std::unique_ptr<HttpHeader> _http_response; std::unique_ptr<HttpHeader> _http_response;
butil::IOBuf _data; butil::IOBuf _data;
bool _grpc_protocol;
HPacker::Header _list[0]; HPacker::Header _list[0];
}; };
......
...@@ -104,6 +104,7 @@ CommonStrings::CommonStrings() ...@@ -104,6 +104,7 @@ CommonStrings::CommonStrings()
, CONTENT_TYPE("content-type") , CONTENT_TYPE("content-type")
, CONTENT_TYPE_TEXT("text/plain") , CONTENT_TYPE_TEXT("text/plain")
, CONTENT_TYPE_JSON("application/json") , CONTENT_TYPE_JSON("application/json")
, CONTENT_TYPE_GRPC("application/grpc")
, CONTENT_TYPE_PROTO("application/proto") , CONTENT_TYPE_PROTO("application/proto")
, ERROR_CODE("x-bd-error-code") , ERROR_CODE("x-bd-error-code")
, AUTHORIZATION("authorization") , AUTHORIZATION("authorization")
...@@ -141,16 +142,11 @@ int InitCommonStrings() { ...@@ -141,16 +142,11 @@ int InitCommonStrings() {
static const int ALLOW_UNUSED force_creation_of_common = InitCommonStrings(); static const int ALLOW_UNUSED force_creation_of_common = InitCommonStrings();
const CommonStrings* get_common_strings() { return common; } const CommonStrings* get_common_strings() { return common; }
enum HttpContentType { HttpContentType ParseContentType(butil::StringPiece content_type) {
HTTP_CONTENT_OTHERS = 0,
HTTP_CONTENT_JSON = 1,
HTTP_CONTENT_PROTO = 2
};
inline HttpContentType ParseContentType(butil::StringPiece content_type) {
const butil::StringPiece prefix = "application/"; const butil::StringPiece prefix = "application/";
const butil::StringPiece json = "json"; const butil::StringPiece json = "json";
const butil::StringPiece proto = "proto"; const butil::StringPiece proto = "proto";
const butil::StringPiece grpc = "grpc";
// According to http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.7 // According to http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.7
// media-type = type "/" subtype *( ";" parameter ) // media-type = type "/" subtype *( ";" parameter )
...@@ -167,6 +163,9 @@ inline HttpContentType ParseContentType(butil::StringPiece content_type) { ...@@ -167,6 +163,9 @@ inline HttpContentType ParseContentType(butil::StringPiece content_type) {
} else if (content_type.starts_with(proto)) { } else if (content_type.starts_with(proto)) {
type = HTTP_CONTENT_PROTO; type = HTTP_CONTENT_PROTO;
content_type.remove_prefix(proto.size()); content_type.remove_prefix(proto.size());
} else if (content_type.starts_with(grpc)) {
type = HTTP_CONTENT_GRPC;
content_type.remove_prefix(grpc.size());
} else { } else {
return HTTP_CONTENT_OTHERS; return HTTP_CONTENT_OTHERS;
} }
...@@ -612,11 +611,13 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -612,11 +611,13 @@ HttpResponseSender::~HttpResponseSender() {
content_type_str = &req_header->content_type(); content_type_str = &req_header->content_type();
} }
const HttpContentType content_type = ParseContentType(*content_type_str); const HttpContentType content_type = ParseContentType(*content_type_str);
if (content_type == HTTP_CONTENT_PROTO) { if (content_type == HTTP_CONTENT_PROTO || content_type == HTTP_CONTENT_GRPC) {
if (res->SerializeToZeroCopyStream(&wrapper)) { if (res->SerializeToZeroCopyStream(&wrapper)) {
// Set content-type if user did not // Set content-type if user did not
if (res_header->content_type().empty()) { if (res_header->content_type().empty()) {
res_header->set_content_type(common->CONTENT_TYPE_PROTO); res_header->set_content_type((content_type == HTTP_CONTENT_PROTO)?
common->CONTENT_TYPE_PROTO:
common->CONTENT_TYPE_GRPC);
} }
} else { } else {
cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str()); cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
...@@ -640,6 +641,18 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -640,6 +641,18 @@ HttpResponseSender::~HttpResponseSender() {
} }
} }
if (ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC) {
butil::IOBuf tmp_buf;
// TODO(zhujiashun): Encode response according to Message-Accept-Encoding
// in req_header. Currently just appen 0x00 to indicate no compression.
tmp_buf.append("\0", 1);
char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->response_attachment().size());
tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->response_attachment());
cntl->response_attachment().swap(tmp_buf);
}
// In HTTP 0.9, the server always closes the connection after sending the // In HTTP 0.9, the server always closes the connection after sending the
// response. The client must close its end of the connection after // response. The client must close its end of the connection after
// receiving the response. // receiving the response.
...@@ -1056,6 +1069,7 @@ bool VerifyHttpRequest(const InputMessageBase* msg) { ...@@ -1056,6 +1069,7 @@ bool VerifyHttpRequest(const InputMessageBase* msg) {
socket->mutable_auth_context()) == 0; socket->mutable_auth_context()) == 0;
} }
// Defined in baidu_rpc_protocol.cpp // Defined in baidu_rpc_protocol.cpp
void EndRunningCallMethodInPool( void EndRunningCallMethodInPool(
::google::protobuf::Service* service, ::google::protobuf::Service* service,
...@@ -1092,6 +1106,17 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1092,6 +1106,17 @@ void ProcessHttpRequest(InputMessageBase *msg) {
imsg_guard->header().Swap(req_header); imsg_guard->header().Swap(req_header);
butil::IOBuf& req_body = imsg_guard->body(); butil::IOBuf& req_body = imsg_guard->body();
// TODO(zhujiashun): handle compression
char compressed_grpc = false;
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC) {
/* 4 is the size of grpc Message-Length */
char buf[4];
req_body.cut1(&compressed_grpc);
req_body.cutn(buf, 4);
int message_length = ReadBigEndian4Bytes(buf);
CHECK(message_length == req_body.length());
}
butil::EndPoint user_addr; butil::EndPoint user_addr;
if (!GetUserAddressFromHeader(req_header, &user_addr)) { if (!GetUserAddressFromHeader(req_header, &user_addr)) {
user_addr = socket->remote_side(); user_addr = socket->remote_side();
......
...@@ -36,6 +36,7 @@ struct CommonStrings { ...@@ -36,6 +36,7 @@ struct CommonStrings {
std::string CONTENT_TYPE_TEXT; std::string CONTENT_TYPE_TEXT;
std::string CONTENT_TYPE_JSON; std::string CONTENT_TYPE_JSON;
std::string CONTENT_TYPE_PROTO; std::string CONTENT_TYPE_PROTO;
std::string CONTENT_TYPE_GRPC;
std::string ERROR_CODE; std::string ERROR_CODE;
std::string AUTHORIZATION; std::string AUTHORIZATION;
std::string ACCEPT_ENCODING; std::string ACCEPT_ENCODING;
...@@ -124,6 +125,15 @@ bool ParseHttpServerAddress(butil::EndPoint* out, const char* server_addr_and_po ...@@ -124,6 +125,15 @@ bool ParseHttpServerAddress(butil::EndPoint* out, const char* server_addr_and_po
const std::string& GetHttpMethodName(const google::protobuf::MethodDescriptor*, const std::string& GetHttpMethodName(const google::protobuf::MethodDescriptor*,
const Controller*); const Controller*);
enum HttpContentType {
HTTP_CONTENT_OTHERS = 0,
HTTP_CONTENT_JSON = 1,
HTTP_CONTENT_PROTO = 2,
HTTP_CONTENT_GRPC = 3
};
HttpContentType ParseContentType(butil::StringPiece content_type);
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment