Commit 4a0927af authored by gejun's avatar gejun

Fix many issues around impl. of grpc

parent 798e9a11
......@@ -21,7 +21,7 @@
#include <brpc/channel.h>
#include "helloworld.pb.h"
DEFINE_string(protocol, "grpc", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(protocol, "h2c", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(server, "0.0.0.0:50051", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
......@@ -59,22 +59,21 @@ int main(int argc, char* argv[]) {
helloworld::HelloReply response;
brpc::Controller cntl;
request.set_name("grpc client example");
request.set_name("grpc_req_from_brpc");
cntl.http_request().set_content_type("application/grpc");
if (FLAGS_gzip) {
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP);
}
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
stub.SayHello(&cntl, &request, &response, NULL);
//cntl.http_request().uri() = FLAGS_server;
//channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (!cntl.Failed()) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << response.message()
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(WARNING) << cntl.ErrorCode() << ": " << cntl.ErrorText();
LOG(WARNING) << cntl.ErrorText();
}
usleep(FLAGS_interval_ms * 1000L);
}
......
......@@ -42,7 +42,6 @@ public:
if (FLAGS_gzip) {
cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
}
LOG(INFO) << "req=" << req->name();
res->set_message("Hello " + req->name());
// If an error happens, use controller::set_grpc_error_code to set errors
// e.g., cntl->set_grpc_error_code(brpc::GRPC_RESOURCEEXHAUSTED, "test grpc message");
......
......@@ -225,12 +225,14 @@ int HttpMessage::OnBody(const char *at, const size_t length) {
delete _vmsgbuilder;
_vmsgbuilder = NULL;
} else {
if (_body_length < (size_t)FLAGS_http_verbose_max_body_length) {
if (_vbodylen < (size_t)FLAGS_http_verbose_max_body_length) {
int plen = std::min(length, (size_t)FLAGS_http_verbose_max_body_length
- _body_length);
_vmsgbuilder->write(at, plen);
- _vbodylen);
std::string str = butil::ToPrintableString(
at, plen, std::numeric_limits<size_t>::max());
_vmsgbuilder->write(str.data(), str.size());
}
_body_length += length;
_vbodylen += length;
}
}
if (_stage != HTTP_ON_BODY) {
......@@ -280,8 +282,8 @@ int HttpMessage::OnBody(const char *at, const size_t length) {
int HttpMessage::OnMessageComplete() {
if (_vmsgbuilder) {
if (_body_length > (size_t)FLAGS_http_verbose_max_body_length) {
*_vmsgbuilder << "\n<skipped " << _body_length
if (_vbodylen > (size_t)FLAGS_http_verbose_max_body_length) {
*_vmsgbuilder << "\n<skipped " << _vbodylen
- (size_t)FLAGS_http_verbose_max_body_length << " bytes>";
}
std::cerr << _vmsgbuilder->buf() << std::endl;
......@@ -396,7 +398,7 @@ HttpMessage::HttpMessage(bool read_body_progressively)
, _body_reader(NULL)
, _cur_value(NULL)
, _vmsgbuilder(NULL)
, _body_length(0) {
, _vbodylen(0) {
http_parser_init(&_parser, HTTP_BOTH);
_parser.data = this;
}
......@@ -534,7 +536,7 @@ std::ostream& operator<<(std::ostream& os, const http_parser& parser) {
// | "CONNECT" ; Section 9.9
// | extension-method
// extension-method = token
void SerializeHttpRequest(butil::IOBuf* request,
void MakeRawHttpRequest(butil::IOBuf* request,
HttpHeader* h,
const butil::EndPoint& remote_side,
const butil::IOBuf* content) {
......@@ -611,7 +613,7 @@ void SerializeHttpRequest(butil::IOBuf* request,
// CRLF
// [ message-body ] ; Section 7.2
// Status-Line = HTTP-Version SP Status-Code SP Reason-Phrase CRLF
void SerializeHttpResponse(butil::IOBuf* response,
void MakeRawHttpResponse(butil::IOBuf* response,
HttpHeader* h,
butil::IOBuf* content) {
butil::IOBufBuilder os;
......
......@@ -113,7 +113,7 @@ private:
protected:
// Only valid when -http_verbose is on
butil::IOBufBuilder* _vmsgbuilder;
size_t _body_length;
size_t _vbodylen;
};
std::ostream& operator<<(std::ostream& os, const http_parser& parser);
......@@ -122,7 +122,7 @@ std::ostream& operator<<(std::ostream& os, const http_parser& parser);
// header: may be modified in some cases
// remote_side: used when "Host" is absent
// content: could be NULL.
void SerializeHttpRequest(butil::IOBuf* request,
void MakeRawHttpRequest(butil::IOBuf* request,
HttpHeader* header,
const butil::EndPoint& remote_side,
const butil::IOBuf* content);
......@@ -130,7 +130,7 @@ void SerializeHttpRequest(butil::IOBuf* request,
// Serialize a http response.
// header: may be modified in some cases
// content: cleared after usage. could be NULL.
void SerializeHttpResponse(butil::IOBuf* response,
void MakeRawHttpResponse(butil::IOBuf* response,
HttpHeader* header,
butil::IOBuf* content);
......
......@@ -504,18 +504,6 @@ static void GlobalInitializeOrDieImpl() {
}
#endif
// grpc protocol is based on http2
Protocol grpc_protocol = { ParseH2Message,
SerializeHttpRequest, PackH2Request,
ProcessHttpRequest, ProcessHttpResponse,
VerifyHttpRequest, ParseHttpServerAddress,
GetHttpMethodName,
CONNECTION_TYPE_SINGLE,
"grpc" };
if (RegisterProtocol(PROTOCOL_GRPC, grpc_protocol) != 0) {
exit(1);
}
// Only valid at client side
Protocol ubrpc_compack_protocol = {
ParseNsheadMessage,
......
......@@ -24,6 +24,30 @@
namespace brpc {
const char* GrpcStatusToString(GrpcStatus s) {
switch (s) {
case GRPC_OK: return "GRPC_OK";
case GRPC_CANCELED: return "GRPC_CANCELED";
case GRPC_UNKNOWN: return "GRPC_UNKNOWN";
case GRPC_INVALIDARGUMENT: return "GRPC_INVALIDARGUMENT";
case GRPC_DEADLINEEXCEEDED: return "GRPC_DEADLINEEXCEEDED";
case GRPC_NOTFOUND: return "GRPC_NOTFOUND";
case GRPC_ALREADYEXISTS: return "GRPC_ALREADYEXISTS";
case GRPC_PERMISSIONDENIED: return "GRPC_PERMISSIONDENIED";
case GRPC_RESOURCEEXHAUSTED: return "GRPC_RESOURCEEXHAUSTED";
case GRPC_FAILEDPRECONDITION: return "GRPC_FAILEDPRECONDITION";
case GPRC_ABORTED: return "GPRC_ABORTED";
case GRPC_OUTOFRANGE: return "GRPC_OUTOFRANGE";
case GRPC_UNIMPLEMENTED: return "GRPC_UNIMPLEMENTED";
case GRPC_INTERNAL: return "GRPC_INTERNAL";
case GRPC_UNAVAILABLE: return "GRPC_UNAVAILABLE";
case GRPC_DATALOSS: return "GRPC_DATALOSS";
case GRPC_UNAUTHENTICATED: return "GRPC_UNAUTHENTICATED";
case GRPC_MAX: return "GRPC_MAX";
}
return "Unknown-GrpcStatus";
}
GrpcStatus ErrorCodeToGrpcStatus(int error_code) {
switch (error_code) {
case 0:
......
......@@ -142,8 +142,11 @@ enum GrpcStatus {
GRPC_MAX,
};
GrpcStatus ErrorCodeToGrpcStatus(int error_code);
// Get description of the error.
const char* GrpcStatusToString(GrpcStatus);
// Convert between error code and grpc status with similar semantics
GrpcStatus ErrorCodeToGrpcStatus(int error_code);
int GrpcStatusToErrorCode(GrpcStatus grpc_status);
void PercentEncode(const std::string& str, std::string* str_out);
......
......@@ -154,8 +154,6 @@ friend void policy::ProcessHttpRequest(InputMessageBase *msg);
std::string _content_type;
std::string _unresolved_path;
std::pair<int, int> _version;
int _h2_stream_id;
H2Error _h2_error;
};
const HttpHeader& DefaultHttpHeader();
......
......@@ -47,7 +47,6 @@ enum ProtocolType {
PROTOCOL_CDS_AGENT = 24; // Client side only
PROTOCOL_ESP = 25; // Client side only
PROTOCOL_HTTP2 = 26;
PROTOCOL_GRPC = 27;
}
enum CompressType {
......
......@@ -1592,8 +1592,7 @@ size_t H2UnsentRequest::EstimatedByteSize() {
return sz;
}
void H2UnsentRequest::Describe(butil::IOBuf* desc) const {
butil::IOBufBuilder os;
void H2UnsentRequest::Print(std::ostream& os) const {
os << "[ H2 REQUEST @" << butil::my_ip() << " ]\n";
for (size_t i = 0; i < _size; ++i) {
os << "> " << _list[i].name << " = " << _list[i].value << '\n';
......@@ -1613,33 +1612,23 @@ void H2UnsentRequest::Describe(butil::IOBuf* desc) const {
if (!body->empty()) {
os << "> \n";
}
os.move_to(*desc);
if (body->size() > (size_t)FLAGS_http_verbose_max_body_length) {
size_t nskipped = body->size() - (size_t)FLAGS_http_verbose_max_body_length;
body->append_to(desc, FLAGS_http_verbose_max_body_length);
if (nskipped) {
char str[48];
snprintf(str, sizeof(str), "\n<skipped %" PRIu64 " bytes>", nskipped);
desc->append(str);
}
} else {
desc->append(*body);
}
os << butil::BinaryPrinter(*body, FLAGS_http_verbose_max_body_length);
}
H2UnsentResponse::H2UnsentResponse(Controller* c, int stream_id, bool grpc)
H2UnsentResponse::H2UnsentResponse(Controller* c, int stream_id, bool is_grpc)
: _size(0)
, _stream_id(stream_id)
, _http_response(c->release_http_response())
, _grpc(grpc) {
, _is_grpc(is_grpc) {
_data.swap(c->response_attachment());
if (grpc) {
if (is_grpc) {
_grpc_status = ErrorCodeToGrpcStatus(c->ErrorCode());
PercentEncode(c->ErrorText(), &_grpc_message);
}
}
H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id, bool grpc) {
H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id, bool is_grpc) {
const HttpHeader* const h = &c->http_response();
const CommonStrings* const common = get_common_strings();
const bool need_content_length =
......@@ -1650,7 +1639,7 @@ H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id, bool grpc)
+ (size_t)need_content_type;
const size_t memsize = offsetof(H2UnsentResponse, _list) +
sizeof(HPacker::Header) * maxsize;
H2UnsentResponse* msg = new (malloc(memsize)) H2UnsentResponse(c, stream_id, grpc);
H2UnsentResponse* msg = new (malloc(memsize)) H2UnsentResponse(c, stream_id, is_grpc);
// :status
if (h->status_code() == 200) {
msg->push(common->H2_STATUS, common->STATUS_200);
......@@ -1720,18 +1709,17 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
butil::IOBuf frag;
appender.move_to(frag);
butil::IOBufAppender trailer_appender;
butil::IOBuf trailer_frag;
if (_grpc) {
if (_is_grpc) {
HPacker::Header status_header("grpc-status",
butil::string_printf("%d", _grpc_status));
hpacker.Encode(&trailer_appender, status_header, options);
hpacker.Encode(&appender, status_header, options);
if (!_grpc_message.empty()) {
HPacker::Header msg_header("grpc-message", _grpc_message);
hpacker.Encode(&trailer_appender, msg_header, options);
hpacker.Encode(&appender, msg_header, options);
}
appender.move_to(trailer_frag);
}
trailer_appender.move_to(trailer_frag);
PackH2Message(out, frag, trailer_frag, _data, _stream_id, ctx);
return butil::Status::OK();
......@@ -1752,8 +1740,7 @@ size_t H2UnsentResponse::EstimatedByteSize() {
return sz;
}
void H2UnsentResponse::Describe(butil::IOBuf* desc) const {
butil::IOBufBuilder os;
void H2UnsentResponse::Print(std::ostream& os) const {
os << "[ H2 RESPONSE @" << butil::my_ip() << " ]\n";
for (size_t i = 0; i < _size; ++i) {
os << "> " << _list[i].name << " = " << _list[i].value << '\n';
......@@ -1767,18 +1754,7 @@ void H2UnsentResponse::Describe(butil::IOBuf* desc) const {
if (!_data.empty()) {
os << "> \n";
}
os.move_to(*desc);
if (_data.size() > (size_t)FLAGS_http_verbose_max_body_length) {
size_t nskipped = _data.size() - (size_t)FLAGS_http_verbose_max_body_length;
_data.append_to(desc, FLAGS_http_verbose_max_body_length);
if (nskipped) {
char str[48];
snprintf(str, sizeof(str), "\n<skipped %" PRIu64 " bytes>", nskipped);
desc->append(str);
}
} else {
desc->append(_data);
}
os << butil::BinaryPrinter(_data, FLAGS_http_verbose_max_body_length);
}
void PackH2Request(butil::IOBuf*,
......@@ -1806,9 +1782,7 @@ void PackH2Request(butil::IOBuf*,
*user_message = h2_req;
if (FLAGS_http_verbose) {
butil::IOBuf desc;
h2_req->Describe(&desc);
std::cerr << desc << std::endl;
std::cerr << *h2_req << std::endl;
}
}
......
......@@ -135,7 +135,7 @@ friend void PackH2Request(butil::IOBuf*, SocketMessage**,
Controller*, const butil::IOBuf&, const Authenticator*);
public:
static H2UnsentRequest* New(Controller* c);
void Describe(butil::IOBuf*) const;
void Print(std::ostream&) const;
int AddRefManually()
{ return _nref.fetch_add(1, butil::memory_order_relaxed); }
......@@ -194,9 +194,9 @@ private:
class H2UnsentResponse : public SocketMessage {
public:
static H2UnsentResponse* New(Controller* c, int stream_id, bool grpc);
static H2UnsentResponse* New(Controller* c, int stream_id, bool is_grpc);
void Destroy();
void Describe(butil::IOBuf*) const;
void Print(std::ostream&) const;
// @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*) override;
size_t EstimatedByteSize() override;
......@@ -208,7 +208,7 @@ private:
void push(const std::string& name, const std::string& value)
{ new (&_list[_size++]) HPacker::Header(name, value); }
H2UnsentResponse(Controller* c, int stream_id, bool grpc);
H2UnsentResponse(Controller* c, int stream_id, bool is_grpc);
~H2UnsentResponse() {}
H2UnsentResponse(const H2UnsentResponse&);
void operator=(const H2UnsentResponse&);
......@@ -218,7 +218,7 @@ private:
uint32_t _stream_id;
std::unique_ptr<HttpHeader> _http_response;
butil::IOBuf _data;
bool _grpc;
bool _is_grpc;
GrpcStatus _grpc_status;
std::string _grpc_message;
HPacker::Header _list[0];
......@@ -409,6 +409,15 @@ inline bool H2Context::RunOutStreams() const {
return (_last_client_stream_id > 0x7FFFFFFF);
}
inline std::ostream& operator<<(std::ostream& os, const H2UnsentRequest& req) {
req.Print(os);
return os;
}
inline std::ostream& operator<<(std::ostream& os, const H2UnsentResponse& res) {
res.Print(os);
return os;
}
} // namespace policy
} // namespace brpc
......
......@@ -24,6 +24,7 @@
#include "butil/string_splitter.h" // StringMultiSplitter
#include "butil/string_printf.h"
#include "butil/time.h"
#include "butil/sys_byteorder.h"
#include "brpc/compress.h"
#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
#include "brpc/controller.h" // Controller
......@@ -111,7 +112,6 @@ CommonStrings::CommonStrings()
, ACCEPT_ENCODING("accept-encoding")
, CONTENT_ENCODING("content-encoding")
, CONTENT_LENGTH("content-length")
, IDENTITY("identity")
, GZIP("gzip")
, CONNECTION("connection")
, KEEP_ALIVE("keep-alive")
......@@ -129,11 +129,11 @@ CommonStrings::CommonStrings()
, H2_METHOD(":method")
, METHOD_GET("GET")
, METHOD_POST("POST")
, CONTENT_TYPE_GRPC("application/grpc")
, TE("te")
, TRAILERS("trailers")
, GRPC_ENCODING("grpc-encoding")
, GRPC_ACCEPT_ENCODING("grpc-accept-encoding")
, GRPC_ACCEPT_ENCODING_VALUE("identity,gzip")
, GRPC_STATUS("grpc-status")
, GRPC_MESSAGE("grpc-message")
{}
......@@ -150,35 +150,46 @@ int InitCommonStrings() {
static const int ALLOW_UNUSED force_creation_of_common = InitCommonStrings();
const CommonStrings* get_common_strings() { return common; }
HttpContentType ParseContentType(butil::StringPiece content_type) {
const butil::StringPiece prefix = "application/";
const butil::StringPiece json = "json";
const butil::StringPiece proto = "proto";
const butil::StringPiece grpc = "grpc";
HttpContentType ParseContentType(butil::StringPiece ct, bool* is_grpc_ct) {
// According to http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.7
// media-type = type "/" subtype *( ";" parameter )
// type = token
// subtype = token
if (!content_type.starts_with(prefix)) {
const butil::StringPiece prefix = "application/";
if (!ct.starts_with(prefix)) {
return HTTP_CONTENT_OTHERS;
}
content_type.remove_prefix(prefix.size());
ct.remove_prefix(prefix.size());
if (ct.starts_with("grpc")) {
if (ct.size() == (size_t)4 || ct[4] == ';') {
if (is_grpc_ct) {
*is_grpc_ct = true;
}
// assume that the default content type for grpc is proto.
return HTTP_CONTENT_PROTO;
} else if (ct[4] == '+') {
ct.remove_prefix(5);
if (is_grpc_ct) {
*is_grpc_ct = true;
}
}
// else don't change ct. Note that "grpcfoo" is a valid but non-grpc
// content-type in the sense of format.
}
HttpContentType type = HTTP_CONTENT_OTHERS;
if (content_type.starts_with(json)) {
if (ct.starts_with("json")) {
type = HTTP_CONTENT_JSON;
content_type.remove_prefix(json.size());
} else if (content_type.starts_with(proto)) {
ct.remove_prefix(4);
} else if (ct.starts_with("proto")) {
type = HTTP_CONTENT_PROTO;
content_type.remove_prefix(proto.size());
} else if (content_type.starts_with(grpc)) {
type = HTTP_CONTENT_GRPC;
content_type.remove_prefix(grpc.size());
ct.remove_prefix(5);
} else {
return HTTP_CONTENT_OTHERS;
}
return content_type.empty() || content_type.front() == ';'
? type : HTTP_CONTENT_OTHERS;
return (ct.empty() || ct.front() == ';') ? type : HTTP_CONTENT_OTHERS;
}
static void PrintMessage(const butil::IOBuf& inbuf,
......@@ -201,32 +212,39 @@ static void PrintMessage(const butil::IOBuf& inbuf,
if (buf2.size() == last_size) {
buf2.pop_back(2); // remove "> "
}
if (!has_content) {
buf2.append(buf1);
if (!has_content) {
std::cerr << buf2 << std::endl;
} else {
uint64_t nskipped = 0;
if (buf1.size() > (size_t)FLAGS_http_verbose_max_body_length) {
nskipped = buf1.size() - (size_t)FLAGS_http_verbose_max_body_length;
buf1.pop_back(nskipped);
}
buf2.append(buf1);
if (nskipped) {
snprintf(str, sizeof(str), "\n<skipped %" PRIu64 " bytes>", nskipped);
buf2.append(str);
std::cerr << butil::PrintedAsBinary(
buf2, buf2.size() + FLAGS_http_verbose_max_body_length) << std::endl;
}
}
std::cerr << buf2 << std::endl;
}
inline uint32_t ReadBigEndian4Bytes(const void* void_buf) {
uint32_t ret = 0;
char* p = (char*)&ret;
const char* buf = (const char*)void_buf;
p[3] = buf[0];
p[2] = buf[1];
p[1] = buf[2];
p[0] = buf[3];
return ret;
static void AddGrpcPrefix(butil::IOBuf* body, bool compressed) {
char buf[5];
buf[0] = (compressed ? 1 : 0);
*(uint32_t*)(buf + 1) = butil::HostToNet32(body->size());
butil::IOBuf tmp_buf;
tmp_buf.append(buf, sizeof(buf));
tmp_buf.append(butil::IOBuf::Movable(*body));
body->swap(tmp_buf);
}
static bool RemoveGrpcPrefix(butil::IOBuf* body, bool* compressed) {
if (body->empty()) {
*compressed = false;
return true;
}
const size_t sz = body->size();
if (sz < (size_t)5) {
return false;
}
char buf[5];
body->cutn(buf, sizeof(buf));
*compressed = buf[0];
const size_t message_length = butil::NetToHost32(*(uint32_t*)(buf + 1));
return (message_length + 5 == sz);
}
void ProcessHttpResponse(InputMessageBase* msg) {
......@@ -271,18 +289,11 @@ void ProcessHttpResponse(InputMessageBase* msg) {
CHECK(cntl->response_attachment().empty());
const int saved_error = cntl->ErrorCode();
char grpc_compressed = false;
bool grpc_protocol =
ParseContentType(res_header->content_type()) == HTTP_CONTENT_GRPC;
if (grpc_protocol && !res_body.empty()) {
/* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/
char buf[4];
res_body.cut1(&grpc_compressed);
res_body.cutn(buf, 4);
int message_length = ReadBigEndian4Bytes(buf);
CHECK((size_t)message_length == res_body.length()) << message_length
<< " vs " << res_body.length();
}
bool is_grpc_ct = false;
const HttpContentType content_type =
ParseContentType(res_header->content_type(), &is_grpc_ct);
const bool is_grpc = (is_http2 && is_grpc_ct);
bool grpc_compressed = false; // only valid when is_grpc is true.
do {
if (!is_http2) {
......@@ -297,8 +308,31 @@ void ProcessHttpResponse(InputMessageBase* msg) {
socket->SetFailed();
}
}
} else if (is_grpc) {
if (!RemoveGrpcPrefix(&res_body, &grpc_compressed)) {
cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
break;
}
const std::string* grpc_status = res_header->GetHeader(common->GRPC_STATUS);
if (grpc_status) {
// TODO: More strict parsing
GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
if (status != GRPC_OK) {
const std::string* grpc_message =
res_header->GetHeader(common->GRPC_MESSAGE);
if (grpc_message) {
std::string message_decoded;
PercentDecode(*grpc_message, &message_decoded);
cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
message_decoded.c_str());
} else {
cntl->SetFailed(GrpcStatusToErrorCode(status), "%s",
GrpcStatusToString(status));
}
break;
}
}
}
if (imsg_guard->read_body_progressively()) {
// Set RPA if needed
......@@ -356,52 +390,27 @@ void ProcessHttpResponse(InputMessageBase* msg) {
}
break;
}
if (grpc_protocol) {
const std::string* grpc_status =
res_header->GetHeader(common->GRPC_STATUS);
const std::string* grpc_message =
res_header->GetHeader(common->GRPC_MESSAGE);
if (grpc_status) {
GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
if (status != GRPC_OK) {
std::string message_decoded;
if (grpc_message) {
PercentDecode(*grpc_message, &message_decoded);
} else {
message_decoded = "Unknown grpc error";
}
cntl->SetFailed(GrpcStatusToErrorCode(status),
"%s", message_decoded.c_str());
break;
}
}
}
if (cntl->response() == NULL ||
cntl->response()->GetDescriptor()->field_count() == 0) {
// a http call, content is the "real response".
cntl->response_attachment().swap(res_body);
break;
}
const HttpContentType content_type =
ParseContentType(res_header->content_type());
if (content_type != HTTP_CONTENT_PROTO &&
content_type != HTTP_CONTENT_JSON &&
content_type != HTTP_CONTENT_GRPC) {
cntl->SetFailed(ERESPONSE, "content-type=%s is neither %s nor %s or %s"
"when response is not NULL",
res_header->content_type().c_str(),
common->CONTENT_TYPE_JSON.c_str(),
common->CONTENT_TYPE_PROTO.c_str(),
common->CONTENT_TYPE_GRPC.c_str());
const std::string* encoding = NULL;
if (is_grpc) {
if (grpc_compressed) {
encoding = res_header->GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding'"
" in compressed gRPC response");
break;
}
const std::string* encoding =
res_header->GetHeader(common->CONTENT_ENCODING);
const std::string* grpc_encoding =
res_header->GetHeader(common->GRPC_ENCODING);
if ((encoding != NULL && *encoding == common->GZIP) ||
(grpc_compressed && grpc_encoding != NULL && *grpc_encoding ==
common->GZIP)) {
}
} else {
encoding = res_header->GetHeader(common->CONTENT_ENCODING);
}
if (encoding != NULL && *encoding == common->GZIP) {
TRACEPRINTF("Decompressing response=%lu",
(unsigned long)res_body.size());
butil::IOBuf uncompressed;
......@@ -411,13 +420,12 @@ void ProcessHttpResponse(InputMessageBase* msg) {
}
res_body.swap(uncompressed);
}
if (content_type == HTTP_CONTENT_PROTO ||
content_type == HTTP_CONTENT_GRPC) {
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content");
break;
}
} else {
} else if (content_type == HTTP_CONTENT_JSON) {
// message body is json
butil::IOBufAsZeroCopyInputStream wrapper(res_body);
std::string err;
......@@ -427,6 +435,11 @@ void ProcessHttpResponse(InputMessageBase* msg) {
cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str());
break;
}
} else {
cntl->SetFailed(ERESPONSE,
"Unknown content-type=%s when response is not NULL",
res_header->content_type().c_str());
break;
}
} while (0);
......@@ -436,17 +449,11 @@ void ProcessHttpResponse(InputMessageBase* msg) {
accessor.OnResponse(cid, saved_error);
}
inline void WriteBigEndian4Bytes(char* buf, uint32_t val) {
const char* p = (const char*)&val;
buf[0] = p[3];
buf[1] = p[2];
buf[2] = p[1];
buf[3] = p[0];
}
void SerializeHttpRequest(butil::IOBuf* /*not used*/,
Controller* cntl,
const google::protobuf::Message* request) {
const bool is_http2 = (cntl->request_protocol() == PROTOCOL_HTTP2);
bool is_grpc = false;
if (request != NULL) {
// If request is not NULL, message body will be serialized json,
if (!request->IsInitialized()) {
......@@ -459,18 +466,18 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
"when request is not NULL");
}
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->request_attachment());
bool is_grpc_ct = false;
const HttpContentType content_type
= ParseContentType(cntl->http_request().content_type());
if (content_type == HTTP_CONTENT_PROTO ||
cntl->request_protocol() == PROTOCOL_GRPC) {
= ParseContentType(cntl->http_request().content_type(), &is_grpc_ct);
is_grpc = (is_http2 && is_grpc_ct);
if (content_type == HTTP_CONTENT_PROTO) {
// Serialize content as protobuf
if (!request->SerializeToZeroCopyStream(&wrapper)) {
cntl->request_attachment().clear();
return cntl->SetFailed(EREQUEST, "Fail to serialize %s",
request->GetTypeName().c_str());
}
} else {
// Serialize content as json
} else { // Serialize content as json
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
......@@ -509,7 +516,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
butil::IOBuf compressed;
if (GzipCompress(cntl->request_attachment(), &compressed, NULL)) {
cntl->request_attachment().swap(compressed);
if (cntl->request_protocol() == PROTOCOL_GRPC) {
if (is_grpc) {
grpc_compressed = true;
cntl->http_request().SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
......@@ -529,37 +536,26 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
"%llu", (unsigned long long)cntl->log_id()));
}
if (!is_http2) {
// HTTP before 1.1 needs to set keep-alive explicitly.
if (header->before_http_1_1() &&
cntl->connection_type() != CONNECTION_TYPE_SHORT &&
header->GetHeader(common->CONNECTION) == NULL) {
header->SetHeader(common->CONNECTION, common->KEEP_ALIVE);
}
if (cntl->request_protocol() == PROTOCOL_HTTP2 ||
cntl->request_protocol() == PROTOCOL_GRPC) {
} else {
cntl->set_stream_creator(get_h2_global_stream_creator());
}
if (cntl->request_protocol() == PROTOCOL_GRPC) {
// always tell client gzip support
// TODO(zhujiashun): add zlib and snappy?
if (is_grpc) {
/*
header->SetHeader(common->GRPC_ACCEPT_ENCODING,
common->IDENTITY + "," + common->GZIP);
header->set_content_type(common->CONTENT_TYPE_GRPC);
common->GRPC_ACCEPT_ENCODING_VALUE);
*/
// TODO: do we need this?
header->SetHeader(common->TE, common->TRAILERS);
butil::IOBuf tmp_buf;
// Compressed-Flag as 0 / 1, encoded as 1 byte unsigned integer
if (grpc_compressed) {
tmp_buf.append("\1", 1);
} else {
tmp_buf.append("\0", 1);
// Append compressed and length before body
AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed);
}
char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->request_attachment().size());
tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->request_attachment());
cntl->request_attachment().swap(tmp_buf);
}
// Set url to /ServiceName/MethodName when we're about to call protobuf
......@@ -612,7 +608,7 @@ void PackHttpRequest(butil::IOBuf* buf,
// may not echo back this field. But we send it anyway.
accessor.get_sending_socket()->set_correlation_id(correlation_id);
SerializeHttpRequest(buf, header, cntl->remote_side(),
MakeRawHttpRequest(buf, header, cntl->remote_side(),
&cntl->request_attachment());
if (FLAGS_http_verbose) {
PrintMessage(*buf, true, true);
......@@ -622,10 +618,7 @@ void PackHttpRequest(butil::IOBuf* buf,
inline bool SupportGzip(Controller* cntl) {
const std::string* encodings =
cntl->http_request().GetHeader(common->ACCEPT_ENCODING);
const std::string* grpc_encodings =
cntl->http_request().GetHeader(common->GRPC_ACCEPT_ENCODING);
return (encodings && encodings->find(common->GZIP) != std::string::npos) ||
(grpc_encodings && grpc_encodings->find(common->GZIP) != std::string::npos);
return (encodings && encodings->find(common->GZIP) != std::string::npos);
}
class HttpResponseSender {
......@@ -692,6 +685,20 @@ HttpResponseSender::~HttpResponseSender() {
res_header->set_version(req_header->major_version(),
req_header->minor_version());
const std::string* content_type_str = &res_header->content_type();
if (content_type_str->empty()) {
// Use request's content_type if response's is not set.
content_type_str = &req_header->content_type();
res_header->set_content_type(*content_type_str);
}
// Notice that HTTP1 can have a header named `grpc-encoding' as well
// which should be treated as an user-defined header and ignored by
// the framework.
bool is_grpc_ct = false;
const HttpContentType content_type = ParseContentType(*content_type_str, &is_grpc_ct);
const bool is_http2 = req_header->is_http2();
const bool is_grpc = (is_http2 && is_grpc_ct);
// Convert response to json/proto if needed.
// Notice: Not check res->IsInitialized() which should be checked in the
// conversion function.
......@@ -704,20 +711,8 @@ HttpResponseSender::~HttpResponseSender() {
// ^ pb response in failed RPC is undefined, no need to convert.
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl->response_attachment());
const std::string* content_type_str = &res_header->content_type();
if (content_type_str->empty()) {
content_type_str = &req_header->content_type();
}
const HttpContentType content_type = ParseContentType(*content_type_str);
if (content_type == HTTP_CONTENT_PROTO || content_type == HTTP_CONTENT_GRPC) {
if (res->SerializeToZeroCopyStream(&wrapper)) {
// Set content-type if user did not
if (res_header->content_type().empty()) {
res_header->set_content_type((content_type == HTTP_CONTENT_PROTO)?
common->CONTENT_TYPE_PROTO:
common->CONTENT_TYPE_GRPC);
}
} else {
if (content_type == HTTP_CONTENT_PROTO) {
if (!res->SerializeToZeroCopyStream(&wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to serialize %s", res->GetTypeName().c_str());
}
} else {
......@@ -728,12 +723,7 @@ HttpResponseSender::~HttpResponseSender() {
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
if (json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
// Set content-type if user did not
if (res_header->content_type().empty()) {
res_header->set_content_type(common->CONTENT_TYPE_JSON);
}
} else {
if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
}
}
......@@ -752,7 +742,7 @@ HttpResponseSender::~HttpResponseSender() {
// or the server sent a Connection: close response header. If such a
// response header exists, the client must close its end of the connection
// after receiving the response.
if (!req_header->is_http2()) {
if (!is_http2) {
const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
const std::string* req_conn =
......@@ -770,12 +760,15 @@ HttpResponseSender::~HttpResponseSender() {
}
} // else user explicitly set Connection:close, clients of
// HTTP 1.1/1.0/0.9 should all close the connection.
} else if (is_grpc) {
// status code is always 200 according to grpc protocol
res_header->set_status_code(HTTP_STATUS_OK);
}
bool grpc_compressed = false;
bool grpc_protocol =
ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC;
bool grpc_compressed = false;
if (cntl->Failed()) {
cntl->response_attachment().clear();
if (!is_grpc) {
// Set status-code with default value(converted from error code)
// if user did not set it.
if (res_header->status_code() == HTTP_STATUS_OK) {
......@@ -790,8 +783,8 @@ HttpResponseSender::~HttpResponseSender() {
// body is error-text right now, remove the header.
res_header->RemoveHeader(common->CONTENT_ENCODING);
res_header->set_content_type(common->CONTENT_TYPE_TEXT);
cntl->response_attachment().clear();
cntl->response_attachment().append(cntl->ErrorText());
}
} else if (cntl->has_progressive_writer()) {
// Transfer-Encoding is supported since HTTP/1.1
if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
......@@ -803,16 +796,15 @@ HttpResponseSender::~HttpResponseSender() {
" ignored when CreateProgressiveAttachment() was called";
}
// not set_content to enable chunked mode.
} else {
if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
} else if (cntl->response_compress_type() == COMPRESS_TYPE_GZIP) {
const size_t response_size = cntl->response_attachment().size();
if (response_size >= (size_t)FLAGS_http_body_compress_threshold
&& SupportGzip(cntl)) {
&& (is_http2 || SupportGzip(cntl))) {
TRACEPRINTF("Compressing response=%lu", (unsigned long)response_size);
butil::IOBuf tmpbuf;
if (GzipCompress(cntl->response_attachment(), &tmpbuf, NULL)) {
cntl->response_attachment().swap(tmpbuf);
if (grpc_protocol) {
if (is_grpc) {
grpc_compressed = true;
res_header->SetHeader(common->GRPC_ENCODING, common->GZIP);
} else {
......@@ -823,56 +815,31 @@ HttpResponseSender::~HttpResponseSender() {
}
}
} else {
// TODO(gejun): Support snappy (grpc)
LOG_IF(ERROR, cntl->response_compress_type() != COMPRESS_TYPE_NONE)
<< "Unknown compress_type=" << cntl->response_compress_type()
<< ", skip compression.";
}
}
if (grpc_protocol) {
// status code is always 200 according to grpc protocol
res_header->set_status_code(HTTP_STATUS_OK);
res_header->set_content_type(common->CONTENT_TYPE_GRPC);
// always tell client gzip support
// TODO(zhujiashun): add zlib and snappy?
res_header->SetHeader(common->GRPC_ACCEPT_ENCODING,
common->IDENTITY + "," + common->GZIP);
if (!cntl->Failed()) {
// Encode Length-Prefixed-Message
butil::IOBuf tmp_buf;
// Compressed-Flag as 0 / 1, encoded as 1 byte unsigned integer
if (grpc_compressed) {
tmp_buf.append("\1", 1);
} else {
tmp_buf.append("\0", 1);
}
char size_buf[4];
WriteBigEndian4Bytes(size_buf, cntl->response_attachment().size());
tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->response_attachment());
cntl->response_attachment().swap(tmp_buf);
} else {
cntl->response_attachment().clear();
}
}
int rc = -1;
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (req_header->is_http2()) {
if (is_http2) {
if (is_grpc) {
// Append compressed and length before body
AddGrpcPrefix(&cntl->response_attachment(), grpc_compressed);
}
SocketMessagePtr<H2UnsentResponse> h2_response(
H2UnsentResponse::New(cntl, _h2_stream_id, grpc_protocol));
H2UnsentResponse::New(cntl, _h2_stream_id, is_grpc));
if (h2_response == NULL) {
LOG(ERROR) << "Fail to make http2 response";
errno = EINVAL;
rc = -1;
} else {
if (FLAGS_http_verbose) {
butil::IOBuf desc;
h2_response->Describe(&desc);
std::cerr << desc << std::endl;
std::cerr << *h2_response << std::endl;
}
if (span) {
span->set_response_size(h2_response->EstimatedByteSize());
......@@ -885,7 +852,7 @@ HttpResponseSender::~HttpResponseSender() {
content = &cntl->response_attachment();
}
butil::IOBuf res_buf;
SerializeHttpResponse(&res_buf, res_header, content);
MakeRawHttpResponse(&res_buf, res_header, content);
if (FLAGS_http_verbose) {
PrintMessage(res_buf, false, !!content);
}
......@@ -1140,7 +1107,7 @@ ParseResult ParseHttpMessage(butil::IOBuf *source, Socket *socket,
butil::IOBuf bad_req;
HttpHeader header;
header.set_status_code(HTTP_STATUS_BAD_REQUEST);
SerializeHttpRequest(&bad_req, &header, socket->remote_side(), NULL);
MakeRawHttpRequest(&bad_req, &header, socket->remote_side(), NULL);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
socket->Write(&bad_req, &wopt);
......@@ -1228,17 +1195,6 @@ void ProcessHttpRequest(InputMessageBase *msg) {
imsg_guard->header().Swap(req_header);
butil::IOBuf& req_body = imsg_guard->body();
char grpc_compressed = false;
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC &&
!req_body.empty()) {
/* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/
char buf[4];
req_body.cut1(&grpc_compressed);
req_body.cutn(buf, 4);
int message_length = ReadBigEndian4Bytes(buf);
CHECK((size_t)message_length == req_body.length());
}
butil::EndPoint user_addr;
if (!GetUserAddressFromHeader(req_header, &user_addr)) {
user_addr = socket->remote_side();
......@@ -1302,7 +1258,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
span->set_remote_side(user_addr);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
span->set_protocol(req_header.is_http2() ? PROTOCOL_HTTP2 : PROTOCOL_HTTP);
span->set_protocol(is_http2 ? PROTOCOL_HTTP2 : PROTOCOL_HTTP);
span->set_request_size(imsg_guard->parsed_length());
}
......@@ -1420,13 +1376,31 @@ void ProcessHttpRequest(InputMessageBase *msg) {
return;
} // else all fields of the request are optional.
} else {
const std::string* encoding =
req_header.GetHeader(common->CONTENT_ENCODING);
const std::string* grpc_encoding =
req_header.GetHeader(common->GRPC_ENCODING);
if ((encoding != NULL && *encoding == common->GZIP) ||
(grpc_compressed && grpc_encoding != NULL && *grpc_encoding ==
common->GZIP)) {
bool is_grpc_ct = false;
const HttpContentType content_type =
ParseContentType(req_header.content_type(), &is_grpc_ct);
const std::string* encoding = NULL;
if (is_http2) {
if (is_grpc_ct) {
bool grpc_compressed = false;
if (!RemoveGrpcPrefix(&req_body, &grpc_compressed)) {
cntl->SetFailed(ERESPONSE, "Invalid gRPC response");
return;
}
if (grpc_compressed) {
encoding = req_header.GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
cntl->SetFailed(
EREQUEST, "Fail to find header `grpc-encoding'"
" in compressed gRPC request");
return;
}
}
}
} else {
encoding = req_header.GetHeader(common->CONTENT_ENCODING);
}
if (encoding != NULL && *encoding == common->GZIP) {
TRACEPRINTF("Decompressing request=%lu",
(unsigned long)req_body.size());
butil::IOBuf uncompressed;
......@@ -1436,8 +1410,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
req_body.swap(uncompressed);
}
HttpContentType content_type = ParseContentType(req_header.content_type());
if (content_type == HTTP_CONTENT_PROTO || content_type == HTTP_CONTENT_GRPC) {
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
......
......@@ -41,7 +41,6 @@ struct CommonStrings {
std::string ACCEPT_ENCODING;
std::string CONTENT_ENCODING;
std::string CONTENT_LENGTH;
std::string IDENTITY;
std::string GZIP;
std::string CONNECTION;
std::string KEEP_ALIVE;
......@@ -69,6 +68,7 @@ struct CommonStrings {
std::string TRAILERS;
std::string GRPC_ENCODING;
std::string GRPC_ACCEPT_ENCODING;
std::string GRPC_ACCEPT_ENCODING_VALUE;
std::string GRPC_STATUS;
std::string GRPC_MESSAGE;
......@@ -138,13 +138,14 @@ enum HttpContentType {
HTTP_CONTENT_OTHERS = 0,
HTTP_CONTENT_JSON = 1,
HTTP_CONTENT_PROTO = 2,
HTTP_CONTENT_GRPC = 3
};
HttpContentType ParseContentType(butil::StringPiece content_type);
// Parse from the textual content type. One type may have more than one literals.
// Returns a numerical type. *is_grpc_ct is set to true if the content-type is
// set by gRPC.
HttpContentType ParseContentType(butil::StringPiece content_type, bool* is_grpc_ct);
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_HTTP_RPC_PROTOCOL_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment