Commit a3d7af23 authored by zyearn's avatar zyearn Committed by zhujiashun

* add mapping from RST_STREAM h2eror to grpc error

* add grpc sanity and exception UT
parent 59361655
......@@ -67,6 +67,38 @@ GrpcStatus ErrorCode2GrpcStatus(int error_code) {
}
}
// The mapping can be found in
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
GrpcStatus h2Error2GrpcStatus(H2Error h2_error) {
switch(h2_error) {
case H2_NO_ERROR:
case H2_PROTOCOL_ERROR:
case H2_INTERNAL_ERROR:
return GRPC_INTERNAL;
case H2_FLOW_CONTROL_ERROR:
return GRPC_RESOURCEEXHAUSTED;
case H2_SETTINGS_TIMEOUT:
case H2_STREAM_CLOSED_ERROR:
case H2_FRAME_SIZE_ERROR:
return GRPC_INTERNAL;
case H2_REFUSED_STREAM:
return GRPC_UNAVAILABLE;
case H2_CANCEL:
return GRPC_CANCELED;
case H2_COMPRESSION_ERROR:
case H2_CONNECT_ERROR:
return GRPC_INTERNAL;
case H2_ENHANCE_YOUR_CALM:
return GRPC_RESOURCEEXHAUSTED;
case H2_INADEQUATE_SECURITY:
return GRPC_PERMISSIONDENIED;
case H2_HTTP_1_1_REQUIRED:
return GRPC_INTERNAL;
default:
return GRPC_INTERNAL;
}
}
void percent_encode(const std::string& str, std::string* str_out) {
std::ostringstream escaped;
escaped.fill('0');
......
......@@ -18,6 +18,7 @@
#define BRPC_GRPC_H
#include <map>
#include <brpc/http2.h>
namespace brpc {
......@@ -139,12 +140,16 @@ enum GrpcStatus {
// UNAUTHENTICATED indicates the request does not have valid
// authentication credentials for the operation.
GRPC_UNAUTHENTICATED,
GRPC_MAX,
};
GrpcStatus HttpStatus2GrpcStatus(int http_status);
GrpcStatus ErrorCode2GrpcStatus(int error_code);
GrpcStatus h2Error2GrpcStatus(H2Error h2_error);
void percent_encode(const std::string& str, std::string* str_out);
void percent_decode(const std::string& str, std::string* str_out);
......
......@@ -26,7 +26,8 @@ HttpHeader::HttpHeader()
, _method(HTTP_METHOD_GET)
, _version(1, 1)
, _h2_stream_id(0)
, _h2_error(H2_NO_ERROR) {
, _h2_error(H2_NO_ERROR)
, _has_h2_error(false) {
// NOTE: don't forget to clear the field in Clear() as well.
}
......@@ -52,6 +53,7 @@ void HttpHeader::Swap(HttpHeader &rhs) {
std::swap(_version, rhs._version);
std::swap(_h2_stream_id, rhs._h2_stream_id);
std::swap(_h2_error, rhs._h2_error);
std::swap(_has_h2_error, rhs._has_h2_error);
}
void HttpHeader::Clear() {
......@@ -64,6 +66,7 @@ void HttpHeader::Clear() {
_version = std::make_pair(1, 1);
_h2_stream_id = 0;
_h2_error = H2_NO_ERROR;
_has_h2_error = false;
}
const char* HttpHeader::reason_phrase() const {
......
......@@ -69,6 +69,8 @@ public:
H2Error h2_error() const { return _h2_error; }
bool has_h2_error() { return _has_h2_error; }
// Get/set "Content-Type". Notice that you can't get "Content-Type"
// via GetHeader().
// possible values: "text/plain", "application/json" ...
......@@ -162,6 +164,7 @@ friend void policy::ProcessHttpRequest(InputMessageBase *msg);
std::pair<int, int> _version;
int _h2_stream_id;
H2Error _h2_error;
bool _has_h2_error;
};
const HttpHeader& DefaultHttpHeader();
......
......@@ -415,13 +415,22 @@ void ProcessHttpResponse(InputMessageBase* msg) {
break;
}
// begin to handle grpc case
// Receive RST_Stream and h2_error is set correspondingly
if (res_header->has_h2_error()) {
cntl->set_grpc_error_code(h2Error2GrpcStatus(res_header->h2_error()), "");
cntl->SetFailed(EGRPC, "");
break;
}
const std::string* grpc_status = res_header->GetHeader(common->GRPC_STATUS);
const std::string* grpc_message = res_header->GetHeader(common->GRPC_MESSAGE);
if (grpc_status) {
GrpcStatus status = (GrpcStatus)strtol(grpc_status->data(), NULL, 10);
if (status != GRPC_OK) {
cntl->set_grpc_error_code(status, grpc_message? *grpc_message: "");
cntl->SetFailed(EGRPC, grpc_message? grpc_message->c_str(): "");
}
break;
}
// grpc-status is absent in http header, just convert error code
......@@ -788,11 +797,12 @@ HttpResponseSender::~HttpResponseSender() {
bool grpc_protocol =
ParseContentType(req_header->content_type()) == HTTP_CONTENT_GRPC;
if (cntl->Failed() && !grpc_protocol) {
if (cntl->Failed()) {
if (!grpc_protocol) {
// Set status-code with default value(converted from error code)
// if user did not set it.
if (res_header->status_code() == HTTP_STATUS_OK) {
res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
res_header->set_status_code(ErrorCode2StatusCode(cntl->ErrorCode()));
}
// Fill ErrorCode into header
res_header->SetHeader(common->ERROR_CODE,
......@@ -805,6 +815,7 @@ HttpResponseSender::~HttpResponseSender() {
res_header->set_content_type(common->CONTENT_TYPE_TEXT);
cntl->response_attachment().clear();
cntl->response_attachment().append(cntl->ErrorText());
}
} else if (cntl->has_progressive_writer()) {
// Transfer-Encoding is supported since HTTP/1.1
if (res_header->major_version() < 2 && !res_header->before_http_1_1()) {
......
......@@ -13,12 +13,13 @@ set(TEST_PROTO_FILES addressbook1.proto
repeated.proto
snappy_message.proto
v1.proto
v2.proto)
file(MAKE_DIRECTORY ${PROJECT_BINARY_DIR}/test/hdrs)
set(PROTOC_FLAGS ${PROTOC_FLAGS} -I${PROJECT_SOURCE_DIR}/src)
compile_proto(PROTO_HDRS PROTO_SRCS ${PROJECT_BINARY_DIR}/test
${PROJECT_BINARY_DIR}/test/hdrs
${PROJECT_SOURCE_DIR}/test
v2.proto
grpc.proto)
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/test/hdrs)
set(PROTOC_FLAGS ${PROTOC_FLAGS} -I${CMAKE_SOURCE_DIR}/src)
compile_proto(PROTO_HDRS PROTO_SRCS ${CMAKE_BINARY_DIR}/test
${CMAKE_BINARY_DIR}/test/hdrs
${CMAKE_SOURCE_DIR}/test
"${TEST_PROTO_FILES}")
add_library(TEST_PROTO_LIB OBJECT ${PROTO_SRCS} ${PROTO_HDRS})
......
......@@ -16,11 +16,19 @@
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include "brpc/controller.h"
#include "brpc/server.h"
#include "brpc/channel.h"
#include "brpc/grpc.h"
#include "grpc.pb.h"
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
if (GFLAGS_NS::SetCommandLineOption("http_body_compress_threshold", "0").empty()) {
std::cerr << "Fail to set -crash_on_fatal_log" << std::endl;
return -1;
}
if (GFLAGS_NS::SetCommandLineOption("crash_on_fatal_log", "true").empty()) {
std::cerr << "Fail to set -crash_on_fatal_log" << std::endl;
return -1;
......@@ -30,13 +38,73 @@ int main(int argc, char* argv[]) {
namespace {
const std::string g_server_addr = "127.0.0.1:8011";
const std::string g_prefix = "Hello, ";
const std::string g_req = "wyt";
const int64_t g_timeout_ms = 3000;
const std::string g_protocol = "grpc";
class MyGrpcService : public ::test::GrpcService {
public:
void Method(::google::protobuf::RpcController* cntl_base,
const ::test::GrpcRequest* req,
::test::GrpcResponse* res,
::google::protobuf::Closure* done) {
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
brpc::ClosureGuard done_guard(done);
EXPECT_EQ(g_req, req->message());
if (req->has_gzip() && req->gzip()) {
cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
}
res->set_message(g_prefix + req->message());
if (req->has_error_code()) {
const int error_code = req->error_code();
cntl->set_grpc_error_code((brpc::GrpcStatus)error_code,
butil::string_printf("%s%d", g_prefix.c_str(), error_code));
return;
}
}
};
class GrpcTest : public ::testing::Test {
protected:
GrpcTest() {}
GrpcTest() {
EXPECT_EQ(0, _server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, _server.Start(g_server_addr.c_str(), NULL));
brpc::ChannelOptions options;
options.protocol = g_protocol;
options.timeout_ms = g_timeout_ms;
EXPECT_EQ(0, _channel.Init(g_server_addr.c_str(), "", &options));
}
virtual ~GrpcTest() {};
virtual void SetUp() {};
virtual void TearDown() {};
void CallMethod(bool req_gzip, bool res_gzip) {
test::GrpcRequest req;
test::GrpcResponse res;
brpc::Controller cntl;
if (req_gzip) {
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_GZIP);
}
req.set_message(g_req);
req.set_gzip(res_gzip);
test::GrpcService_Stub stub(&_channel);
stub.Method(&cntl, &req, &res, NULL);
EXPECT_FALSE(cntl.Failed()) << cntl.ErrorCode() << ": " << cntl.ErrorText();
EXPECT_EQ(res.message(), g_prefix + g_req);
EXPECT_EQ(brpc::GRPC_OK, cntl.grpc_status());
}
brpc::Server _server;
MyGrpcService _svc;
brpc::Channel _channel;
};
TEST_F(GrpcTest, percent_encode) {
......@@ -65,4 +133,28 @@ TEST_F(GrpcTest, percent_decode) {
EXPECT_TRUE(out == s2_expected_out) << s2_expected_out << " vs " << out;
}
TEST_F(GrpcTest, sanity) {
for (int i = 0; i < 2; ++i) { // if req use gzip or not
for (int j = 0; j < 2; ++j) { // if res use gzip or not
CallMethod(i, j);
}
}
}
TEST_F(GrpcTest, return_error) {
// GRPC_OK(0) is skipped
for (int i = 1; i < (int)brpc::GRPC_MAX; ++i) {
test::GrpcRequest req;
test::GrpcResponse res;
brpc::Controller cntl;
req.set_message(g_req);
req.set_error_code(i);
test::GrpcService_Stub stub(&_channel);
stub.Method(&cntl, &req, &res, NULL);
EXPECT_TRUE(cntl.Failed());
EXPECT_EQ((int)cntl.grpc_status(), i);
EXPECT_EQ(cntl.grpc_message(), butil::string_printf("%s%d", g_prefix.c_str(), i));
}
}
} // namespace
syntax="proto2";
option cc_generic_services = true;
package test;
message GrpcRequest {
required string message = 1;
optional bool gzip = 2;
optional int32 error_code = 3;
};
message GrpcResponse {
required string message = 1;
};
service GrpcService {
rpc Method(GrpcRequest) returns (GrpcResponse);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment