Commit 20f2b4a8 authored by jamesge's avatar jamesge

Support pass down request_id

parent d06e3305
......@@ -26,8 +26,9 @@
#include <brpc/server.h>
#include "echo.pb.h"
#include <bvar/bvar.h>
#include <butil/fast_rand.h>
DEFINE_int32(thread_num, 4, "Number of threads to send requests");
DEFINE_int32(thread_num, 2, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_string(attachment, "foo", "Carry this along with requests");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
......@@ -38,7 +39,7 @@ DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_int32(depth, 0, "number of loop calls");
// Don't send too frequently in this example
DEFINE_int32(sleep_ms, 100, "milliseconds to sleep after each RPC");
DEFINE_int32(sleep_ms, 1000, "milliseconds to sleep after each RPC");
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
bvar::LatencyRecorder g_latency_recorder("client");
......@@ -50,7 +51,6 @@ void* sender(void* arg) {
example::EchoService_Stub stub(chan);
// Send a request and wait for the response every 1 second.
int log_id = 0;
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
// on stack.
......@@ -63,7 +63,9 @@ void* sender(void* arg) {
request.set_depth(FLAGS_depth);
}
cntl.set_log_id(log_id ++); // set by user
// Set request_id to be a random string
cntl.set_request_id(butil::fast_rand_printable(9));
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
......
......@@ -52,27 +52,25 @@ public:
static_cast<brpc::Controller*>(cntl_base);
if (request->depth() > 0) {
TRACEPRINTF("I'm about to call myself for another time, depth=%d",
request->depth());
CLOGI(cntl) << "I'm about to call myself for another time, depth=" << request->depth();
example::EchoService_Stub stub(&channel);
example::EchoRequest request2;
example::EchoResponse response2;
brpc::Controller cntl2;
brpc::Controller cntl2(cntl->inheritable());
request2.set_message(request->message());
request2.set_depth(request->depth() - 1);
cntl2.set_log_id(cntl->log_id());
cntl2.set_timeout_ms(FLAGS_timeout_ms);
cntl2.set_max_retry(FLAGS_max_retry);
stub.Echo(&cntl2, &request2, &response2, NULL);
if (cntl2.Failed()) {
LOG(ERROR) << "Fail to send EchoRequest, " << cntl2.ErrorText();
CLOGE(&cntl2) << "Fail to send EchoRequest, " << cntl2.ErrorText();
cntl->SetFailed(cntl2.ErrorCode(), "%s", cntl2.ErrorText().c_str());
return;
}
response->set_message(response2.message());
} else {
TRACEPRINTF("I'm the last call");
CLOGI(cntl) << "I'm the last call";
response->set_message(request->message());
}
......
......@@ -83,7 +83,6 @@ namespace brpc {
DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
......@@ -130,6 +129,13 @@ Controller::Controller() {
ResetPods();
}
Controller::Controller(const Inheritable& parent_ctx) {
CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars));
*g_ncontroller << 1;
ResetPods();
_inheritable = parent_ctx;
}
struct SessionKVFlusher {
Controller* cntl;
};
......@@ -247,7 +253,7 @@ void Controller::ResetPods() {
_response_compress_type = COMPRESS_TYPE_NONE;
_fail_limit = UNSET_MAGIC_NUM;
_pipelined_count = 0;
_log_id = 0;
_inheritable.Reset();
_pchan_sub_count = 0;
_response = NULL;
_done = NULL;
......@@ -330,7 +336,7 @@ void Controller::set_max_retry(int max_retry) {
void Controller::set_log_id(uint64_t log_id) {
add_flag(FLAGS_LOG_ID);
_log_id = log_id;
_inheritable.log_id = log_id;
}
......@@ -1249,7 +1255,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const {
s->tos = _tos;
s->connection_type = _connection_type;
s->request_compress_type = _request_compress_type;
s->log_id = _log_id;
s->log_id = log_id();
s->has_request_code = has_request_code();
s->request_code = _request_code;
}
......@@ -1517,8 +1523,8 @@ void Controller::FlushSessionKV(std::ostream& os) {
}
const std::string* pRID = nullptr;
if (_http_request) {
pRID = _http_request->GetHeader(FLAGS_request_id_header);
if (!request_id().empty()) {
pRID = &request_id();
}
if (FLAGS_log_as_json) {
......@@ -1544,10 +1550,11 @@ std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p)
p.DoPrintLogPrefix(os);
return os;
}
void Controller::DoPrintLogPrefix(std::ostream& os) const {
const std::string* pRID = nullptr;
if (_http_request) {
pRID = _http_request->GetHeader(FLAGS_request_id_header);
if (!request_id().empty()) {
pRID = &request_id();
if (pRID) {
if (FLAGS_log_as_json) {
os << BRPC_REQ_ID "\":\"" << *pRID << "\",";
......
......@@ -23,6 +23,7 @@
// on internal structures, use opaque pointers instead.
#include <gflags/gflags.h> // Users often need gflags
#include <string>
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
#include "bthread/errno.h" // Redefine errno
#include "butil/endpoint.h" // butil::EndPoint
......@@ -143,8 +144,21 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
public:
struct Inheritable {
Inheritable() : log_id(0) {}
void Reset() {
log_id = 0;
request_id.clear();
}
uint64_t log_id;
std::string request_id;
};
public:
Controller();
Controller(const Inheritable& parent_ctx);
~Controller();
// ------------------------------------------------------------------
......@@ -200,6 +214,8 @@ public:
// queries following the topology of servers) with a same log_id.
void set_log_id(uint64_t log_id);
void set_request_id(std::string request_id) { _inheritable.request_id = request_id; }
// Set type of service: http://en.wikipedia.org/wiki/Type_of_service
// Current implementation has limits: If the connection is already
// established, this setting has no effect until the connection is broken
......@@ -470,8 +486,10 @@ public:
int ErrorCode() const { return _error_code; }
// Getters:
const Inheritable& inheritable() { return _inheritable; }
bool has_log_id() const { return has_flag(FLAGS_LOG_ID); }
uint64_t log_id() const { return _log_id; }
uint64_t log_id() const { return _inheritable.log_id; }
const std::string& request_id() const { return _inheritable.request_id; }
CompressType request_compress_type() const { return _request_compress_type; }
CompressType response_compress_type() const { return _response_compress_type; }
const HttpHeader& http_request() const
......@@ -731,7 +749,7 @@ private:
int _preferred_index;
CompressType _request_compress_type;
CompressType _response_compress_type;
uint64_t _log_id;
Inheritable _inheritable;
int _pchan_sub_count;
google::protobuf::Message* _response;
google::protobuf::Closure* _done;
......@@ -814,8 +832,19 @@ std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p);
} // namespace brpc
// Print contextual logs with @rid which is got from "x-request-id"(changable
// by -request_id_header) in http header by default
// Print contextual logs prefixed with "@rid=REQUEST_ID" which marks a session
// and eases debugging. The REQUEST_ID is carried in http/rpc request or
// inherited from another controller.
// As a server:
// Call CLOG*(cntl) << ... to log instead of LOG(*) << ..
// As a client:
// Inside a service:
// Use Controller(service_cntl->inheritable()) to create controllers which
// inherit session info from the service's requests
// Standalone brpc client:
// Set cntl->set_request_id(REQUEST_ID);
// Standalone http client:
// Set header 'X-REQUEST-ID'
#define CLOGD(cntl) LOG(DEBUG) << (cntl)->LogPrefix()
#define CLOGI(cntl) LOG(INFO) << (cntl)->LogPrefix()
#define CLOGW(cntl) LOG(WARNING) << (cntl)->LogPrefix()
......
......@@ -41,6 +41,7 @@ message RpcRequestMeta {
optional int64 trace_id = 4;
optional int64 span_id = 5;
optional int64 parent_span_id = 6;
optional string request_id = 7; // correspond to x-request-id in http header
}
message RpcResponseMeta {
......
......@@ -343,6 +343,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (request_meta.has_log_id()) {
cntl->set_log_id(request_meta.log_id());
}
if (request_meta.has_request_id()) {
cntl->set_request_id(request_meta.request_id());
}
cntl->set_request_compress_type((CompressType)meta.compress_type());
accessor.set_server(server)
.set_security_mode(security_mode)
......@@ -648,6 +651,9 @@ void PackRpcRequest(butil::IOBuf* req_buf,
if (cntl->has_log_id()) {
request_meta->set_log_id(cntl->log_id());
}
if (!cntl->request_id().empty()) {
request_meta->set_request_id(cntl->request_id());
}
meta.set_correlation_id(correlation_id);
StreamId request_stream_id = accessor.request_stream();
if (request_stream_id != INVALID_STREAM_ID) {
......
......@@ -67,10 +67,13 @@ DEFINE_string(http_header_of_user_ip, "", "http requests sent by proxies may "
"brpc will read ip:port from the specified header for "
"authorization and set Controller::remote_side()");
DEFINE_bool(pb_enum_as_number, false, "[Not recommended] Convert enums in "
DEFINE_bool(pb_enum_as_number, false,
"[Not recommended] Convert enums in "
"protobuf to json as numbers, affecting both client-side and "
"server-side");
DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
// Read user address from the header specified by -http_header_of_user_ip
static bool GetUserAddressFromHeaderImpl(const HttpHeader& headers,
butil::EndPoint* user_addr) {
......@@ -566,8 +569,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
// Fill log-id if user set it.
if (cntl->has_log_id()) {
hreq.SetHeader(common->LOG_ID,
butil::string_printf(
"%llu", (unsigned long long)cntl->log_id()));
butil::string_printf("%llu", (unsigned long long)cntl->log_id()));
}
if (!cntl->request_id().empty()) {
hreq.SetHeader(FLAGS_request_id_header, cntl->request_id());
}
if (!is_http2) {
......@@ -1264,6 +1269,11 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
}
const std::string* request_id = req_header.GetHeader(FLAGS_request_id_header);
if (request_id) {
cntl->set_request_id(*request_id);
}
// Tag the bthread with this server's key for
// thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
......
......@@ -178,4 +178,19 @@ void fast_rand_bytes(void* output, size_t output_length) {
}
}
std::string fast_rand_printable(size_t length) {
std::string result(length, 0);
const size_t halflen = length/2;
fast_rand_bytes(&result[0], halflen);
for (size_t i = 0; i < halflen; ++i) {
const uint8_t b = result[halflen - 1 - i];
result[length - 1 - 2*i] = 'A' + (b & 0xF);
result[length - 2 - 2*i] = 'A' + (b >> 4);
}
if (halflen * 2 != length) {
result[0] = 'A' + (fast_rand() % 16);
}
return result;
}
} // namespace butil
......@@ -20,7 +20,9 @@
#ifndef BUTIL_FAST_RAND_H
#define BUTIL_FAST_RAND_H
#include <cstddef>
#include <stdint.h>
#include <string>
namespace butil {
......@@ -66,7 +68,10 @@ template <typename T> T fast_rand_in(T min, T max) {
double fast_rand_double();
// Fills |output_length| bytes of |output| with random data.
void fast_rand_bytes(void* output, size_t output_length, uint8_t min);
void fast_rand_bytes(void *output, size_t output_length);
// Generate a random printable string of |length| bytes
std::string fast_rand_printable(size_t length);
}
......
......@@ -122,7 +122,7 @@ TEST_F(ControllerTest, SessionKV) {
ASSERT_TRUE(startsWith(sink1, "W")) << sink1;
sink1.clear();
cntl.http_request().SetHeader("x-request-id", "abcdEFG-456");
cntl.set_request_id("abcdEFG-456");
CLOGE(&cntl) << "My ERROR Log";
ASSERT_TRUE(endsWith(sink1, "] @rid=abcdEFG-456 My ERROR Log")) << sink1;
ASSERT_TRUE(startsWith(sink1, "E")) << sink1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment