Commit 20b964dd authored by zyearn's avatar zyearn Committed by zhujiashun

add working grpc client support

parent 88670c12
......@@ -504,6 +504,18 @@ static void GlobalInitializeOrDieImpl() {
}
#endif
// grpc protocol is based on http2
Protocol grpc_protocol = { ParseH2Message,
SerializeHttpRequest, PackH2Request,
ProcessHttpRequest, ProcessHttpResponse,
VerifyHttpRequest, ParseHttpServerAddress,
GetHttpMethodName,
CONNECTION_TYPE_SINGLE,
"grpc" };
if (RegisterProtocol(PROTOCOL_GRPC, grpc_protocol) != 0) {
exit(1);
}
// Only valid at client side
Protocol ubrpc_compack_protocol = {
ParseNsheadMessage,
......
......@@ -47,6 +47,7 @@ enum ProtocolType {
PROTOCOL_CDS_AGENT = 24; // Client side only
PROTOCOL_ESP = 25; // Client side only
PROTOCOL_HTTP2 = 26;
PROTOCOL_GRPC = 27;
}
enum CompressType {
......
......@@ -1340,6 +1340,15 @@ static void PackH2Message(butil::IOBuf* out,
it.append_and_forward(out, data_head.payload_size);
}
}
if (!trailer_headers.empty()) {
H2FrameHead headers_head = {
(uint32_t)trailer_headers.size(), H2_FRAME_HEADERS, 0, stream_id};
headers_head.flags |= H2_FLAGS_END_STREAM;
headers_head.flags |= H2_FLAGS_END_HEADERS;
SerializeFrameHead(headbuf, headers_head);
out->append(headbuf, sizeof(headbuf));
out->append(butil::IOBuf::Movable(trailer_headers));
}
const int64_t conn_wu = conn_ctx->ReleaseDeferredWindowUpdate();
if (conn_wu > 0) {
char winbuf[FRAME_HEAD_SIZE + 4];
......@@ -1560,7 +1569,8 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
}
butil::IOBuf frag;
appender.move_to(frag);
PackH2Message(out, frag, _cntl->request_attachment(), _stream_id, ctx);
butil::IOBuf dummy_buf;
PackH2Message(out, frag, dummy_buf, _cntl->request_attachment(), _stream_id, ctx);
return butil::Status::OK();
}
......@@ -1700,7 +1710,18 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
butil::IOBuf frag;
appender.move_to(frag);
PackH2Message(out, frag, _data, _stream_id, ctx);
butil::IOBufAppender trailer_appender;
butil::IOBuf trailer_frag;
if (_grpc_protocol) {
// TODO(zhujiashun): how to decide status code and status message
HPacker::Header status("grpc-status", "0");
hpacker.Encode(&trailer_appender, status, options);
//HPacker::Header message("grpc-message", "");
//hpacker.Encode(&trailer_appender, message, options);
trailer_appender.move_to(trailer_frag);
}
PackH2Message(out, frag, trailer_frag, _data, _stream_id, ctx);
return butil::Status::OK();
}
......
......@@ -211,7 +211,9 @@ private:
H2UnsentResponse(Controller* c, int stream_id)
: _size(0)
, _stream_id(stream_id)
, _http_response(c->release_http_response()) {
, _http_response(c->release_http_response())
, _grpc_protocol(ParseContentType(c->http_request().content_type()) ==
HTTP_CONTENT_GRPC) {
_data.swap(c->response_attachment());
}
~H2UnsentResponse() {}
......
......@@ -104,7 +104,6 @@ CommonStrings::CommonStrings()
, CONTENT_TYPE("content-type")
, CONTENT_TYPE_TEXT("text/plain")
, CONTENT_TYPE_JSON("application/json")
, CONTENT_TYPE_GRPC("application/grpc")
, CONTENT_TYPE_PROTO("application/proto")
, ERROR_CODE("x-bd-error-code")
, AUTHORIZATION("authorization")
......@@ -128,6 +127,10 @@ CommonStrings::CommonStrings()
, H2_METHOD(":method")
, METHOD_GET("GET")
, METHOD_POST("POST")
, CONTENT_TYPE_GRPC("application/grpc")
, TE("te")
, TRAILERS("trailers")
, GRPC_ENCODING("grpc-encoding")
{}
static CommonStrings* common = NULL;
......@@ -210,6 +213,17 @@ static void PrintMessage(const butil::IOBuf& inbuf,
std::cerr << buf2 << std::endl;
}
inline uint32_t ReadBigEndian4Bytes(const void* void_buf) {
uint32_t ret = 0;
char* p = (char*)&ret;
const char* buf = (const char*)void_buf;
p[3] = buf[0];
p[2] = buf[1];
p[1] = buf[2];
p[0] = buf[3];
return ret;
}
void ProcessHttpResponse(InputMessageBase* msg) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
......@@ -252,6 +266,17 @@ void ProcessHttpResponse(InputMessageBase* msg) {
CHECK(cntl->response_attachment().empty());
const int saved_error = cntl->ErrorCode();
// TODO(zhujiashun): handle compression
char compressed_grpc = false;
if (ParseContentType(res_header->content_type()) == HTTP_CONTENT_GRPC) {
/* 4 is the size of grpc Message-Length */
char buf[4];
res_body.cut1(&compressed_grpc);
res_body.cutn(buf, 4);
int message_length = ReadBigEndian4Bytes(buf);
CHECK(message_length == res_body.length());
}
do {
if (!is_http2) {
// If header has "Connection: close", close the connection.
......@@ -331,12 +356,15 @@ void ProcessHttpResponse(InputMessageBase* msg) {
}
const HttpContentType content_type =
ParseContentType(res_header->content_type());
if (content_type != HTTP_CONTENT_PROTO && content_type != HTTP_CONTENT_JSON) {
cntl->SetFailed(ERESPONSE, "content-type=%s is neither %s nor %s "
if (content_type != HTTP_CONTENT_PROTO &&
content_type != HTTP_CONTENT_JSON &&
content_type != HTTP_CONTENT_GRPC) {
cntl->SetFailed(ERESPONSE, "content-type=%s is neither %s nor %s or %s"
"when response is not NULL",
res_header->content_type().c_str(),
common->CONTENT_TYPE_JSON.c_str(),
common->CONTENT_TYPE_PROTO.c_str());
common->CONTENT_TYPE_PROTO.c_str(),
common->CONTENT_TYPE_GRPC.c_str());
break;
}
const std::string* encoding =
......@@ -352,7 +380,8 @@ void ProcessHttpResponse(InputMessageBase* msg) {
res_body.swap(uncompressed);
}
// message body is json
if (content_type == HTTP_CONTENT_PROTO) {
if (content_type == HTTP_CONTENT_PROTO ||
content_type == HTTP_CONTENT_GRPC) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content");
break;
......@@ -374,6 +403,14 @@ void ProcessHttpResponse(InputMessageBase* msg) {
accessor.OnResponse(cid, saved_error);
}
inline void WriteBigEndian4Bytes(char* buf, uint32_t val) {
const char* p = (const char*)&val;
buf[0] = p[3];
buf[1] = p[2];
buf[2] = p[1];
buf[3] = p[0];
}
void SerializeHttpRequest(butil::IOBuf* /*not used*/,
Controller* cntl,
const google::protobuf::Message* request) {
......@@ -391,7 +428,8 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->request_attachment());
const HttpContentType content_type
= ParseContentType(cntl->http_request().content_type());
if (content_type == HTTP_CONTENT_PROTO) {
if (content_type == HTTP_CONTENT_PROTO ||
cntl->request_protocol() == PROTOCOL_GRPC) {
// Serialize content as protobuf
if (!request->SerializeToZeroCopyStream(&wrapper)) {
cntl->request_attachment().clear();
......@@ -460,10 +498,25 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
}
if (cntl->request_protocol() == PROTOCOL_HTTP2) {
if (cntl->request_protocol() == PROTOCOL_HTTP2 ||
cntl->request_protocol() == PROTOCOL_GRPC) {
cntl->set_stream_creator(get_h2_global_stream_creator());
}
if (accessor.request_protocol() == PROTOCOL_GRPC) {
header->set_content_type("application/grpc");
header->SetHeader(common->TE, common->TRAILERS);
butil::IOBuf tmp_buf;
// TODO(zhujiashun): Encode request according to Message-Encoding.
// Currently just appen 0x00 to indicate no compression.
tmp_buf.append("\0", 1);
char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->request_attachment().size());
tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->request_attachment());
cntl->request_attachment().swap(tmp_buf);
}
// Set url to /ServiceName/MethodName when we're about to call protobuf
// services (indicated by non-NULL method).
const google::protobuf::MethodDescriptor* method = cntl->method();
......@@ -1109,6 +1162,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
// TODO(zhujiashun): handle compression
char compressed_grpc = false;
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC) {
LOG(INFO) << "Find grpc";
/* 4 is the size of grpc Message-Length */
char buf[4];
req_body.cut1(&compressed_grpc);
......@@ -1300,6 +1354,8 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} else {
const std::string* encoding =
req_header.GetHeader(common->CONTENT_ENCODING);
//const std::string* grpc_encoding =
// req_header.GetHeader(common->
if (encoding != NULL && *encoding == common->GZIP) {
TRACEPRINTF("Decompressing request=%lu",
(unsigned long)req_body.size());
......@@ -1310,7 +1366,8 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
req_body.swap(uncompressed);
}
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_PROTO) {
HttpContentType content_type = ParseContentType(req_header.content_type());
if (content_type == HTTP_CONTENT_PROTO || content_type == HTTP_CONTENT_GRPC) {
if (!ParsePbFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
......
......@@ -36,7 +36,6 @@ struct CommonStrings {
std::string CONTENT_TYPE_TEXT;
std::string CONTENT_TYPE_JSON;
std::string CONTENT_TYPE_PROTO;
std::string CONTENT_TYPE_GRPC;
std::string ERROR_CODE;
std::string AUTHORIZATION;
std::string ACCEPT_ENCODING;
......@@ -63,6 +62,12 @@ struct CommonStrings {
std::string METHOD_GET;
std::string METHOD_POST;
// GRPC-related headers
std::string CONTENT_TYPE_GRPC;
std::string TE;
std::string TRAILERS;
std::string GRPC_ENCODING;
CommonStrings();
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment