Commit 86acfa89 authored by jamesge's avatar jamesge

add Controller.SessionKV() to record and print session-level KV; Add…

add Controller.SessionKV() to record and print session-level KV; Add LOGD/I/W/E/F to print contextual log; Add flag -log_as_json to print logs as valid JSON
parent 74d156c8
......@@ -79,7 +79,9 @@ BAIDU_REGISTER_ERRNO(brpc::EITP, "Bad Itp response");
namespace brpc {
DEFINE_bool(graceful_quit_on_sigterm, false, "Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
......@@ -128,6 +130,7 @@ Controller::Controller() {
Controller::~Controller() {
*g_ncontroller << -1;
FlushSessionKV(LOG_STREAM(INFO));
ResetNonPods();
}
......@@ -1485,4 +1488,82 @@ google::protobuf::Closure* DoNothing() {
return butil::get_leaky_singleton<DoNothingClosure>();
}
KVMap& Controller::SessionKV() {
if (_session_kv == nullptr) {
_session_kv.reset(new KVMap);
}
return *_session_kv.get();
}
#define BRPC_SESSION_END_MSG "Session ends"
#define BRPC_REQ_ID "@rid"
#define BRPC_KV_SEP ":"
void Controller::FlushSessionKV(std::ostream& os) {
if (_session_kv == nullptr || _session_kv->Count() == 0) {
return;
}
const std::string* pRID = nullptr;
if (_http_request) {
pRID = _http_request->GetHeader(FLAGS_request_id_header);
}
if (logging::FLAGS_log_as_json) {
os << "\"M\":\"" BRPC_SESSION_END_MSG "\"";
if (pRID) {
os << ",\"" BRPC_REQ_ID "\":\"" << *pRID << '"';
}
for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) {
os << ",\"" << it->first << "\":\"" << it->second << '"';
}
} else {
os << BRPC_SESSION_END_MSG;
if (pRID) {
os << " " BRPC_REQ_ID BRPC_KV_SEP << *pRID;
}
for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) {
os << ' ' << it->first << BRPC_KV_SEP << it->second;
}
}
}
Controller::LogPostfixDummy::~LogPostfixDummy() {
*osptr << postfix;
}
std::ostream& operator<<(std::ostream& os, const Controller::LogPostfixDummy& p) {
const_cast<brpc::Controller::LogPostfixDummy&>(p).osptr = &os;
if (logging::FLAGS_log_as_json) {
os << "\"M\":\"";
}
return os;
}
Controller::LogPostfixDummy Controller::LogPostfix() const {
Controller::LogPostfixDummy result;
std::string& p = result.postfix;
if (logging::FLAGS_log_as_json) {
p.push_back('"');
}
const std::string* pRID = nullptr;
if (_http_request) {
pRID = _http_request->GetHeader(FLAGS_request_id_header);
if (pRID) {
if (logging::FLAGS_log_as_json) {
p.append(",\"" BRPC_REQ_ID "\":\"");
p.append(*pRID);
p.push_back('"');
} else {
p.reserve(5 + pRID->size());
p.append(" " BRPC_REQ_ID BRPC_KV_SEP);
p.append(*pRID);
}
}
}
return result;
}
} // namespace brpc
......@@ -43,6 +43,7 @@
#include "brpc/progressive_attachment.h" // ProgressiveAttachment
#include "brpc/progressive_reader.h" // ProgressiveReader
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
// EAUTH is defined in MAC
#ifndef EAUTH
......@@ -482,6 +483,19 @@ public:
const butil::IOBuf& request_attachment() const { return _request_attachment; }
const butil::IOBuf& response_attachment() const { return _response_attachment; }
// Get the object to write key/value which will be flushed into
// LOG(INFO) when this controller is deleted.
KVMap& SessionKV();
// Contextual prefixes for LOGD/LOGI/LOGW/LOGE/LOGF macros
struct LogPostfixDummy {
LogPostfixDummy() : osptr(nullptr) {}
~LogPostfixDummy();
std::string postfix;
std::ostream* osptr;
};
LogPostfixDummy LogPostfix() const;
// Return true if the remote side creates a stream.
bool has_remote_stream() { return _remote_stream_settings != NULL; }
......@@ -660,6 +674,9 @@ private:
std::string& protocol_param() { return _thrift_method_name; }
const std::string& protocol_param() const { return _thrift_method_name; }
// Flush this->SessionKV() into `os'
void FlushSessionKV(std::ostream& os);
private:
// NOTE: align and group fields to make Controller as compact as possible.
......@@ -739,6 +756,8 @@ private:
HttpHeader* _http_request;
HttpHeader* _http_response;
std::unique_ptr<KVMap> _session_kv;
// Fields with large size but low access frequency
butil::IOBuf _request_attachment;
butil::IOBuf _response_attachment;
......@@ -787,7 +806,15 @@ bool IsAskedToQuit();
// Send Ctrl-C to current process.
void AskToQuit();
std::ostream& operator<<(std::ostream& os, const Controller::LogPostfixDummy& p);
} // namespace brpc
// Print contextual logs
#define LOGD(cntl) LOG(DEBUG) << (cntl)->LogPostfix()
#define LOGI(cntl) LOG(INFO) << (cntl)->LogPostfix()
#define LOGW(cntl) LOG(WARNING) << (cntl)->LogPostfix()
#define LOGE(cntl) LOG(ERROR) << (cntl)->LogPostfix()
#define LOGF(cntl) LOG(FATAL) << (cntl)->LogPostfix()
#endif // BRPC_CONTROLLER_H
......@@ -15,28 +15,23 @@
// specific language governing permissions and limitations
// under the License.
#ifndef BRPC_SESSION_LOG_H
#define BRPC_SESSION_LOG_H
#ifndef BRPC_KVMAP_H
#define BRPC_KVMAP_H
#include "butil/containers/flat_map.h"
namespace brpc {
class SessionLog {
// Remember Key/Values in string
class KVMap {
public:
class Formatter {
public:
virtual ~Formatter() {}
virtual void Print(std::ostream&, const SessionLog&) = 0;
};
typedef butil::FlatMap<std::string, std::string> Map;
typedef Map::const_iterator Iterator;
SessionLog() {}
KVMap() {}
// Exchange internal fields with another SessionLog.
void Swap(SessionLog &rhs) { _entries.swap(rhs._entries); }
// Exchange internal fields with another KVMap.
void Swap(KVMap &rhs) { _entries.swap(rhs._entries); }
// Reset internal fields as if they're just default-constructed.
void Clear() { _entries.clear(); }
......@@ -77,4 +72,4 @@ private:
} // namespace brpc
#endif // BRPC_SESSION_LOG_H
#endif // BRPC_KVMAP_H
......@@ -127,7 +127,7 @@ DEFINE_string(vmodule, "", "per-module verbose level."
" (that is, name ignoring .cpp/.h)."
" LOG_LEVEL overrides any value given by --v.");
DEFINE_bool(log_process_id, false, "Log process id");
DEFINE_bool(log_pid, false, "Log process id");
DEFINE_int32(minloglevel, 0, "Any log at or above this level will be "
"displayed. Anything below this level will be silently ignored. "
......@@ -139,6 +139,8 @@ DEFINE_bool(log_hostname, false, "Add host after pid in each log so"
DEFINE_bool(log_year, false, "Log year in datetime part in each log");
DEFINE_bool(log_as_json, false, "Print log as a valid JSON");
namespace {
LoggingDestination logging_destination = LOG_DEFAULT;
......@@ -453,7 +455,7 @@ void SetLogAssertHandler(LogAssertHandler handler) {
const char* const log_severity_names[LOG_NUM_SEVERITIES] = {
"INFO", "NOTICE", "WARNING", "ERROR", "FATAL" };
inline void log_severity_name(std::ostream& os, int severity) {
static void PrintLogSeverity(std::ostream& os, int severity) {
if (severity < 0) {
// Add extra space to separate from following datetime.
os << 'V' << -severity << ' ';
......@@ -464,9 +466,9 @@ inline void log_severity_name(std::ostream& os, int severity) {
}
}
void print_log_prefix(std::ostream& os,
int severity, const char* file, int line) {
log_severity_name(os, severity);
static void PrintLogPrefix(
std::ostream& os, int severity, const char* file, int line) {
PrintLogSeverity(os, severity);
#if defined(OS_LINUX)
timeval tv;
gettimeofday(&tv, NULL);
......@@ -492,7 +494,7 @@ void print_log_prefix(std::ostream& os,
#if defined(OS_LINUX)
os << '.' << std::setw(6) << tv.tv_usec;
#endif
if (FLAGS_log_process_id) {
if (FLAGS_log_pid) {
os << ' ' << std::setfill(' ') << std::setw(5) << CurrentProcessId();
}
os << ' ' << std::setfill(' ') << std::setw(5)
......@@ -508,6 +510,62 @@ void print_log_prefix(std::ostream& os,
os.fill(prev_fill);
}
static void PrintLogPrefixAsJSON(
std::ostream& os, int severity, const char* file, int line) {
// severity
os << "\"L\":\"";
if (severity < 0) {
os << 'V' << -severity;
} else if (severity < LOG_NUM_SEVERITIES) {
os << log_severity_names[severity][0];
} else {
os << 'U';
}
// time
os << "\",\"T\":\"";
#if defined(OS_LINUX)
timeval tv;
gettimeofday(&tv, NULL);
time_t t = tv.tv_sec;
#else
time_t t = time(NULL);
#endif
struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL};
#if _MSC_VER >= 1400
localtime_s(&local_tm, &t);
#else
localtime_r(&t, &local_tm);
#endif
const char prev_fill = os.fill('0');
if (FLAGS_log_year) {
os << std::setw(4) << local_tm.tm_year + 1900;
}
os << std::setw(2) << local_tm.tm_mon + 1
<< std::setw(2) << local_tm.tm_mday << ' '
<< std::setw(2) << local_tm.tm_hour << ':'
<< std::setw(2) << local_tm.tm_min << ':'
<< std::setw(2) << local_tm.tm_sec;
#if defined(OS_LINUX)
os << '.' << std::setw(6) << tv.tv_usec;
#endif
os << "\",";
os.fill(prev_fill);
if (FLAGS_log_pid) {
os << "\"pid\":\"" << CurrentProcessId() << "\",";
}
os << "\"tid\":\"" << butil::PlatformThread::CurrentId() << "\",";
if (FLAGS_log_hostname) {
butil::StringPiece hostname(butil::my_hostname());
if (hostname.ends_with(".baidu.com")) { // make it shorter
hostname.remove_suffix(10);
}
os << "\"host\":\"" << hostname << "\",";
}
os << "\"C\":\"" << file << ':' << line << "\"";
}
// A log message handler that gets notified of every log message we process.
class DoublyBufferedLogSink : public butil::DoublyBufferedData<LogSink*> {
public:
......@@ -612,13 +670,32 @@ void DisplayDebugMessageInDialog(const std::string& str) {
bool StringSink::OnLogMessage(int severity, const char* file, int line,
const butil::StringPiece& content) {
std::ostringstream prefix_os;
print_log_prefix(prefix_os, severity, file, line);
bool pair_quote = false;
if (FLAGS_log_as_json) {
prefix_os << '{';
PrintLogPrefixAsJSON(prefix_os, severity, file, line);
if (content.empty() || content[0] != '"') {
// not a json, add 'M' field
prefix_os << ",\"M\":\"";
pair_quote = true;
} else {
prefix_os << ',';
}
} else {
PrintLogPrefix(prefix_os, severity, file, line);
}
const std::string prefix = prefix_os.str();
{
butil::AutoLock lock_guard(_lock);
reserve(size() + prefix.size() + content.size());
append(prefix);
append(content.data(), content.size());
if (FLAGS_log_as_json) {
if (pair_quote) {
push_back('"');
}
push_back('}');
}
}
return true;
}
......@@ -772,9 +849,27 @@ public:
// A LogSink focused on performance should also be able to handle
// non-continuous inputs which is a must to maximize performance.
std::ostringstream os;
print_log_prefix(os, severity, file, line);
os.write(content.data(), content.size());
os << '\n';
if (!FLAGS_log_as_json) {
PrintLogPrefix(os, severity, file, line);
os.write(content.data(), content.size());
os << '\n';
} else {
os << '{';
PrintLogPrefixAsJSON(os, severity, file, line);
bool pair_quote = false;
if (content.empty() || content[0] != '"') {
// not a json, add a 'M' field
os << ",\"M\":\"";
pair_quote = true;
} else {
os << ',';
}
os.write(content.data(), content.size());
if (pair_quote) {
os << '"';
}
os << "}\n";
}
std::string log = os.str();
if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0) {
......
......@@ -409,6 +409,7 @@ const LogSeverity BLOG_0 = BLOG_ERROR;
#define VLOG_IS_ON(verbose_level) BAIDU_VLOG_IS_ON(verbose_level, __FILE__)
DECLARE_int32(v);
DECLARE_bool(log_as_json);
extern const int VLOG_UNINITIALIZED;
......
......@@ -21,6 +21,7 @@
#include <gtest/gtest.h>
#include <google/protobuf/stubs/common.h>
#include "butil/logging.h"
#include "butil/time.h"
#include "butil/macros.h"
#include "brpc/socket.h"
......@@ -72,3 +73,57 @@ TEST_F(ControllerTest, notify_on_destruction) {
delete cntl;
ASSERT_TRUE(cancel);
}
/*
class MyFormatter : public brpc::SessionLog::Formatter {
void Print(std::ostream& os, const brpc::SessionLog& log) override {
for (auto it = log.Begin(); it != log.End(); ++it) {
os << '"' << it->first << "\":\"" << it->second << "\",";
}
}
};
*/
static bool endsWith(const std::string& s1, const butil::StringPiece& s2) {
if (s1.size() < s2.size()) {
return false;
}
return memcmp(s1.data() + s1.size() - s2.size(), s2.data(), s2.size()) == 0;
}
static bool startsWith(const std::string& s1, const butil::StringPiece& s2) {
if (s1.size() < s2.size()) {
return false;
}
return memcmp(s1.data(), s2.data(), s2.size()) == 0;
}
TEST_F(ControllerTest, SessionKV) {
logging::FLAGS_log_as_json = false;
logging::StringSink sink1;
auto oldSink = logging::SetLogSink(&sink1);
//brpc::SetGlobalSessionLogFormatter(new MyFormatter);
{
brpc::Controller cntl;
cntl.set_log_id(123); // not working now
cntl.SessionKV().Set("Apple", 1);
cntl.SessionKV().Set("Baidu", "22");
cntl.SessionKV().Set("Cisco", 33.3);
LOGW(&cntl) << "My WARNING Log";
ASSERT_TRUE(endsWith(sink1, "] My WARNING Log")) << sink1;
ASSERT_TRUE(startsWith(sink1, "W")) << sink1;
sink1.clear();
cntl.http_request().SetHeader("x-request-id", "abcdEFG-456");
LOGE(&cntl) << "My ERROR Log";
ASSERT_TRUE(endsWith(sink1, "] My ERROR Log @rid:abcdEFG-456")) << sink1;
ASSERT_TRUE(startsWith(sink1, "E")) << sink1;
sink1.clear();
logging::FLAGS_log_as_json = true;
}
ASSERT_TRUE(endsWith(sink1, R"(,"M":"Session ends","@rid":"abcdEFG-456","Baidu":"22","Cisco":"33.300000","Apple":"1"})")) << sink1;
ASSERT_TRUE(startsWith(sink1, R"({"L":"I",)")) << sink1;
logging::SetLogSink(oldSink);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment