Commit cf9e2f71 authored by Ge Jun's avatar Ge Jun

Calculate server latencies starting from received_us instead of start_parse_us

parent d3f7ee09
......@@ -67,7 +67,7 @@ SendNsheadPbResponse::SendNsheadPbResponse(
void SendNsheadPbResponse::Run() {
MethodStatus* saved_status = status;
const int64_t saved_start_us = done->cpuwide_start_us();
const int64_t received_us = done->received_us();
if (!cntl->IsCloseConnection()) {
adaptor->SerializeResponseToIOBuf(meta, cntl, pbres.get(), ns_res);
}
......@@ -85,7 +85,7 @@ void SendNsheadPbResponse::Run() {
// back response.
if (saved_status) {
saved_status->OnResponded(
!saved_failed, butil::cpuwide_time_us() - saved_start_us);
!saved_failed, butil::cpuwide_time_us() - received_us);
}
saved_done->Run();
}
......
......@@ -48,8 +48,7 @@ public:
// If subclass does not require space, this return value is NULL.
void* additional_space() { return _additional_space; }
// The starting time of the RPC, got from butil::cpuwide_time_us().
int64_t cpuwide_start_us() const { return _start_parse_us; }
int64_t received_us() const { return _received_us; }
// Don't send response back, used by MIMO.
void DoNotRespond();
......@@ -61,7 +60,7 @@ friend class DeleteNsheadClosure;
~NsheadClosure();
const Server* _server;
int64_t _start_parse_us;
int64_t _received_us;
NsheadMessage _request;
NsheadMessage _response;
bool _do_respond;
......
......@@ -139,7 +139,7 @@ void SendRpcResponse(int64_t correlation_id,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
long start_parse_us) {
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
......@@ -266,7 +266,7 @@ void SendRpcResponse(int64_t correlation_id,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - start_parse_us);
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -477,19 +477,20 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
break;
}
// optional, just release resourse ASAP
msg.reset();
req_buf.clear();
res.reset(svc->GetResponsePrototype(method).New());
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus*, long>(
MethodStatus*, int64_t>(
&SendRpcResponse, meta.correlation_id(), cntl.get(),
req.get(), res.get(), server,
method_status, start_parse_us);
method_status, msg->received_us());
// optional, just release resourse ASAP
msg.reset();
req_buf.clear();
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
......@@ -513,7 +514,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
// `socket' will be held until response has been sent
SendRpcResponse(meta.correlation_id(), cntl.release(),
req.release(), res.release(), server,
method_status, -1);
method_status, msg->received_us());
}
bool VerifyRpcRequest(const InputMessageBase* msg_base) {
......
......@@ -114,7 +114,6 @@ void PackEspRequest(butil::IOBuf* packet_buf,
}
void ProcessEspResponse(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
......
......@@ -551,7 +551,7 @@ static void SendHttpResponse(Controller *cntl,
const google::protobuf::Message *res,
const Server* server,
MethodStatus* method_status_raw,
long start_parse_us) {
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
......@@ -729,15 +729,17 @@ static void SendHttpResponse(Controller *cntl,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - start_parse_us);
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
}
inline void SendHttpResponse(Controller *cntl, const Server* svr,
MethodStatus* method_status) {
SendHttpResponse(cntl, NULL, NULL, svr, method_status, -1);
MethodStatus* method_status,
int64_t received_us) {
SendHttpResponse(cntl, NULL, NULL, svr, method_status, received_us);
}
// Normalize the sub string of `uri_path' covered by `splitter' and
// put it into `unresolved_path'
static void FillUnresolvedPath(std::string* unresolved_path,
......@@ -1117,7 +1119,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
return SendHttpResponse(cntl.release(), server, NULL);
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
}
if (server->options().http_master_service) {
......@@ -1127,16 +1129,16 @@ void ProcessHttpRequest(InputMessageBase *msg) {
svc->GetDescriptor()->FindMethodByName(common->DEFAULT_METHOD);
if (md == NULL) {
cntl->SetFailed(ENOMETHOD, "No default_method in http_master_service");
return SendHttpResponse(cntl.release(), server, NULL);
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
}
accessor.set_method(md);
cntl->request_attachment().swap(req_body);
google::protobuf::Closure* done = brpc::NewCallback<
Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, long>(
MethodStatus *, int64_t>(
&SendHttpResponse, cntl.get(), NULL, NULL, server,
NULL, start_parse_us);
NULL, msg->received_us());
if (span) {
span->ResetServerSpanName(md->full_name());
span->set_start_callback_us(butil::cpuwide_time_us());
......@@ -1156,14 +1158,14 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} else {
cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
}
return SendHttpResponse(cntl.release(), server, NULL);
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
} else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
BadMethodRequest breq;
BadMethodResponse bres;
butil::StringSplitter split(path.c_str(), '/');
breq.set_service_name(std::string(split.field(), split.length()));
sp->service->CallMethod(sp->method, cntl.get(), &breq, &bres, NULL);
return SendHttpResponse(cntl.release(), server, NULL);
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
}
// Switch to service-specific error.
non_service_error.release();
......@@ -1173,7 +1175,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
sp->method->full_name().c_str(),
method_status->max_concurrency());
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
}
......@@ -1186,23 +1188,23 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
} else if (security_mode) {
cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
"ServerOptions.internal_port=%d instead if you're in"
" internal network", server->options().internal_port);
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
google::protobuf::Service* svc = sp->service;
......@@ -1215,7 +1217,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
cntl->SetFailed("Fail to new req or res");
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
if (sp->params.allow_http_body_to_pb &&
method->input_type()->field_count() > 0) {
......@@ -1229,7 +1231,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->SetFailed(EREQUEST, "%s needs to be created from a"
" non-empty json, it has required fields.",
req->GetDescriptor()->full_name().c_str());
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
} // else all fields of the request are optional.
} else {
const std::string* encoding =
......@@ -1240,7 +1242,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
butil::IOBuf uncompressed;
if (!policy::GzipDecompress(req_body, &uncompressed)) {
cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
req_body.swap(uncompressed);
}
......@@ -1248,7 +1250,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (!ParsePbFromIOBuf(req.get(), req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
} else {
butil::IOBufAsZeroCopyInputStream wrapper(req_body);
......@@ -1259,7 +1261,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (!json2pb::JsonToProtoMessage(&wrapper, req.get(), options, &err)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
req->GetDescriptor()->full_name().c_str(), err.c_str());
return SendHttpResponse(cntl.release(), server, method_status);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
}
}
}
......@@ -1268,15 +1270,16 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->request_attachment().swap(req_body);
}
imsg_guard.reset(); // optional, just release resourse ASAP
google::protobuf::Closure* done = brpc::NewCallback<
Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, long>(
MethodStatus *, int64_t>(
&SendHttpResponse, cntl.get(),
req.get(), res.get(), server,
method_status, start_parse_us);
method_status, msg->received_us());
imsg_guard.reset(); // optional, just release resourse ASAP
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
......
......@@ -225,7 +225,7 @@ static void SendHuluResponse(int64_t correlation_id,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
long start_parse_us) {
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
......@@ -320,7 +320,7 @@ static void SendHuluResponse(int64_t correlation_id,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - start_parse_us);
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -487,19 +487,21 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
CompressTypeToCStr(req_cmp_type), reqsize);
break;
}
// optional, just release resourse ASAP
msg.reset();
req_buf.clear();
res.reset(svc->GetResponsePrototype(method).New());
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, HuluController*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, long>(
MethodStatus *, int64_t>(
&SendHuluResponse, correlation_id, cntl.get(),
req.get(), res.get(), server,
method_status, start_parse_us);
method_status, msg->received_us());
// optional, just release resourse ASAP
msg.reset();
req_buf.clear();
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
......@@ -523,7 +525,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
// `socket' will be held until response has been sent
SendHuluResponse(correlation_id, cntl.release(),
req.release(), res.release(), server,
method_status, -1);
method_status, msg->received_us());
}
bool VerifyHuluRequest(const InputMessageBase* msg_base) {
......
......@@ -41,13 +41,13 @@ namespace policy {
struct SendMongoResponse : public google::protobuf::Closure {
SendMongoResponse(const Server *server) :
status(NULL),
start_callback_us(0L),
received_us(0L),
server(server) {}
~SendMongoResponse();
void Run();
MethodStatus* status;
long start_callback_us;
int64_t received_us;
const Server *server;
Controller cntl;
MongoRequest req;
......@@ -104,7 +104,7 @@ void SendMongoResponse::Run() {
}
if (method_status) {
method_status.release()->OnResponded(
!cntl.Failed(), butil::cpuwide_time_us() - start_callback_us);
!cntl.Failed(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -267,7 +267,7 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
mongo_done->req.mutable_header()->set_op_code(
static_cast<MongoOp>(header->op_code));
mongo_done->res.mutable_header()->set_response_to(header->request_id);
mongo_done->start_callback_us = butil::cpuwide_time_us();
mongo_done->received_us = msg->received_us();
google::protobuf::Service* svc = mp->service;
const google::protobuf::MethodDescriptor* method = mp->method;
......
......@@ -40,7 +40,7 @@ namespace brpc {
NsheadClosure::NsheadClosure(void* additional_space)
: _server(NULL)
, _start_parse_us(0)
, _received_us(0)
, _do_respond(true)
, _additional_space(additional_space) {
}
......@@ -125,7 +125,7 @@ void NsheadClosure::Run() {
}
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - cpuwide_start_us());
!_controller.Failed(), butil::cpuwide_time_us() - _received_us);
}
}
......@@ -249,7 +249,7 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
req->head = *req_head;
msg->payload.swap(req->body);
nshead_done->_start_parse_us = start_parse_us;
nshead_done->_received_us = msg->received_us();
nshead_done->_server = server;
ServerPrivateAccessor server_accessor(server);
......
......@@ -209,7 +209,7 @@ static void SendSofaResponse(int64_t correlation_id,
const google::protobuf::Message* res,
const Server* server,
MethodStatus* method_status_raw,
long start_parse_us) {
int64_t received_us) {
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
......@@ -296,7 +296,7 @@ static void SendSofaResponse(int64_t correlation_id,
}
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - start_parse_us);
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
}
......@@ -432,17 +432,19 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
req_cmp_type, (int)msg->payload.size());
break;
}
msg.reset(); // optional, just release resourse ASAP
res.reset(svc->GetResponsePrototype(method).New());
// `socket' will be held until response has been sent
google::protobuf::Closure* done = ::brpc::NewCallback<
int64_t, Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, long>(
MethodStatus *, int64_t>(
&SendSofaResponse, correlation_id, cntl.get(),
req.get(), res.get(), server,
method_status, start_parse_us);
method_status, msg->received_us());
msg.reset(); // optional, just release resourse ASAP
// `cntl', `req' and `res' will be deleted inside `done'
if (span) {
span->set_start_callback_us(butil::cpuwide_time_us());
......@@ -467,7 +469,7 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
// `socket' will be held until response has been sent
SendSofaResponse(correlation_id, cntl.release(),
req.release(), res.release(), server,
method_status, -1);
method_status, msg->received_us());
}
bool VerifySofaRequest(const InputMessageBase* msg_base) {
......
......@@ -195,14 +195,14 @@ private:
friend void ProcessThriftRequest(InputMessageBase* msg_base);
butil::atomic<int> _run_counter;
int64_t _start_parse_us;
int64_t _received_us;
ThriftFramedMessage _request;
ThriftFramedMessage _response;
Controller _controller;
};
inline ThriftClosure::ThriftClosure()
: _run_counter(0), _start_parse_us(0) {
: _run_counter(0), _received_us(0) {
}
ThriftClosure::~ThriftClosure() {
......@@ -350,7 +350,7 @@ void ThriftClosure::DoRun() {
}
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - _start_parse_us);
!_controller.Failed(), butil::cpuwide_time_us() - _received_us);
}
}
......@@ -446,7 +446,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
Controller* cntl = &(thrift_done->_controller);
ThriftFramedMessage* req = &(thrift_done->_request);
ThriftFramedMessage* res = &(thrift_done->_response);
thrift_done->_start_parse_us = start_parse_us;
thrift_done->_received_us = msg->received_us();
ServerPrivateAccessor server_accessor(server);
const bool security_mode = server->options().security_mode() &&
......@@ -533,6 +533,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
} while (false);
msg.reset(); // optional, just release resourse ASAP
if (span) {
span->ResetServerSpanName(cntl->thrift_method_name());
span->set_start_callback_us(butil::cpuwide_time_us());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment