Commit 4c50ffb4 authored by zyearn's avatar zyearn Committed by zhujiashun

add grpc gizp support in req and res

parent e6334e5c
// Copyright (c) 2014 Baidu, Inc. // Copyright (c) 2018 Bilibili, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
...@@ -12,7 +12,8 @@ ...@@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// A client sending requests to server every 1 second. // A client sending requests to server every 1 second using grpc.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <butil/logging.h> #include <butil/logging.h>
...@@ -20,14 +21,13 @@ ...@@ -20,14 +21,13 @@
#include <brpc/channel.h> #include <brpc/channel.h>
#include "helloworld.pb.h" #include "helloworld.pb.h"
DEFINE_string(attachment, "foo", "Carry this along with requests");
DEFINE_string(protocol, "grpc", "Protocol type. Defined in src/brpc/options.proto"); DEFINE_string(protocol, "grpc", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(server, "0.0.0.0:8000", "IP Address of server"); DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests"); DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
DEFINE_string(http_content_type, "application/json", "Content type of http request"); DEFINE_bool(gzip, false, "compress body using gzip");
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well. // Parse gflags. We recommend you to use gflags as well.
...@@ -53,25 +53,18 @@ int main(int argc, char* argv[]) { ...@@ -53,25 +53,18 @@ int main(int argc, char* argv[]) {
// Send a request and wait for the response every 1 second. // Send a request and wait for the response every 1 second.
int log_id = 0; int log_id = 0;
//while (!brpc::IsAskedToQuit()) { while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables // We will receive response synchronously, safe to put variables
// on stack. // on stack.
helloworld::HelloRequest request; helloworld::HelloRequest request;
helloworld::HelloReply response; helloworld::HelloReply response;
brpc::Controller cntl; brpc::Controller cntl;
request.set_name("zjs's world"); request.set_name("grpc client example");
cntl.set_log_id(log_id ++); // set by user cntl.set_log_id(log_id ++); // set by user
if (FLAGS_protocol != "http" && FLAGS_protocol != "h2c" && if (FLAGS_gzip) {
FLAGS_protocol != "grpc") { cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP);
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
} else {
//cntl.http_request().set_content_type(FLAGS_http_content_type);
} }
// Because `done'(last parameter) is NULL, this function waits until // Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout). // the response comes back or error occurs(including timedout).
stub.SayHello(&cntl, &request, &response, NULL); stub.SayHello(&cntl, &request, &response, NULL);
...@@ -84,8 +77,8 @@ int main(int argc, char* argv[]) { ...@@ -84,8 +77,8 @@ int main(int argc, char* argv[]) {
} else { } else {
LOG(WARNING) << cntl.ErrorText(); LOG(WARNING) << cntl.ErrorText();
} }
//usleep(FLAGS_interval_ms * 1000L); usleep(FLAGS_interval_ms * 1000L);
//} }
return 0; return 0;
} }
// Copyright (c) 2014 Baidu, Inc. // Copyright (c) 2018 Bilibili, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
...@@ -12,7 +12,9 @@ ...@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// A server to receive HttpRequest and send back HttpResponse. // A server to receive HelloRequest and send back HelloReply
//
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <butil/logging.h> #include <butil/logging.h>
...@@ -25,22 +27,25 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " ...@@ -25,22 +27,25 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'"); "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state " DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
"(waiting for client to close connection before server stops)"); "(waiting for client to close connection before server stops)");
DEFINE_bool(gzip, false, "compress body using gzip");
// Service with static path. // Service with static path.
using helloworld::HelloRequest; using helloworld::HelloRequest;
using helloworld::HelloReply; using helloworld::HelloReply;
class HttpServiceImpl : public helloworld::Greeter { class GreeterImpl : public helloworld::Greeter {
public: public:
HttpServiceImpl() {}; GreeterImpl() {};
virtual ~HttpServiceImpl() {}; virtual ~GreeterImpl() {};
void SayHello(google::protobuf::RpcController* cntl_base, void SayHello(google::protobuf::RpcController* cntl_base,
const HelloRequest* req, const HelloRequest* req,
HelloReply* res, HelloReply* res,
google::protobuf::Closure* done) { google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
if (FLAGS_gzip) {
cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
}
std::string prefix("Hello "); std::string prefix("Hello ");
LOG(INFO) << "req=" << req->name(); LOG(INFO) << "req=" << req->name();
res->set_message(prefix + req->name()); res->set_message(prefix + req->name());
...@@ -54,7 +59,7 @@ int main(int argc, char* argv[]) { ...@@ -54,7 +59,7 @@ int main(int argc, char* argv[]) {
// Generally you only need one Server. // Generally you only need one Server.
brpc::Server server; brpc::Server server;
HttpServiceImpl http_svc; GreeterImpl http_svc;
// Add services into server. Notice the second parameter, because the // Add services into server. Notice the second parameter, because the
// service is put on stack, we don't want server to delete it, otherwise // service is put on stack, we don't want server to delete it, otherwise
......
...@@ -110,6 +110,7 @@ CommonStrings::CommonStrings() ...@@ -110,6 +110,7 @@ CommonStrings::CommonStrings()
, ACCEPT_ENCODING("accept-encoding") , ACCEPT_ENCODING("accept-encoding")
, CONTENT_ENCODING("content-encoding") , CONTENT_ENCODING("content-encoding")
, CONTENT_LENGTH("content-length") , CONTENT_LENGTH("content-length")
, IDENTITY("identity")
, GZIP("gzip") , GZIP("gzip")
, CONNECTION("connection") , CONNECTION("connection")
, KEEP_ALIVE("keep-alive") , KEEP_ALIVE("keep-alive")
...@@ -131,6 +132,7 @@ CommonStrings::CommonStrings() ...@@ -131,6 +132,7 @@ CommonStrings::CommonStrings()
, TE("te") , TE("te")
, TRAILERS("trailers") , TRAILERS("trailers")
, GRPC_ENCODING("grpc-encoding") , GRPC_ENCODING("grpc-encoding")
, GRPC_ACCEPT_ENCODING("grpc-accept-encoding")
{} {}
static CommonStrings* common = NULL; static CommonStrings* common = NULL;
...@@ -269,7 +271,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -269,7 +271,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
// TODO(zhujiashun): handle compression // TODO(zhujiashun): handle compression
char compressed_grpc = false; char compressed_grpc = false;
if (ParseContentType(res_header->content_type()) == HTTP_CONTENT_GRPC) { if (ParseContentType(res_header->content_type()) == HTTP_CONTENT_GRPC) {
/* 4 is the size of grpc Message-Length */ /* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/
char buf[4]; char buf[4];
res_body.cut1(&compressed_grpc); res_body.cut1(&compressed_grpc);
res_body.cutn(buf, 4); res_body.cutn(buf, 4);
...@@ -369,7 +371,11 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -369,7 +371,11 @@ void ProcessHttpResponse(InputMessageBase* msg) {
} }
const std::string* encoding = const std::string* encoding =
res_header->GetHeader(common->CONTENT_ENCODING); res_header->GetHeader(common->CONTENT_ENCODING);
if (encoding != NULL && *encoding == common->GZIP) { const std::string* grpc_encoding =
res_header->GetHeader(common->GRPC_ENCODING);
if ((encoding != NULL && *encoding == common->GZIP) ||
(compressed_grpc && grpc_encoding != NULL && *grpc_encoding ==
common->GZIP)) {
TRACEPRINTF("Decompressing response=%lu", TRACEPRINTF("Decompressing response=%lu",
(unsigned long)res_body.size()); (unsigned long)res_body.size());
butil::IOBuf uncompressed; butil::IOBuf uncompressed;
...@@ -463,6 +469,8 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -463,6 +469,8 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
return cntl->SetFailed(EREQUEST, "%s", return cntl->SetFailed(EREQUEST, "%s",
cntl->http_request().uri().status().error_cstr()); cntl->http_request().uri().status().error_cstr());
} }
ControllerPrivateAccessor accessor(cntl);
bool grpc_compressed = false;
if (cntl->request_compress_type() != COMPRESS_TYPE_NONE) { if (cntl->request_compress_type() != COMPRESS_TYPE_NONE) {
if (cntl->request_compress_type() != COMPRESS_TYPE_GZIP) { if (cntl->request_compress_type() != COMPRESS_TYPE_GZIP) {
return cntl->SetFailed(EREQUEST, "http does not support %s", return cntl->SetFailed(EREQUEST, "http does not support %s",
...@@ -474,7 +482,12 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -474,7 +482,12 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
butil::IOBuf compressed; butil::IOBuf compressed;
if (GzipCompress(cntl->request_attachment(), &compressed, NULL)) { if (GzipCompress(cntl->request_attachment(), &compressed, NULL)) {
cntl->request_attachment().swap(compressed); cntl->request_attachment().swap(compressed);
if (accessor.request_protocol() == PROTOCOL_GRPC) {
grpc_compressed = true;
cntl->http_request().SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
cntl->http_request().SetHeader(common->CONTENT_ENCODING, common->GZIP); cntl->http_request().SetHeader(common->CONTENT_ENCODING, common->GZIP);
}
} else { } else {
cntl->SetFailed("Fail to gzip the request body, skip compressing"); cntl->SetFailed("Fail to gzip the request body, skip compressing");
} }
...@@ -482,8 +495,6 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -482,8 +495,6 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
} }
HttpHeader* header = &cntl->http_request(); HttpHeader* header = &cntl->http_request();
ControllerPrivateAccessor accessor(cntl);
// Fill log-id if user set it. // Fill log-id if user set it.
if (cntl->has_log_id()) { if (cntl->has_log_id()) {
header->SetHeader(common->LOG_ID, header->SetHeader(common->LOG_ID,
...@@ -504,12 +515,19 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -504,12 +515,19 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
} }
if (accessor.request_protocol() == PROTOCOL_GRPC) { if (accessor.request_protocol() == PROTOCOL_GRPC) {
header->set_content_type("application/grpc"); // always tell client gzip support
// TODO(zhujiashun): add zlib and snappy?
header->SetHeader(common->GRPC_ACCEPT_ENCODING,
common->IDENTITY + "," + common->GZIP);
header->set_content_type(common->CONTENT_TYPE_GRPC);
header->SetHeader(common->TE, common->TRAILERS); header->SetHeader(common->TE, common->TRAILERS);
butil::IOBuf tmp_buf; butil::IOBuf tmp_buf;
// TODO(zhujiashun): Encode request according to Message-Encoding. // Compressed-Flag as 0 / 1, encoded as 1 byte unsigned integer
// Currently just appen 0x00 to indicate no compression. if (grpc_compressed) {
tmp_buf.append("\1", 1);
} else {
tmp_buf.append("\0", 1); tmp_buf.append("\0", 1);
}
char size_buf[4]; char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->request_attachment().size()); WriteBigEndian4Bytes(size_buf, cntl->request_attachment().size());
tmp_buf.append(size_buf, 4); tmp_buf.append(size_buf, 4);
...@@ -577,10 +595,10 @@ void PackHttpRequest(butil::IOBuf* buf, ...@@ -577,10 +595,10 @@ void PackHttpRequest(butil::IOBuf* buf,
inline bool SupportGzip(Controller* cntl) { inline bool SupportGzip(Controller* cntl) {
const std::string* encodings = const std::string* encodings =
cntl->http_request().GetHeader(common->ACCEPT_ENCODING); cntl->http_request().GetHeader(common->ACCEPT_ENCODING);
if (encodings == NULL) { const std::string* grpc_encodings =
return false; cntl->http_request().GetHeader(common->GRPC_ACCEPT_ENCODING);
} return (encodings && encodings->find(common->GZIP) != std::string::npos) ||
return encodings->find(common->GZIP) != std::string::npos; (grpc_encodings && grpc_encodings->find(common->GZIP) != std::string::npos);
} }
class HttpResponseSender { class HttpResponseSender {
...@@ -694,18 +712,6 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -694,18 +712,6 @@ HttpResponseSender::~HttpResponseSender() {
} }
} }
if (ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC) {
butil::IOBuf tmp_buf;
// TODO(zhujiashun): Encode response according to Message-Accept-Encoding
// in req_header. Currently just appen 0x00 to indicate no compression.
tmp_buf.append("\0", 1);
char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->response_attachment().size());
tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->response_attachment());
cntl->response_attachment().swap(tmp_buf);
}
// In HTTP 0.9, the server always closes the connection after sending the // In HTTP 0.9, the server always closes the connection after sending the
// response. The client must close its end of the connection after // response. The client must close its end of the connection after
// receiving the response. // receiving the response.
...@@ -738,6 +744,9 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -738,6 +744,9 @@ HttpResponseSender::~HttpResponseSender() {
} // else user explicitly set Connection:close, clients of } // else user explicitly set Connection:close, clients of
// HTTP 1.1/1.0/0.9 should all close the connection. // HTTP 1.1/1.0/0.9 should all close the connection.
} }
bool grpc_compressed = false;
bool grpc_protocol =
ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC;
if (cntl->Failed()) { if (cntl->Failed()) {
// Set status-code with default value(converted from error code) // Set status-code with default value(converted from error code)
// if user did not set it. // if user did not set it.
...@@ -775,7 +784,12 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -775,7 +784,12 @@ HttpResponseSender::~HttpResponseSender() {
butil::IOBuf tmpbuf; butil::IOBuf tmpbuf;
if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) { if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
cntl->response_attachment().swap(tmpbuf); cntl->response_attachment().swap(tmpbuf);
if (grpc_protocol) {
grpc_compressed = true;
res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP); res_header->SetHeader(common->CONTENT_ENCODING, common->GZIP);
}
} else { } else {
LOG(ERROR) << "Fail to gzip the http response, skip compression."; LOG(ERROR) << "Fail to gzip the http response, skip compression.";
} }
...@@ -787,6 +801,27 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -787,6 +801,27 @@ HttpResponseSender::~HttpResponseSender() {
} }
} }
if (grpc_protocol) {
// always tell client gzip support
// TODO(zhujiashun): add zlib and snappy?
res_header->SetHeader(common->GRPC_ACCEPT_ENCODING,
common->IDENTITY + "," + common->GZIP);
butil::IOBuf tmp_buf;
// Compressed-Flag as 0 / 1, encoded as 1 byte unsigned integer
if (grpc_compressed) {
tmp_buf.append("\1", 1);
} else {
tmp_buf.append("\0", 1);
}
char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->response_attachment().size());
tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->response_attachment());
cntl->response_attachment().swap(tmp_buf);
}
int rc = -1; int rc = -1;
// Have the risk of unlimited pending responses, in which case, tell // Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency. // users to set max_concurrency.
...@@ -1159,11 +1194,9 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1159,11 +1194,9 @@ void ProcessHttpRequest(InputMessageBase *msg) {
imsg_guard->header().Swap(req_header); imsg_guard->header().Swap(req_header);
butil::IOBuf& req_body = imsg_guard->body(); butil::IOBuf& req_body = imsg_guard->body();
// TODO(zhujiashun): handle compression
char compressed_grpc = false; char compressed_grpc = false;
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC) { if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC) {
LOG(INFO) << "Find grpc"; /* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/
/* 4 is the size of grpc Message-Length */
char buf[4]; char buf[4];
req_body.cut1(&compressed_grpc); req_body.cut1(&compressed_grpc);
req_body.cutn(buf, 4); req_body.cutn(buf, 4);
...@@ -1354,9 +1387,11 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1354,9 +1387,11 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} else { } else {
const std::string* encoding = const std::string* encoding =
req_header.GetHeader(common->CONTENT_ENCODING); req_header.GetHeader(common->CONTENT_ENCODING);
//const std::string* grpc_encoding = const std::string* grpc_encoding =
// req_header.GetHeader(common-> req_header.GetHeader(common->GRPC_ENCODING);
if (encoding != NULL && *encoding == common->GZIP) { if ((encoding != NULL && *encoding == common->GZIP) ||
(compressed_grpc && grpc_encoding != NULL && *grpc_encoding ==
common->GZIP)) {
TRACEPRINTF("Decompressing request=%lu", TRACEPRINTF("Decompressing request=%lu",
(unsigned long)req_body.size()); (unsigned long)req_body.size());
butil::IOBuf uncompressed; butil::IOBuf uncompressed;
......
...@@ -41,6 +41,7 @@ struct CommonStrings { ...@@ -41,6 +41,7 @@ struct CommonStrings {
std::string ACCEPT_ENCODING; std::string ACCEPT_ENCODING;
std::string CONTENT_ENCODING; std::string CONTENT_ENCODING;
std::string CONTENT_LENGTH; std::string CONTENT_LENGTH;
std::string IDENTITY;
std::string GZIP; std::string GZIP;
std::string CONNECTION; std::string CONNECTION;
std::string KEEP_ALIVE; std::string KEEP_ALIVE;
...@@ -67,6 +68,7 @@ struct CommonStrings { ...@@ -67,6 +68,7 @@ struct CommonStrings {
std::string TE; std::string TE;
std::string TRAILERS; std::string TRAILERS;
std::string GRPC_ENCODING; std::string GRPC_ENCODING;
std::string GRPC_ACCEPT_ENCODING;
CommonStrings(); CommonStrings();
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment