Unverified Commit d66346f8 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #553 from zyearn/deadline

Implement grpc timeout
parents a20ee30e afb33c08
...@@ -493,12 +493,12 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -493,12 +493,12 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
// Setup timer for backup request. When it occurs, we'll setup a // Setup timer for backup request. When it occurs, we'll setup a
// timer of timeout_ms before sending backup request. // timer of timeout_ms before sending backup request.
// _abstime_us is for truncating _connect_timeout_ms and resetting // _deadline_us is for truncating _connect_timeout_ms and resetting
// timer when EBACKUPREQUEST occurs. // timer when EBACKUPREQUEST occurs.
if (cntl->timeout_ms() < 0) { if (cntl->timeout_ms() < 0) {
cntl->_abstime_us = -1; cntl->_deadline_us = -1;
} else { } else {
cntl->_abstime_us = cntl->timeout_ms() * 1000L + start_send_real_us; cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
} }
const int rc = bthread_timer_add( const int rc = bthread_timer_add(
&cntl->_timeout_id, &cntl->_timeout_id,
...@@ -512,18 +512,18 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -512,18 +512,18 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
} else if (cntl->timeout_ms() >= 0) { } else if (cntl->timeout_ms() >= 0) {
// Setup timer for RPC timetout // Setup timer for RPC timetout
// _abstime_us is for truncating _connect_timeout_ms // _deadline_us is for truncating _connect_timeout_ms
cntl->_abstime_us = cntl->timeout_ms() * 1000L + start_send_real_us; cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
const int rc = bthread_timer_add( const int rc = bthread_timer_add(
&cntl->_timeout_id, &cntl->_timeout_id,
butil::microseconds_to_timespec(cntl->_abstime_us), butil::microseconds_to_timespec(cntl->_deadline_us),
HandleTimeout, (void*)correlation_id.value); HandleTimeout, (void*)correlation_id.value);
if (BAIDU_UNLIKELY(rc != 0)) { if (BAIDU_UNLIKELY(rc != 0)) {
cntl->SetFailed(rc, "Fail to add timer for timeout"); cntl->SetFailed(rc, "Fail to add timer for timeout");
return cntl->HandleSendFailed(); return cntl->HandleSendFailed();
} }
} else { } else {
cntl->_abstime_us = -1; cntl->_deadline_us = -1;
} }
cntl->IssueRPC(start_send_real_us); cntl->IssueRPC(start_send_real_us);
......
...@@ -222,7 +222,7 @@ void Controller::ResetPods() { ...@@ -222,7 +222,7 @@ void Controller::ResetPods() {
_timeout_ms = UNSET_MAGIC_NUM; _timeout_ms = UNSET_MAGIC_NUM;
_backup_request_ms = UNSET_MAGIC_NUM; _backup_request_ms = UNSET_MAGIC_NUM;
_connect_timeout_ms = UNSET_MAGIC_NUM; _connect_timeout_ms = UNSET_MAGIC_NUM;
_abstime_us = -1; _deadline_us = -1;
_timeout_id = 0; _timeout_id = 0;
_begin_time_us = 0; _begin_time_us = 0;
_end_time_us = 0; _end_time_us = 0;
...@@ -568,7 +568,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, ...@@ -568,7 +568,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
if (timeout_ms() >= 0) { if (timeout_ms() >= 0) {
rc = bthread_timer_add( rc = bthread_timer_add(
&_timeout_id, &_timeout_id,
butil::microseconds_to_timespec(_abstime_us), butil::microseconds_to_timespec(_deadline_us),
HandleTimeout, (void*)_correlation_id.value); HandleTimeout, (void*)_correlation_id.value);
} }
if (rc != 0) { if (rc != 0) {
...@@ -750,7 +750,7 @@ void Controller::Call::OnComplete( ...@@ -750,7 +750,7 @@ void Controller::Call::OnComplete(
// main socket should die as well. // main socket should die as well.
// NOTE: main socket may be wrongly set failed (provided that // NOTE: main socket may be wrongly set failed (provided that
// short/pooled socket does not hold a ref of the main socket). // short/pooled socket does not hold a ref of the main socket).
// E.g. a in-parallel RPC sets the peer_id to be failed // E.g. an in-parallel RPC sets the peer_id to be failed
// -> this RPC meets ECONNREFUSED // -> this RPC meets ECONNREFUSED
// -> main socket gets revived from HC // -> main socket gets revived from HC
// -> this RPC sets main socket to be failed again. // -> this RPC sets main socket to be failed again.
...@@ -1111,10 +1111,10 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -1111,10 +1111,10 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
timespec connect_abstime; timespec connect_abstime;
timespec* pabstime = NULL; timespec* pabstime = NULL;
if (_connect_timeout_ms > 0) { if (_connect_timeout_ms > 0) {
if (_abstime_us >= 0) { if (_deadline_us >= 0) {
connect_abstime = butil::microseconds_to_timespec( connect_abstime = butil::microseconds_to_timespec(
std::min(_connect_timeout_ms * 1000L + start_realtime_us, std::min(_connect_timeout_ms * 1000L + start_realtime_us,
_abstime_us)); _deadline_us));
} else { } else {
connect_abstime = butil::microseconds_to_timespec( connect_abstime = butil::microseconds_to_timespec(
_connect_timeout_ms * 1000L + start_realtime_us); _connect_timeout_ms * 1000L + start_realtime_us);
......
...@@ -309,9 +309,11 @@ public: ...@@ -309,9 +309,11 @@ public:
// undefined on the client side (may crash). // undefined on the client side (may crash).
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// If true, indicates that the client canceled the RPC or the connection has // Returns true if the client canceled the RPC or the connection has broken,
// broken, so the server may as well give up on replying to it. The server // so the server may as well give up on replying to it. The server should still
// should still call the final "done" callback. // call the final "done" callback.
// Note: Reaching deadline of the RPC would not affect this function, which means
// even if deadline has been reached, this function may still return false.
bool IsCanceled() const; bool IsCanceled() const;
// Asks that the given callback be called when the RPC is canceled or the // Asks that the given callback be called when the RPC is canceled or the
...@@ -480,6 +482,10 @@ public: ...@@ -480,6 +482,10 @@ public:
// Get sock option. .e.g get vip info through ttm kernel module hook, // Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen); int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
// Get deadline of this RPC (since the Epoch in microseconds).
// -1 means no deadline.
int64_t deadline_us() const { return _deadline_us; }
private: private:
struct CompletionInfo { struct CompletionInfo {
CallId id; // call_id of the corresponding request CallId id; // call_id of the corresponding request
...@@ -663,7 +669,7 @@ private: ...@@ -663,7 +669,7 @@ private:
int32_t _connect_timeout_ms; int32_t _connect_timeout_ms;
int32_t _backup_request_ms; int32_t _backup_request_ms;
// Deadline of this RPC (since the Epoch in microseconds). // Deadline of this RPC (since the Epoch in microseconds).
int64_t _abstime_us; int64_t _deadline_us;
// Timer registered to trigger RPC timeout event // Timer registered to trigger RPC timeout event
bthread_timer_t _timeout_id; bthread_timer_t _timeout_id;
......
...@@ -128,6 +128,10 @@ public: ...@@ -128,6 +128,10 @@ public:
std::string& protocol_param() { return _cntl->protocol_param(); } std::string& protocol_param() { return _cntl->protocol_param(); }
const std::string& protocol_param() const { return _cntl->protocol_param(); } const std::string& protocol_param() const { return _cntl->protocol_param(); }
// Note: This function can only be called in server side. The deadline of client
// side is properly set in the RPC sending path.
void set_deadline_us(int64_t deadline_us) { _cntl->_deadline_us = deadline_us; }
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
...@@ -166,4 +166,41 @@ void PercentDecode(const std::string& str, std::string* str_out) { ...@@ -166,4 +166,41 @@ void PercentDecode(const std::string& str, std::string* str_out) {
} }
} }
int64_t ConvertGrpcTimeoutToUS(const std::string* grpc_timeout) {
if (!grpc_timeout || grpc_timeout->empty()) {
return -1;
}
char* endptr = NULL;
int64_t timeout_value = (int64_t)strtol(grpc_timeout->data(), &endptr, 10);
// Only the format that the digit number is equal to (timeout header size - 1)
// is valid. Otherwise the format is not valid and is treated as no deadline.
// For example:
// "1H", "2993S", "82m" is valid.
// "30A" is also valid, but the following switch would fall into default
// case and return -1 since 'A' is not a valid time unit.
// "123ASH" is not vaid since the digit number is 3, while the size is 6.
// "HHH" is not valid since the dight number is 0, while the size is 3.
if ((size_t)(endptr - grpc_timeout->data()) != grpc_timeout->size() - 1) {
return -1;
}
switch (*endptr) {
case 'H':
return timeout_value * 3600 * 1000000;
case 'M':
return timeout_value * 60 * 1000000;
case 'S':
return timeout_value * 1000000;
case 'm':
return timeout_value * 1000;
case 'u':
return timeout_value;
case 'n':
timeout_value = (timeout_value + 500) / 1000;
return (timeout_value == 0) ? 1 : timeout_value;
default:
return -1;
}
CHECK(false) << "Impossible";
}
} // namespace brpc } // namespace brpc
...@@ -658,18 +658,18 @@ void ParallelChannel::CallMethod( ...@@ -658,18 +658,18 @@ void ParallelChannel::CallMethod(
cntl->set_timeout_ms(_options.timeout_ms); cntl->set_timeout_ms(_options.timeout_ms);
} }
if (cntl->timeout_ms() >= 0) { if (cntl->timeout_ms() >= 0) {
cntl->_abstime_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us; cntl->_deadline_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us;
// Setup timer for RPC timetout // Setup timer for RPC timetout
const int rc = bthread_timer_add( const int rc = bthread_timer_add(
&cntl->_timeout_id, &cntl->_timeout_id,
butil::microseconds_to_timespec(cntl->_abstime_us), butil::microseconds_to_timespec(cntl->_deadline_us),
HandleTimeout, (void*)cid.value); HandleTimeout, (void*)cid.value);
if (rc != 0) { if (rc != 0) {
cntl->SetFailed(rc, "Fail to add timer"); cntl->SetFailed(rc, "Fail to add timer");
goto FAIL; goto FAIL;
} }
} else { } else {
cntl->_abstime_us = -1; cntl->_deadline_us = -1;
} }
d->SaveThreadInfoOfCallsite(); d->SaveThreadInfoOfCallsite();
CHECK_EQ(0, bthread_id_unlock(cid)); CHECK_EQ(0, bthread_id_unlock(cid));
......
...@@ -50,6 +50,8 @@ int is_failed_after_queries(const http_parser* parser); ...@@ -50,6 +50,8 @@ int is_failed_after_queries(const http_parser* parser);
int is_failed_after_http_version(const http_parser* parser); int is_failed_after_http_version(const http_parser* parser);
DECLARE_bool(http_verbose); DECLARE_bool(http_verbose);
DECLARE_int32(http_verbose_max_body_length); DECLARE_int32(http_verbose_max_body_length);
// Defined in grpc.cpp
int64_t ConvertGrpcTimeoutToUS(const std::string* grpc_timeout);
namespace policy { namespace policy {
...@@ -135,6 +137,7 @@ CommonStrings::CommonStrings() ...@@ -135,6 +137,7 @@ CommonStrings::CommonStrings()
, GRPC_ACCEPT_ENCODING_VALUE("identity,gzip") , GRPC_ACCEPT_ENCODING_VALUE("identity,gzip")
, GRPC_STATUS("grpc-status") , GRPC_STATUS("grpc-status")
, GRPC_MESSAGE("grpc-message") , GRPC_MESSAGE("grpc-message")
, GRPC_TIMEOUT("grpc-timeout")
{} {}
static CommonStrings* common = NULL; static CommonStrings* common = NULL;
...@@ -575,7 +578,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, ...@@ -575,7 +578,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
*/ */
// TODO: do we need this? // TODO: do we need this?
hreq.SetHeader(common->TE, common->TRAILERS); hreq.SetHeader(common->TE, common->TRAILERS);
if (cntl->timeout_ms() >= 0) {
hreq.SetHeader(common->GRPC_TIMEOUT,
butil::string_printf("%ldm", cntl->timeout_ms()));
}
// Append compressed and length before body // Append compressed and length before body
AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed); AddGrpcPrefix(&cntl->request_attachment(), grpc_compressed);
} }
...@@ -1418,6 +1424,12 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1418,6 +1424,12 @@ void ProcessHttpRequest(InputMessageBase *msg) {
return; return;
} }
} }
int64_t timeout_value_us =
ConvertGrpcTimeoutToUS(req_header.GetHeader(common->GRPC_TIMEOUT));
if (timeout_value_us >= 0) {
accessor.set_deadline_us(
butil::gettimeofday_us() + timeout_value_us);
}
} }
} else { } else {
encoding = req_header.GetHeader(common->CONTENT_ENCODING); encoding = req_header.GetHeader(common->CONTENT_ENCODING);
...@@ -1455,7 +1467,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1455,7 +1467,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
// A http server, just keep content as it is. // A http server, just keep content as it is.
cntl->request_attachment().swap(req_body); cntl->request_attachment().swap(req_body);
} }
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender); google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
imsg_guard.reset(); // optional, just release resourse ASAP imsg_guard.reset(); // optional, just release resourse ASAP
......
...@@ -71,6 +71,7 @@ struct CommonStrings { ...@@ -71,6 +71,7 @@ struct CommonStrings {
std::string GRPC_ACCEPT_ENCODING_VALUE; std::string GRPC_ACCEPT_ENCODING_VALUE;
std::string GRPC_STATUS; std::string GRPC_STATUS;
std::string GRPC_MESSAGE; std::string GRPC_MESSAGE;
std::string GRPC_TIMEOUT;
CommonStrings(); CommonStrings();
}; };
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "brpc/server.h" #include "brpc/server.h"
#include "brpc/channel.h" #include "brpc/channel.h"
#include "brpc/grpc.h" #include "brpc/grpc.h"
#include "butil/time.h"
#include "grpc.pb.h" #include "grpc.pb.h"
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
...@@ -64,6 +65,14 @@ public: ...@@ -64,6 +65,14 @@ public:
cntl->SetFailed(brpc::EINTERNAL, "%s", g_prefix.c_str()); cntl->SetFailed(brpc::EINTERNAL, "%s", g_prefix.c_str());
return; return;
} }
if (req->has_timeout_us()) {
if (req->timeout_us() < 0) {
EXPECT_EQ(-1, cntl->deadline_us());
} else {
EXPECT_NEAR(cntl->deadline_us(),
butil::gettimeofday_us() + req->timeout_us(), 60);
}
}
} }
void MethodTimeOut(::google::protobuf::RpcController* cntl_base, void MethodTimeOut(::google::protobuf::RpcController* cntl_base,
...@@ -77,7 +86,6 @@ public: ...@@ -77,7 +86,6 @@ public:
} }
}; };
class GrpcTest : public ::testing::Test { class GrpcTest : public ::testing::Test {
protected: protected:
GrpcTest() { GrpcTest() {
...@@ -198,4 +206,68 @@ TEST_F(GrpcTest, MethodNotExist) { ...@@ -198,4 +206,68 @@ TEST_F(GrpcTest, MethodNotExist) {
ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method MethodNotExist() not implemented.")); ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method MethodNotExist() not implemented."));
} }
TEST_F(GrpcTest, GrpcTimeOut) {
const char* timeouts[] = {
// valid case
"2H", "7200000000",
"3M", "180000000",
"+1S", "1000000",
"4m", "4000",
"5u", "5",
"6n", "1",
// invalid case
"30A", "-1",
"123ASH", "-1",
"HHHH", "-1",
"112", "-1",
"H999m", "-1",
"", "-1"
};
// test all timeout format
for (size_t i = 0; i < arraysize(timeouts); i = i + 2) {
test::GrpcRequest req;
test::GrpcResponse res;
brpc::Controller cntl;
req.set_message(g_req);
req.set_gzip(false);
req.set_return_error(false);
req.set_timeout_us((int64_t)(strtol(timeouts[i+1], NULL, 10)));
cntl.set_timeout_ms(-1);
cntl.http_request().SetHeader("grpc-timeout", timeouts[i]);
test::GrpcService_Stub stub(&_channel);
stub.Method(&cntl, &req, &res, NULL);
EXPECT_FALSE(cntl.Failed());
}
// test timeout by using timeout_ms in cntl
{
test::GrpcRequest req;
test::GrpcResponse res;
brpc::Controller cntl;
req.set_message(g_req);
req.set_gzip(false);
req.set_return_error(false);
req.set_timeout_us(9876000);
cntl.set_timeout_ms(9876);
test::GrpcService_Stub stub(&_channel);
stub.Method(&cntl, &req, &res, NULL);
EXPECT_FALSE(cntl.Failed());
}
// test timeout by using timeout_ms in channel
{
test::GrpcRequest req;
test::GrpcResponse res;
brpc::Controller cntl;
req.set_message(g_req);
req.set_gzip(false);
req.set_return_error(false);
req.set_timeout_us(g_timeout_ms * 1000);
test::GrpcService_Stub stub(&_channel);
stub.Method(&cntl, &req, &res, NULL);
EXPECT_FALSE(cntl.Failed());
}
}
} // namespace } // namespace
...@@ -407,7 +407,7 @@ TEST_F(BthreadTest, stop_sleep) { ...@@ -407,7 +407,7 @@ TEST_F(BthreadTest, stop_sleep) {
ASSERT_EQ(0, bthread_stop(th)); ASSERT_EQ(0, bthread_stop(th));
ASSERT_EQ(0, bthread_join(th, NULL)); ASSERT_EQ(0, bthread_join(th, NULL));
tm.stop(); tm.stop();
ASSERT_LE(labs(tm.m_elapsed() - 10), 5); ASSERT_LE(labs(tm.m_elapsed() - 10), 10);
} }
TEST_F(BthreadTest, bthread_exit) { TEST_F(BthreadTest, bthread_exit) {
......
...@@ -7,6 +7,7 @@ message GrpcRequest { ...@@ -7,6 +7,7 @@ message GrpcRequest {
required string message = 1; required string message = 1;
required bool gzip = 2; required bool gzip = 2;
required bool return_error = 3; required bool return_error = 3;
optional int64 timeout_us = 4;
}; };
message GrpcResponse { message GrpcResponse {
...@@ -18,5 +19,3 @@ service GrpcService { ...@@ -18,5 +19,3 @@ service GrpcService {
rpc MethodTimeOut(GrpcRequest) returns (GrpcResponse); rpc MethodTimeOut(GrpcRequest) returns (GrpcResponse);
rpc MethodNotExist(GrpcRequest) returns (GrpcResponse); rpc MethodNotExist(GrpcRequest) returns (GrpcResponse);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment