Commit 59361655 authored by zyearn's avatar zyearn Committed by zhujiashun

* server can transfer grpc error code and message

* client can receive grpc error code and message
* add percent encode/decode
parent 3ae31a91
...@@ -68,6 +68,8 @@ int main(int argc, char* argv[]) { ...@@ -68,6 +68,8 @@ int main(int argc, char* argv[]) {
// Because `done'(last parameter) is NULL, this function waits until // Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout). // the response comes back or error occurs(including timedout).
stub.SayHello(&cntl, &request, &response, NULL); stub.SayHello(&cntl, &request, &response, NULL);
//cntl.http_request().uri() = FLAGS_server;
//channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (!cntl.Failed()) { if (!cntl.Failed()) {
LOG(INFO) << "Received response from " << cntl.remote_side() LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side() << " to " << cntl.local_side()
...@@ -75,7 +77,10 @@ int main(int argc, char* argv[]) { ...@@ -75,7 +77,10 @@ int main(int argc, char* argv[]) {
<< cntl.response_attachment() << ")" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us"; << " latency=" << cntl.latency_us() << "us";
} else { } else {
LOG(WARNING) << cntl.ErrorText(); LOG(WARNING) << cntl.ErrorCode() << ": " << cntl.ErrorText()
<< ", grpc-status=" << cntl.grpc_status()
<< ", grpc-message=" << cntl.grpc_message();
} }
usleep(FLAGS_interval_ms * 1000L); usleep(FLAGS_interval_ms * 1000L);
} }
......
...@@ -46,9 +46,10 @@ public: ...@@ -46,9 +46,10 @@ public:
if (FLAGS_gzip) { if (FLAGS_gzip) {
cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP); cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
} }
std::string prefix("Hello ");
LOG(INFO) << "req=" << req->name(); LOG(INFO) << "req=" << req->name();
res->set_message(prefix + req->name()); res->set_message("Hello " + req->name());
// If an error happens, use controller::set_grpc_error_code to set errors
// e.g., cntl->set_grpc_error_code(brpc::GRPC_RESOURCEEXHAUSTED, "test grpc message");
} }
}; };
......
...@@ -254,6 +254,8 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -254,6 +254,8 @@ void Controller::InternalReset(bool in_constructor) {
_response_stream = INVALID_STREAM_ID; _response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL; _remote_stream_settings = NULL;
_thrift_method_name = ""; _thrift_method_name = "";
_grpc_status = GRPC_OK;
_grpc_message = "";
} }
Controller::Call::Call(Controller::Call* rhs) Controller::Call::Call(Controller::Call* rhs)
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "brpc/callback.h" #include "brpc/callback.h"
#include "brpc/progressive_attachment.h" // ProgressiveAttachment #include "brpc/progressive_attachment.h" // ProgressiveAttachment
#include "brpc/progressive_reader.h" // ProgressiveReader #include "brpc/progressive_reader.h" // ProgressiveReader
#include "brpc/grpc.h"
// EAUTH is defined in MAC // EAUTH is defined in MAC
#ifndef EAUTH #ifndef EAUTH
...@@ -476,6 +477,13 @@ public: ...@@ -476,6 +477,13 @@ public:
// Get sock option. .e.g get vip info through ttm kernel module hook, // Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen); int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
void set_grpc_error_code(GrpcStatus status, const std::string& msg) {
_grpc_status = status;
_grpc_message = msg;
}
GrpcStatus grpc_status() { return _grpc_status; }
std::string grpc_message() { return _grpc_message; }
private: private:
struct CompletionInfo { struct CompletionInfo {
CallId id; // call_id of the corresponding request CallId id; // call_id of the corresponding request
...@@ -716,6 +724,10 @@ private: ...@@ -716,6 +724,10 @@ private:
// Thrift method name, only used when thrift protocol enabled // Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name; std::string _thrift_method_name;
uint32_t _thrift_seq_id;
GrpcStatus _grpc_status;
std::string _grpc_message;
}; };
// Advises the RPC system that the caller desires that the RPC call be // Advises the RPC system that the caller desires that the RPC call be
......
...@@ -134,6 +134,9 @@ public: ...@@ -134,6 +134,9 @@ public:
_cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH); _cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH);
} }
GrpcStatus grpc_status() { return _cntl->_grpc_status; }
std::string grpc_message() { return _cntl->_grpc_message; }
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
...@@ -8,7 +8,8 @@ enum Errno { ...@@ -8,7 +8,8 @@ enum Errno {
ENOSERVICE = 1001; // Service not found ENOSERVICE = 1001; // Service not found
ENOMETHOD = 1002; // Method not found ENOMETHOD = 1002; // Method not found
EREQUEST = 1003; // Bad Request EREQUEST = 1003; // Bad Request
ERPCAUTH = 1004; // Unauthorized, can't be called EAUTH directly which is defined in MACOSX ERPCAUTH = 1004; // Unauthorized, can't be called EAUTH
// directly which is defined in MACOSX
ETOOMANYFAILS = 1005; // Too many sub calls failed ETOOMANYFAILS = 1005; // Too many sub calls failed
EPCHANFINISH = 1006; // [Internal] ParallelChannel finished EPCHANFINISH = 1006; // [Internal] ParallelChannel finished
EBACKUPREQUEST = 1007; // Sending backup request EBACKUPREQUEST = 1007; // Sending backup request
...@@ -30,4 +31,5 @@ enum Errno { ...@@ -30,4 +31,5 @@ enum Errno {
ELIMIT = 2004; // Reached server's limit on resources ELIMIT = 2004; // Reached server's limit on resources
ECLOSE = 2005; // Close socket initiatively ECLOSE = 2005; // Close socket initiatively
EITP = 2006; // Failed Itp response EITP = 2006; // Failed Itp response
EGRPC = 2007; // Failed Grpc response
} }
// Copyright (c) 2018 Bilibili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <sstream> // std::stringstream
#include <iomanip> // std::setw
#include "brpc/grpc.h"
#include "brpc/errno.pb.h"
#include "brpc/http_status_code.h"
#include "butil/logging.h"
namespace brpc {
// The mapping can be found in grpc-go internal/transport/http_util.go
GrpcStatus HttpStatus2GrpcStatus(int http_status) {
switch(http_status) {
case HTTP_STATUS_BAD_REQUEST:
return GRPC_INTERNAL;
case HTTP_STATUS_UNAUTHORIZED:
return GRPC_UNAUTHENTICATED;
case HTTP_STATUS_FORBIDDEN:
return GRPC_PERMISSIONDENIED;
case HTTP_STATUS_NOT_FOUND:
return GRPC_UNIMPLEMENTED;
case HTTP_STATUS_BAD_GATEWAY:
case HTTP_STATUS_SERVICE_UNAVAILABLE:
case HTTP_STATUS_GATEWAY_TIMEOUT:
return GRPC_UNAVAILABLE;
default:
return GRPC_UNKNOWN;
}
}
GrpcStatus ErrorCode2GrpcStatus(int error_code) {
switch (error_code) {
case ENOSERVICE:
case ENOMETHOD:
return GRPC_UNIMPLEMENTED;
case ERPCAUTH:
return GRPC_UNAUTHENTICATED;
case EREQUEST:
case EINVAL:
return GRPC_INVALIDARGUMENT;
case ELIMIT:
case ELOGOFF:
return GRPC_UNAVAILABLE;
case EPERM:
return GRPC_PERMISSIONDENIED;
case ERPCTIMEDOUT:
case ETIMEDOUT:
return GRPC_INTERNAL;
default:
return GRPC_UNKNOWN;
}
}
void percent_encode(const std::string& str, std::string* str_out) {
std::ostringstream escaped;
escaped.fill('0');
escaped << std::hex;
for (std::string::const_iterator it = str.begin();
it != str.end(); ++it) {
const std::string::value_type& c = *it;
if (c >= ' ' && c <= '~' && c != '%') {
escaped << c;
continue;
}
escaped << '%' << std::setw(2) << int((unsigned char) c);
}
if (str_out) {
*str_out = escaped.str();
}
}
static int hex_to_int(char c) {
if (c >= 'a' && c <= 'f') {
return c - 'a' + 10;
} else if (c >= 'A' && c <= 'F') {
return c - 'A' + 10;
} else if (c >= '0' && c <= '9') {
return c - '0';
}
return 0;
}
void percent_decode(const std::string& str, std::string* str_out) {
std::ostringstream unescaped;
for (std::string::const_iterator it = str.begin();
it != str.end(); ++it) {
const std::string::value_type& c = *it;
if (c == '%' && it + 2 < str.end()) {
int i1 = hex_to_int(*++it);
int i2 = hex_to_int(*++it);
unescaped << (char)(i1 * 16 + i2);
} else {
unescaped << c;
}
}
if (str_out) {
*str_out = unescaped.str();
}
}
} // namespace brpc
// Copyright (c) 2018 Bilibili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#ifndef BRPC_GRPC_H
#define BRPC_GRPC_H
#include <map>
namespace brpc {
typedef std::map<std::string, std::string> TrailerMessage;
enum GrpcStatus {
// OK is returned on success.
GRPC_OK = 0,
// CANCELED indicates the operation was canceled (typically by the caller).
GRPC_CANCELED,
// Unknown error. An example of where this error may be returned is
// if a Status value received from another address space belongs to
// an error-space that is not known in this address space. Also
// errors raised by APIs that do not return enough error information
// may be converted to this error.
GRPC_UNKNOWN,
// INVALIDARGUMENT Indicates client specified an invalid argument.
// Note that this differs from FAILEDPRECONDITION. It indicates arguments
// that are problematic regardless of the state of the system
// (e.g., a malformed file name).
GRPC_INVALIDARGUMENT,
// DEADLINEEXCEEDED Means operation expired before completion.
// For operations that change the state of the system, this error may be
// returned even if the operation has completed successfully. For
// example, a successful response from a server could have been delayed
// long enough for the deadline to expire.
GRPC_DEADLINEEXCEEDED,
// NOTFOUND Means some requested entity (e.g., file or directory) was
// not found.
GRPC_NOTFOUND,
// ALREADYEXISTS Means an attempt to create an entity failed because one
// already exists.
GRPC_ALREADYEXISTS,
// PERMISSIONDENIED Indicates the caller does not have permission to
// execute the specified operation. It must not be used for rejections
// caused by exhausting some resource (use ResourceExhausted
// instead for those errors). It must not be
// used if the caller cannot be identified (use UNAUTHENTICATED
// instead for those errors).
GRPC_PERMISSIONDENIED,
// RESOURCEEXHAUSTED Indicates some resource has been exhausted, perhaps
// a per-user quota, or perhaps the entire file system is out of space.
GRPC_RESOURCEEXHAUSTED,
// FAILEDPRECONDITION indicates operation was rejected because the
// system is not in a state required for the operation's execution.
// For example, directory to be deleted may be non-empty, an rmdir
// operation is applied to a non-directory, etc.
//
// A litmus test that may help a service implementor in deciding
// between FAILEDPRECONDITION, Aborted, and Unavailable:
// (a) Use Unavailable if the client can retry just the failing call.
// (b) Use Aborted if the client should retry at a higher-level
// (e.g., restarting a read-modify-write sequence).
// (c) Use FAILEDPRECONDITION if the client should not retry until
// the system state has been explicitly fixed. E.g., if an "rmdir"
// fails because the directory is non-empty, FAILEDPRECONDITION
// should be returned since the client should not retry unless
// they have first fixed up the directory by deleting files from it.
// (d) Use FAILEDPRECONDITION if the client performs conditional
// REST Get/Update/Delete on a resource and the resource on the
// server does not match the condition. E.g., conflicting
// read-modify-write on the same resource.
GRPC_FAILEDPRECONDITION,
// ABORTED indicates the operation was aborted, typically due to a
// concurrency issue like sequencer check failures, transaction aborts,
// etc.
//
// See litmus test above for deciding between FAILEDPRECONDITION,
// Aborted, and Unavailable.
GPRC_ABORTED,
// OUTOFRANGE means operation was attempted past the valid range.
// E.g., seeking or reading past end of file.
//
// Unlike INVALIDARGUMENT, this error indicates a problem that may
// be fixed if the system state changes. For example, a 32-bit file
// system will generate INVALIDARGUMENT if asked to read at an
// offset that is not in the range [0,2^32-1], but it will generate
// OUTOFRANGE if asked to read from an offset past the current
// file size.
//
// There is a fair bit of overlap between FAILEDPRECONDITION and
// OUTOFRANGE. We recommend using OUTOFRANGE (the more specific
// error) when it applies so that callers who are iterating through
// a space can easily look for an OUTOFRANGE error to detect when
// they are done.
GRPC_OUTOFRANGE,
// UNIMPLEMENTED indicates operation is not implemented or not
// supported/enabled in this service.
GRPC_UNIMPLEMENTED,
// INTERNAL errors. Means some invariants expected by underlying
// system has been broken. If you see one of these errors,
// something is very broken.
GRPC_INTERNAL,
// UNAVAILABLE indicates the service is currently unavailable.
// This is a most likely a transient condition and may be corrected
// by retrying with a backoff.
//
// See litmus test above for deciding between FAILEDPRECONDITION,
// ABORTED, and UNAVAILABLE.
GRPC_UNAVAILABLE,
// DATALOSS indicates unrecoverable data loss or corruption.
GRPC_DATALOSS,
// UNAUTHENTICATED indicates the request does not have valid
// authentication credentials for the operation.
GRPC_UNAUTHENTICATED,
};
GrpcStatus HttpStatus2GrpcStatus(int http_status);
GrpcStatus ErrorCode2GrpcStatus(int error_code);
void percent_encode(const std::string& str, std::string* str_out);
void percent_decode(const std::string& str, std::string* str_out);
} // namespace brpc
#endif // BRPC_GRPC_H
...@@ -1206,6 +1206,8 @@ int H2StreamContext::ConsumeHeaders(butil::IOBufBytesIterator& it) { ...@@ -1206,6 +1206,8 @@ int H2StreamContext::ConsumeHeaders(butil::IOBufBytesIterator& it) {
if (rc == 0) { if (rc == 0) {
break; break;
} }
RPC_VLOG << "Header name: " << pair.name
<< ", header value: " << pair.value;
const char* const name = pair.name.c_str(); const char* const name = pair.name.c_str();
bool matched = false; bool matched = false;
if (name[0] == ':') { // reserved names if (name[0] == ':') { // reserved names
...@@ -1627,7 +1629,15 @@ void H2UnsentRequest::Describe(butil::IOBuf* desc) const { ...@@ -1627,7 +1629,15 @@ void H2UnsentRequest::Describe(butil::IOBuf* desc) const {
} }
} }
H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id) { H2UnsentResponse::H2UnsentResponse(Controller* c, const TrailerMessage& trailers)
: _size(0)
, _stream_id(c->http_request().h2_stream_id())
, _http_response(c->release_http_response())
, _trailers(trailers) {
_data.swap(c->response_attachment());
}
H2UnsentResponse* H2UnsentResponse::New(Controller* c, const TrailerMessage& trailers) {
const HttpHeader* const h = &c->http_response(); const HttpHeader* const h = &c->http_response();
const CommonStrings* const common = get_common_strings(); const CommonStrings* const common = get_common_strings();
const bool need_content_length = const bool need_content_length =
...@@ -1638,7 +1648,7 @@ H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id) { ...@@ -1638,7 +1648,7 @@ H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id) {
+ (size_t)need_content_type; + (size_t)need_content_type;
const size_t memsize = offsetof(H2UnsentResponse, _list) + const size_t memsize = offsetof(H2UnsentResponse, _list) +
sizeof(HPacker::Header) * maxsize; sizeof(HPacker::Header) * maxsize;
H2UnsentResponse* msg = new (malloc(memsize)) H2UnsentResponse(c, stream_id); H2UnsentResponse* msg = new (malloc(memsize)) H2UnsentResponse(c, trailers);
// :status // :status
if (h->status_code() == 200) { if (h->status_code() == 200) {
msg->push(common->H2_STATUS, common->STATUS_200); msg->push(common->H2_STATUS, common->STATUS_200);
...@@ -1710,14 +1720,12 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) { ...@@ -1710,14 +1720,12 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
butil::IOBufAppender trailer_appender; butil::IOBufAppender trailer_appender;
butil::IOBuf trailer_frag; butil::IOBuf trailer_frag;
if (_grpc_protocol) { for (TrailerMessage::iterator it = _trailers.begin();
// TODO(zhujiashun): how to decide status code and status message it != _trailers.end(); ++it) {
HPacker::Header status("grpc-status", "0"); HPacker::Header header(it->first, it->second);
hpacker.Encode(&trailer_appender, status, options); hpacker.Encode(&trailer_appender, header, options);
HPacker::Header message("grpc-message", "");
hpacker.Encode(&trailer_appender, message, options);
trailer_appender.move_to(trailer_frag);
} }
trailer_appender.move_to(trailer_frag);
PackH2Message(out, frag, trailer_frag, _data, _stream_id, ctx); PackH2Message(out, frag, trailer_frag, _data, _stream_id, ctx);
return butil::Status::OK(); return butil::Status::OK();
......
...@@ -194,7 +194,7 @@ private: ...@@ -194,7 +194,7 @@ private:
class H2UnsentResponse : public SocketMessage { class H2UnsentResponse : public SocketMessage {
public: public:
static H2UnsentResponse* New(Controller* c, int stream_id); static H2UnsentResponse* New(Controller* c, const TrailerMessage& trailers);
void Destroy(); void Destroy();
void Describe(butil::IOBuf*) const; void Describe(butil::IOBuf*) const;
// @SocketMessage // @SocketMessage
...@@ -208,14 +208,7 @@ private: ...@@ -208,14 +208,7 @@ private:
void push(const std::string& name, const std::string& value) void push(const std::string& name, const std::string& value)
{ new (&_list[_size++]) HPacker::Header(name, value); } { new (&_list[_size++]) HPacker::Header(name, value); }
H2UnsentResponse(Controller* c, int stream_id) H2UnsentResponse(Controller* c, const TrailerMessage& trailers);
: _size(0)
, _stream_id(stream_id)
, _http_response(c->release_http_response())
, _grpc_protocol(ParseContentType(c->http_request().content_type()) ==
HTTP_CONTENT_GRPC) {
_data.swap(c->response_attachment());
}
~H2UnsentResponse() {} ~H2UnsentResponse() {}
H2UnsentResponse(const H2UnsentResponse&); H2UnsentResponse(const H2UnsentResponse&);
void operator=(const H2UnsentResponse&); void operator=(const H2UnsentResponse&);
...@@ -225,7 +218,7 @@ private: ...@@ -225,7 +218,7 @@ private:
uint32_t _stream_id; uint32_t _stream_id;
std::unique_ptr<HttpHeader> _http_response; std::unique_ptr<HttpHeader> _http_response;
butil::IOBuf _data; butil::IOBuf _data;
bool _grpc_protocol; TrailerMessage _trailers;
HPacker::Header _list[0]; HPacker::Header _list[0];
}; };
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "brpc/policy/gzip_compress.h" #include "brpc/policy/gzip_compress.h"
#include "brpc/policy/http2_rpc_protocol.h" #include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/details/usercode_backup_pool.h" #include "brpc/details/usercode_backup_pool.h"
#include "brpc/grpc.h"
extern "C" { extern "C" {
void bthread_assign_data(void* data); void bthread_assign_data(void* data);
...@@ -133,6 +134,8 @@ CommonStrings::CommonStrings() ...@@ -133,6 +134,8 @@ CommonStrings::CommonStrings()
, TRAILERS("trailers") , TRAILERS("trailers")
, GRPC_ENCODING("grpc-encoding") , GRPC_ENCODING("grpc-encoding")
, GRPC_ACCEPT_ENCODING("grpc-accept-encoding") , GRPC_ACCEPT_ENCODING("grpc-accept-encoding")
, GRPC_STATUS("grpc-status")
, GRPC_MESSAGE("grpc-message")
{} {}
static CommonStrings* common = NULL; static CommonStrings* common = NULL;
...@@ -268,12 +271,13 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -268,12 +271,13 @@ void ProcessHttpResponse(InputMessageBase* msg) {
CHECK(cntl->response_attachment().empty()); CHECK(cntl->response_attachment().empty());
const int saved_error = cntl->ErrorCode(); const int saved_error = cntl->ErrorCode();
char compressed_grpc = false; char grpc_compressed = false;
if (ParseContentType(res_header->content_type()) == HTTP_CONTENT_GRPC && bool grpc_protocol =
!res_body.empty()) { ParseContentType(res_header->content_type()) == HTTP_CONTENT_GRPC;
if (grpc_protocol && !res_body.empty()) {
/* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/ /* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/
char buf[4]; char buf[4];
res_body.cut1(&compressed_grpc); res_body.cut1(&grpc_compressed);
res_body.cutn(buf, 4); res_body.cutn(buf, 4);
int message_length = ReadBigEndian4Bytes(buf); int message_length = ReadBigEndian4Bytes(buf);
CHECK(message_length == res_body.length()) << message_length CHECK(message_length == res_body.length()) << message_length
...@@ -295,6 +299,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -295,6 +299,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
} }
} }
if (imsg_guard->read_body_progressively()) { if (imsg_guard->read_body_progressively()) {
// Set RPA if needed // Set RPA if needed
accessor.set_readable_progressive_attachment(imsg_guard.get()); accessor.set_readable_progressive_attachment(imsg_guard.get());
...@@ -375,7 +380,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -375,7 +380,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
const std::string* grpc_encoding = const std::string* grpc_encoding =
res_header->GetHeader(common->GRPC_ENCODING); res_header->GetHeader(common->GRPC_ENCODING);
if ((encoding != NULL && *encoding == common->GZIP) || if ((encoding != NULL && *encoding == common->GZIP) ||
(compressed_grpc && grpc_encoding != NULL && *grpc_encoding == (grpc_compressed && grpc_encoding != NULL && *grpc_encoding ==
common->GZIP)) { common->GZIP)) {
TRACEPRINTF("Decompressing response=%lu", TRACEPRINTF("Decompressing response=%lu",
(unsigned long)res_body.size()); (unsigned long)res_body.size());
...@@ -386,7 +391,6 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -386,7 +391,6 @@ void ProcessHttpResponse(InputMessageBase* msg) {
} }
res_body.swap(uncompressed); res_body.swap(uncompressed);
} }
// message body is json
if (content_type == HTTP_CONTENT_PROTO || if (content_type == HTTP_CONTENT_PROTO ||
content_type == HTTP_CONTENT_GRPC) { content_type == HTTP_CONTENT_GRPC) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) { if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
...@@ -394,6 +398,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -394,6 +398,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
break; break;
} }
} else { } else {
// message body is json
butil::IOBufAsZeroCopyInputStream wrapper(res_body); butil::IOBufAsZeroCopyInputStream wrapper(res_body);
std::string err; std::string err;
json2pb::Json2PbOptions options; json2pb::Json2PbOptions options;
...@@ -404,6 +409,40 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -404,6 +409,40 @@ void ProcessHttpResponse(InputMessageBase* msg) {
} }
} }
} while (0); } while (0);
do {
if (!grpc_protocol) {
break;
}
// begin to handle grpc case
const std::string* grpc_status = res_header->GetHeader(common->GRPC_STATUS);
const std::string* grpc_message = res_header->GetHeader(common->GRPC_MESSAGE);
if (grpc_status) {
GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
cntl->set_grpc_error_code(status, grpc_message? *grpc_message: "");
cntl->SetFailed(EGRPC, grpc_message? grpc_message->c_str(): "");
break;
}
// grpc-status is absent in http header, just convert error code
// to grpc status
if (!cntl->Failed()) {
break;
}
if (cntl->ErrorCode() == EHTTP) {
cntl->set_grpc_error_code(
HttpStatus2GrpcStatus(res_header->status_code()),
cntl->ErrorText());
// use empty string since gprc-status and grpc-message is absent
cntl->SetFailed(EGRPC, "");
break;
}
cntl->set_grpc_error_code(ErrorCode2GrpcStatus(cntl->ErrorCode()),
cntl->ErrorText());
// use empty string since gprc-status and grpc-message is absent
cntl->SetFailed(EGRPC, "");
} while (0);
// Unlocks correlation_id inside. Revert controller's // Unlocks correlation_id inside. Revert controller's
// error code if it version check of `cid' fails // error code if it version check of `cid' fails
imsg_guard.reset(); imsg_guard.reset();
...@@ -748,7 +787,8 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -748,7 +787,8 @@ HttpResponseSender::~HttpResponseSender() {
bool grpc_compressed = false; bool grpc_compressed = false;
bool grpc_protocol = bool grpc_protocol =
ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC; ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC;
if (cntl->Failed()) {
if (cntl->Failed() && !grpc_protocol) {
// Set status-code with default value(converted from error code) // Set status-code with default value(converted from error code)
// if user did not set it. // if user did not set it.
if (res_header->status_code() == HTTP_STATUS_OK) { if (res_header->status_code() == HTTP_STATUS_OK) {
...@@ -802,12 +842,36 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -802,12 +842,36 @@ HttpResponseSender::~HttpResponseSender() {
} }
} }
TrailerMessage trailers;
if (grpc_protocol) { if (grpc_protocol) {
// status code is always 200 according to grpc protocol
res_header->set_status_code(HTTP_STATUS_OK);
res_header->set_content_type(common->CONTENT_TYPE_GRPC);
GrpcStatus status = accessor.grpc_status();
std::string message = accessor.grpc_message();
if (status == GRPC_OK && cntl->Failed()) {
// In this case, request may not be handled by user-defined method or
// users use cntl.SetFailed to set grpc error instead of using
// cntl.set_grpc_error_code
status = ErrorCode2GrpcStatus(cntl->ErrorCode());
message = cntl->ErrorText();
}
std::string message_encoded;
percent_encode(message, &message_encoded);
trailers[common->GRPC_STATUS] = butil::string_printf("%d", status);
if (!message_encoded.empty()) {
trailers[common->GRPC_MESSAGE] = message_encoded;
}
// always tell client gzip support // always tell client gzip support
// TODO(zhujiashun): add zlib and snappy? // TODO(zhujiashun): add zlib and snappy?
res_header->SetHeader(common->GRPC_ACCEPT_ENCODING, res_header->SetHeader(common->GRPC_ACCEPT_ENCODING,
common->IDENTITY + "," + common->GZIP); common->IDENTITY + "," + common->GZIP);
if (status == GRPC_OK) {
butil::IOBuf tmp_buf; butil::IOBuf tmp_buf;
// Compressed-Flag as 0 / 1, encoded as 1 byte unsigned integer // Compressed-Flag as 0 / 1, encoded as 1 byte unsigned integer
if (grpc_compressed) { if (grpc_compressed) {
...@@ -820,8 +884,10 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -820,8 +884,10 @@ HttpResponseSender::~HttpResponseSender() {
tmp_buf.append(size_buf, 4); tmp_buf.append(size_buf, 4);
tmp_buf.append(cntl->response_attachment()); tmp_buf.append(cntl->response_attachment());
cntl->response_attachment().swap(tmp_buf); cntl->response_attachment().swap(tmp_buf);
} else {
cntl->response_attachment().clear();
}
} }
int rc = -1; int rc = -1;
// Have the risk of unlimited pending responses, in which case, tell // Have the risk of unlimited pending responses, in which case, tell
...@@ -829,8 +895,8 @@ HttpResponseSender::~HttpResponseSender() { ...@@ -829,8 +895,8 @@ HttpResponseSender::~HttpResponseSender() {
Socket::WriteOptions wopt; Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true; wopt.ignore_eovercrowded = true;
if (req_header->is_http2()) { if (req_header->is_http2()) {
SocketMessagePtr<H2UnsentResponse> SocketMessagePtr<H2UnsentResponse> h2_response(
h2_response(H2UnsentResponse::New(cntl, _h2_stream_id)); H2UnsentResponse::New(cntl, trailers));
if (h2_response == NULL) { if (h2_response == NULL) {
LOG(ERROR) << "Fail to make http2 response"; LOG(ERROR) << "Fail to make http2 response";
errno = EINVAL; errno = EINVAL;
...@@ -1195,11 +1261,12 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1195,11 +1261,12 @@ void ProcessHttpRequest(InputMessageBase *msg) {
imsg_guard->header().Swap(req_header); imsg_guard->header().Swap(req_header);
butil::IOBuf& req_body = imsg_guard->body(); butil::IOBuf& req_body = imsg_guard->body();
char compressed_grpc = false; char grpc_compressed = false;
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC) { if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_GRPC &&
!req_body.empty()) {
/* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/ /* 4 is the size of grpc Message-Length in Length-Prefixed-Message*/
char buf[4]; char buf[4];
req_body.cut1(&compressed_grpc); req_body.cut1(&grpc_compressed);
req_body.cutn(buf, 4); req_body.cutn(buf, 4);
int message_length = ReadBigEndian4Bytes(buf); int message_length = ReadBigEndian4Bytes(buf);
CHECK(message_length == req_body.length()); CHECK(message_length == req_body.length());
...@@ -1391,7 +1458,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1391,7 +1458,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
const std::string* grpc_encoding = const std::string* grpc_encoding =
req_header.GetHeader(common->GRPC_ENCODING); req_header.GetHeader(common->GRPC_ENCODING);
if ((encoding != NULL && *encoding == common->GZIP) || if ((encoding != NULL && *encoding == common->GZIP) ||
(compressed_grpc && grpc_encoding != NULL && *grpc_encoding == (grpc_compressed && grpc_encoding != NULL && *grpc_encoding ==
common->GZIP)) { common->GZIP)) {
TRACEPRINTF("Decompressing request=%lu", TRACEPRINTF("Decompressing request=%lu",
(unsigned long)req_body.size()); (unsigned long)req_body.size());
......
...@@ -69,6 +69,8 @@ struct CommonStrings { ...@@ -69,6 +69,8 @@ struct CommonStrings {
std::string TRAILERS; std::string TRAILERS;
std::string GRPC_ENCODING; std::string GRPC_ENCODING;
std::string GRPC_ACCEPT_ENCODING; std::string GRPC_ACCEPT_ENCODING;
std::string GRPC_STATUS;
std::string GRPC_MESSAGE;
CommonStrings(); CommonStrings();
}; };
......
// Copyright (c) 2018 Bilibili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Jiashun Zhu(zhujiashun@bilibili.com)
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include "brpc/grpc.h"
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
if (GFLAGS_NS::SetCommandLineOption("crash_on_fatal_log", "true").empty()) {
std::cerr << "Fail to set -crash_on_fatal_log" << std::endl;
return -1;
}
return RUN_ALL_TESTS();
}
namespace {
class GrpcTest : public ::testing::Test {
protected:
GrpcTest() {}
virtual ~GrpcTest() {};
virtual void SetUp() {};
virtual void TearDown() {};
};
TEST_F(GrpcTest, percent_encode) {
std::string out;
std::string s1("abcdefg !@#$^&*()/");
brpc::percent_encode(s1, &out);
EXPECT_TRUE(out == s1) << s1 << " vs " << out;
char s2_buf[] = "\0\0%\33\35 brpc";
std::string s2(s2_buf, sizeof(s2_buf) - 1);
std::string s2_expected_out("%00%00%25%1b%1d brpc");
brpc::percent_encode(s2, &out);
EXPECT_TRUE(out == s2_expected_out) << s2_expected_out << " vs " << out;
}
TEST_F(GrpcTest, percent_decode) {
std::string out;
std::string s1("abcdefg !@#$^&*()/");
brpc::percent_decode(s1, &out);
EXPECT_TRUE(out == s1) << s1 << " vs " << out;
std::string s2("%00%00%1b%1d brpc");
char s2_expected_out_buf[] = "\0\0\33\35 brpc";
std::string s2_expected_out(s2_expected_out_buf, sizeof(s2_expected_out_buf) - 1);
brpc::percent_decode(s2, &out);
EXPECT_TRUE(out == s2_expected_out) << s2_expected_out << " vs " << out;
}
} // namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment