Commit 695a35c2 authored by zhujiashun's avatar zhujiashun

Implement grpc timeout

parent a6ccc96a
......@@ -493,12 +493,12 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
// Setup timer for backup request. When it occurs, we'll setup a
// timer of timeout_ms before sending backup request.
// _abstime_us is for truncating _connect_timeout_ms and resetting
// _abstime_ns is for truncating _connect_timeout_ms and resetting
// timer when EBACKUPREQUEST occurs.
if (cntl->timeout_ms() < 0) {
cntl->_abstime_us = -1;
cntl->_abstime_ns = -1;
} else {
cntl->_abstime_us = cntl->timeout_ms() * 1000L + start_send_real_us;
cntl->_abstime_ns = cntl->timeout_ms() * 1000000L + start_send_real_us * 1000L;
}
const int rc = bthread_timer_add(
&cntl->_timeout_id,
......@@ -512,18 +512,18 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
} else if (cntl->timeout_ms() >= 0) {
// Setup timer for RPC timetout
// _abstime_us is for truncating _connect_timeout_ms
cntl->_abstime_us = cntl->timeout_ms() * 1000L + start_send_real_us;
// _abstime_ns is for truncating _connect_timeout_ms
cntl->_abstime_ns = cntl->timeout_ms() * 1000000L + start_send_real_us * 1000L;
const int rc = bthread_timer_add(
&cntl->_timeout_id,
butil::microseconds_to_timespec(cntl->_abstime_us),
butil::nanoseconds_to_timespec(cntl->_abstime_ns),
HandleTimeout, (void*)correlation_id.value);
if (BAIDU_UNLIKELY(rc != 0)) {
cntl->SetFailed(rc, "Fail to add timer for timeout");
return cntl->HandleSendFailed();
}
} else {
cntl->_abstime_us = -1;
cntl->_abstime_ns = -1;
}
cntl->IssueRPC(start_send_real_us);
......
......@@ -222,7 +222,7 @@ void Controller::ResetPods() {
_timeout_ms = UNSET_MAGIC_NUM;
_backup_request_ms = UNSET_MAGIC_NUM;
_connect_timeout_ms = UNSET_MAGIC_NUM;
_abstime_us = -1;
_abstime_ns = -1;
_timeout_id = 0;
_begin_time_us = 0;
_end_time_us = 0;
......@@ -568,7 +568,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
if (timeout_ms() >= 0) {
rc = bthread_timer_add(
&_timeout_id,
butil::microseconds_to_timespec(_abstime_us),
butil::nanoseconds_to_timespec(_abstime_ns),
HandleTimeout, (void*)_correlation_id.value);
}
if (rc != 0) {
......@@ -1111,10 +1111,10 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
timespec connect_abstime;
timespec* pabstime = NULL;
if (_connect_timeout_ms > 0) {
if (_abstime_us >= 0) {
connect_abstime = butil::microseconds_to_timespec(
std::min(_connect_timeout_ms * 1000L + start_realtime_us,
_abstime_us));
if (_abstime_ns >= 0) {
connect_abstime = butil::nanoseconds_to_timespec(
std::min(_connect_timeout_ms * 1000000L + start_realtime_us * 1000L,
_abstime_ns));
} else {
connect_abstime = butil::microseconds_to_timespec(
_connect_timeout_ms * 1000L + start_realtime_us);
......
......@@ -480,6 +480,10 @@ public:
// Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
// Get deadline of this RPC (since the Epoch in nanoseconds).
// -1 means no deadline.
int64_t deadline_ns() const { return _abstime_ns; }
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......@@ -662,8 +666,8 @@ private:
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
// Deadline of this RPC (since the Epoch in microseconds).
int64_t _abstime_us;
// Deadline of this RPC (since the Epoch in nanoseconds).
int64_t _abstime_ns;
// Timer registered to trigger RPC timeout event
bthread_timer_t _timeout_id;
......
......@@ -128,6 +128,10 @@ public:
std::string& protocol_param() { return _cntl->protocol_param(); }
const std::string& protocol_param() const { return _cntl->protocol_param(); }
void set_deadline_ns(int64_t timeout_ns) {
_cntl->_abstime_ns = butil::gettimeofday_us() * 1000L + timeout_ns;
}
private:
Controller* _cntl;
};
......
......@@ -658,18 +658,18 @@ void ParallelChannel::CallMethod(
cntl->set_timeout_ms(_options.timeout_ms);
}
if (cntl->timeout_ms() >= 0) {
cntl->_abstime_us = cntl->timeout_ms() * 1000L + cntl->_begin_time_us;
cntl->_abstime_ns = cntl->timeout_ms() * 1000000L + cntl->_begin_time_us * 1000L;
// Setup timer for RPC timetout
const int rc = bthread_timer_add(
&cntl->_timeout_id,
butil::microseconds_to_timespec(cntl->_abstime_us),
butil::nanoseconds_to_timespec(cntl->_abstime_ns),
HandleTimeout, (void*)cid.value);
if (rc != 0) {
cntl->SetFailed(rc, "Fail to add timer");
goto FAIL;
}
} else {
cntl->_abstime_us = -1;
cntl->_abstime_ns = -1;
}
d->SaveThreadInfoOfCallsite();
CHECK_EQ(0, bthread_id_unlock(cid));
......
......@@ -135,6 +135,7 @@ CommonStrings::CommonStrings()
, GRPC_ACCEPT_ENCODING_VALUE("identity,gzip")
, GRPC_STATUS("grpc-status")
, GRPC_MESSAGE("grpc-message")
, GRPC_TIMEOUT("grpc-timeout")
{}
static CommonStrings* common = NULL;
......@@ -1190,6 +1191,31 @@ void EndRunningCallMethodInPool(
::google::protobuf::Message* response,
::google::protobuf::Closure* done);
static int64_t ConvertGrpcTimeoutToNS(int64_t timeout_value, const char timeout_unit) {
switch (timeout_unit) {
case 'H':
timeout_value *= (3600 * 1000000000L);
break;
case 'M':
timeout_value *= (60 * 1000000000L);
break;
case 'S':
timeout_value *= 1000000000L;
break;
case 'm':
timeout_value *= 1000000L;
break;
case 'u':
timeout_value *= 1000L;
case 'n':
break;
default:
return -1;
}
return timeout_value;
}
void ProcessHttpRequest(InputMessageBase *msg) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
......@@ -1418,6 +1444,15 @@ void ProcessHttpRequest(InputMessageBase *msg) {
return;
}
}
const std::string* grpc_timeout = req_header.GetHeader(common->GRPC_TIMEOUT);
if (grpc_timeout) {
const char timeout_unit = grpc_timeout->back();
int64_t timeout_value_ns =
ConvertGrpcTimeoutToNS((int64_t)strtol(grpc_timeout->data(), NULL, 10), timeout_unit);
if (timeout_value_ns >= 0) {
accessor.set_deadline_ns(timeout_value_ns);
}
}
}
} else {
encoding = req_header.GetHeader(common->CONTENT_ENCODING);
......
......@@ -71,6 +71,7 @@ struct CommonStrings {
std::string GRPC_ACCEPT_ENCODING_VALUE;
std::string GRPC_STATUS;
std::string GRPC_MESSAGE;
std::string GRPC_TIMEOUT;
CommonStrings();
};
......
......@@ -20,6 +20,7 @@
#include "brpc/server.h"
#include "brpc/channel.h"
#include "brpc/grpc.h"
#include "butil/time.h"
#include "grpc.pb.h"
int main(int argc, char* argv[]) {
......@@ -64,6 +65,10 @@ public:
cntl->SetFailed(brpc::EINTERNAL, "%s", g_prefix.c_str());
return;
}
if (req->has_timeout_ns()) {
EXPECT_NEAR(cntl->deadline_ns() / 1000000000L,
butil::gettimeofday_s() + req->timeout_ns() / 1000000000L, 1);
}
}
void MethodTimeOut(::google::protobuf::RpcController* cntl_base,
......@@ -77,7 +82,6 @@ public:
}
};
class GrpcTest : public ::testing::Test {
protected:
GrpcTest() {
......@@ -198,4 +202,29 @@ TEST_F(GrpcTest, MethodNotExist) {
ASSERT_TRUE(butil::StringPiece(cntl.ErrorText()).ends_with("Method MethodNotExist() not implemented."));
}
TEST_F(GrpcTest, GrpcTimeOut) {
const char* timeouts[] = {
"2H", "7200000000000",
"3M", "180000000000",
"+1S", "1000000000",
"4m", "4000000",
"5u", "5000",
"6n", "6"
};
for (size_t i = 0; i < arraysize(timeouts); i = i + 2) {
test::GrpcRequest req;
test::GrpcResponse res;
brpc::Controller cntl;
req.set_message(g_req);
req.set_gzip(false);
req.set_return_error(false);
req.set_timeout_ns((int64_t)(strtol(timeouts[i+1], NULL, 10)));
cntl.http_request().SetHeader("grpc-timeout", timeouts[i]);
test::GrpcService_Stub stub(&_channel);
stub.Method(&cntl, &req, &res, NULL);
EXPECT_FALSE(cntl.Failed());
}
}
} // namespace
......@@ -7,6 +7,7 @@ message GrpcRequest {
required string message = 1;
required bool gzip = 2;
required bool return_error = 3;
optional int64 timeout_ns = 4;
};
message GrpcResponse {
......@@ -18,5 +19,3 @@ service GrpcService {
rpc MethodTimeOut(GrpcRequest) returns (GrpcResponse);
rpc MethodNotExist(GrpcRequest) returns (GrpcResponse);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment