1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Author: chenzhangyi01@baidu.com gejun@baidu.com
// Date 2014/09/25 17:50:21
#ifndef BVAR_RECORDER_H
#define BVAR_RECORDER_H
#include <stdint.h> // int64_t uint64_t
#include "butil/macros.h" // BAIDU_CASSERT
#include "butil/logging.h" // LOG
#include "bvar/detail/combiner.h" // detail::AgentCombiner
#include "bvar/variable.h"
#include "bvar/window.h"
#include "bvar/detail/sampler.h"
namespace bvar {
// A (sum, count) pair from which an average can be derived. Supports
// component-wise addition and subtraction.
struct Stat {
    Stat() : sum(0), num(0) {}
    Stat(int64_t sum2, int64_t num2) : sum(sum2), num(num2) {}

    int64_t sum;
    int64_t num;

    // Integer average sum/num, or 0 when num is 0.
    int64_t get_average_int() const {
        // num can be changed by the sampling thread: read it once so that
        // the zero check and the division use the same value.
        const int64_t count = num;
        return (count != 0) ? (sum / count) : 0;
    }

    // Floating-point average sum/num, or 0.0 when num is 0.
    double get_average_double() const {
        const int64_t count = num;
        return (count != 0) ? ((double)sum / (double)count) : 0.0;
    }

    // Component-wise difference.
    Stat operator-(const Stat& rhs) const {
        Stat diff(*this);
        diff -= rhs;
        return diff;
    }

    void operator-=(const Stat& rhs) {
        sum = sum - rhs.sum;
        num = num - rhs.num;
    }

    // Component-wise sum.
    Stat operator+(const Stat& rhs) const {
        Stat total(*this);
        total += rhs;
        return total;
    }

    void operator+=(const Stat& rhs) {
        sum = sum + rhs.sum;
        num = num + rhs.num;
    }
};
// Prints the average held by |s|: the integer average when it is non-zero,
// otherwise the floating-point average.
inline std::ostream& operator<<(std::ostream& os, const Stat& s) {
    const int64_t int_avg = s.get_average_int();
    if (int_avg == 0) {
        return os << s.get_average_double();
    }
    return os << int_avg;
}
// For calculating average of numbers.
// Example:
// IntRecorder latency;
// latency << 1 << 3 << 5;
// CHECK_EQ(3, latency.average());
class IntRecorder : public Variable {
public:
    // Compressing format:
    // | 20 bits (unsigned) | sign bit | 43 bits |
    //         num                   sum
    //
    // The masks below are built from a 64-bit one rather than `1ul':
    // unsigned long may be only 32 bits wide, in which case shifting it by
    // SUM_BIT_WIDTH bits would be undefined.
    const static size_t SUM_BIT_WIDTH=44;
    const static uint64_t MAX_SUM_PER_THREAD =
        ((uint64_t)1 << SUM_BIT_WIDTH) - 1;
    const static uint64_t MAX_NUM_PER_THREAD =
        ((uint64_t)1 << (64ul - SUM_BIT_WIDTH)) - 1;
    BAIDU_CASSERT(SUM_BIT_WIDTH > 32 && SUM_BIT_WIDTH < 64,
                  SUM_BIT_WIDTH_must_be_between_33_and_63);

    // s1 += s2
    struct AddStat {
        void operator()(Stat& s1, const Stat& s2) const { s1 += s2; }
    };

    // s1 -= s2
    struct MinusStat {
        void operator()(Stat& s1, const Stat& s2) const { s1 -= s2; }
    };

    typedef Stat value_type;
    typedef detail::ReducerSampler<IntRecorder, Stat,
                                   AddStat, MinusStat> sampler_type;

    typedef Stat SampleSet;

    // Unpacks a compressed 64-bit element (see "Compressing format") and
    // accumulates its sum and num into a Stat.
    struct AddToStat {
        void operator()(Stat& lhs, uint64_t rhs) const {
            lhs.sum += _extend_sign_bit(_get_sum(rhs));
            lhs.num += _get_num(rhs);
        }
    };

    typedef detail::AgentCombiner<Stat, uint64_t, AddToStat> combiner_type;
    typedef combiner_type::Agent agent_type;

    IntRecorder() : _sampler(NULL) {}

    // Creates the recorder and exposes it as |name|.
    explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) {
        expose(name);
    }

    // Creates the recorder and exposes it with |prefix| and |name|.
    IntRecorder(const butil::StringPiece& prefix, const butil::StringPiece& name)
        : _sampler(NULL) {
        expose_as(prefix, name);
    }

    // Hides the variable, then destroys the sampler if one was created by
    // get_sampler().
    ~IntRecorder() {
        hide();
        if (_sampler) {
            _sampler->destroy();
            _sampler = NULL;
        }
    }

    // Note: The input type is actually int. Use int64_t to check overflow.
    IntRecorder& operator<<(int64_t/*note*/ sample);

    // Integer average of the combined agents (0 when nothing was recorded).
    int64_t average() const {
        return _combiner.combine_agents().get_average_int();
    }

    // Floating-point average of the combined agents. The argument is unused;
    // it only selects this overload.
    double average(double) const {
        return _combiner.combine_agents().get_average_double();
    }

    // Combined (sum, num) of all agents.
    Stat get_value() const {
        return _combiner.combine_agents();
    }

    // Returns the result of _combiner.reset_all_agents().
    Stat reset() {
        return _combiner.reset_all_agents();
    }

    AddStat op() const { return AddStat(); }
    MinusStat inv_op() const { return MinusStat(); }

    // Prints the current value through operator<<(std::ostream&, const Stat&).
    void describe(std::ostream& os, bool /*quote_string*/) const {
        os << get_value();
    }

    bool valid() const { return _combiner.valid(); }

    // Returns the sampler, creating and scheduling it on first use.
    // NOTE(review): no locking is visible here -- confirm that callers
    // serialize the first call.
    sampler_type* get_sampler() {
        if (NULL == _sampler) {
            _sampler = new sampler_type(this);
            _sampler->schedule();
        }
        return _sampler;
    }

    // This name is useful for printing overflow log in operator<< since
    // IntRecorder is often used as the source of data and not exposed.
    void set_debug_name(const butil::StringPiece& name) {
        _debug_name.assign(name.data(), name.size());
    }

private:
    // TODO: The following numeric functions should be independent utils

    // Lower SUM_BIT_WIDTH bits of |n|: the sum, still in its
    // SUM_BIT_WIDTH-bit two's complement form.
    static uint64_t _get_sum(const uint64_t n) {
        return (n & MAX_SUM_PER_THREAD);
    }

    // Upper (64 - SUM_BIT_WIDTH) bits of |n|: the number of samples.
    static uint64_t _get_num(const uint64_t n) {
        return n >> SUM_BIT_WIDTH;
    }

    // Fill all the first (64 - SUM_BIT_WIDTH + 1) bits with 1 if the sign bit is 1
    // to represent a complete 64-bit negative number
    // Check out http://en.wikipedia.org/wiki/Signed_number_representations if
    // you are confused
    static int64_t _extend_sign_bit(const uint64_t sum) {
        // (2^(64 - SUM_BIT_WIDTH + 1) - 1) * sign_bit sets every bit from the
        // sign bit upwards when the sign bit is set, and yields 0 otherwise.
        return ((((uint64_t)1 << (64ul - SUM_BIT_WIDTH + 1)) - 1)
                 * (((uint64_t)1 << (SUM_BIT_WIDTH - 1)) & sum))
                | (int64_t)sum;
    }

    // Convert complement into a |SUM_BIT_WIDTH|-bit unsigned integer
    static uint64_t _get_complement(int64_t n) {
        return n & (MAX_SUM_PER_THREAD);
    }

    // Packs |num| into the upper bits and |sum| into the lower SUM_BIT_WIDTH
    // bits of a single 64-bit value.
    static uint64_t _compress(const uint64_t num, const uint64_t sum) {
        return (num << SUM_BIT_WIDTH)
               // There is a redundant '1' in the front of sum which was
               // combined with two negative number, so truncation has to be
               // done here
               | (sum & MAX_SUM_PER_THREAD)
            ;
    }

    // Check whether the sum of the two integer overflows the range of signed
    // integer with the width of SUM_BIT_WIDTH, which is
    // [-2^(SUM_BIT_WIDTH - 1), 2^(SUM_BIT_WIDTH - 1) - 1] (eg. [-128, 127] for
    // signed 8-bit integer)
    static bool _will_overflow(const int64_t lhs, const int rhs) {
        return
            // Both integers are positive and the sum is larger than the largest
            // number
            ((lhs > 0) && (rhs > 0)
                && (lhs + rhs > ((int64_t)MAX_SUM_PER_THREAD >> 1)))
            // Or both integers are negative and the sum is less than the lowest
            // number
            || ((lhs < 0) && (rhs < 0)
                    && (lhs + rhs < (-((int64_t)MAX_SUM_PER_THREAD >> 1)) - 1))
            // otherwise the sum cannot overflow iff lhs does not overflow
            // because |sum| < |lhs|
            ;
    }

private:
    combiner_type _combiner;
    sampler_type* _sampler;   // created on demand by get_sampler()
    std::string _debug_name;  // only used in the overflow log of operator<<
};
// Records one sample.
//
// A sample outside the range of int is clamped to the max/min of int and a
// warning carrying the original input is logged. The sample is then merged
// into the calling thread's agent, whose 64-bit element packs the number of
// samples and their sum (see "Compressing format" in the class).
// Returns *this so that calls can be chained.
inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
    if (BAIDU_UNLIKELY((int64_t)(int)sample != sample)) {
        // Keep the value the caller passed: |sample| is clamped below and
        // the warning should report the offending input, not the clamped one.
        const int64_t input = sample;
        const char* reason = NULL;
        if (sample > std::numeric_limits<int>::max()) {
            reason = "overflows";
            sample = std::numeric_limits<int>::max();
        } else {
            reason = "underflows";
            sample = std::numeric_limits<int>::min();
        }
        // Truncate to be max or min of int. We're using 44 bits to store the
        // sum thus following aggregations are not likely to be over/underflow.
        if (!name().empty()) {
            LOG(WARNING) << "Input=" << input << " to `" << name()
                       << "\' " << reason;
        } else if (!_debug_name.empty()) {
            LOG(WARNING) << "Input=" << input << " to `" << _debug_name
                       << "\' " << reason;
        } else {
            LOG(WARNING) << "Input=" << input << " to IntRecorder("
                       << (void*)this << ") " << reason;
        }
    }
    agent_type* agent = _combiner.get_or_create_tls_agent();
    if (BAIDU_UNLIKELY(!agent)) {
        LOG(FATAL) << "Fail to create agent";
        return *this;
    }
    uint64_t n;
    agent->element.load(&n);
    // |sample| as a SUM_BIT_WIDTH-bit two's complement value; loop-invariant.
    const uint64_t complement = _get_complement(sample);
    uint64_t num;
    uint64_t sum;
    do {
        num = _get_num(n);
        sum = _get_sum(n);
        // If either packed field cannot take one more sample, commit what
        // the agent holds and restart the packed value from zero.
        if (BAIDU_UNLIKELY((num + 1 > MAX_NUM_PER_THREAD) ||
                           _will_overflow(_extend_sign_bit(sum), sample))) {
            // Although agent->element might have been cleared at this
            // point, it is just OK because the very value is 0 in
            // this case
            agent->combiner->commit_and_clear(agent);
            sum = 0;
            num = 0;
            n = 0;
        }
        // Retried until the exchange succeeds. Presumably |n| is refreshed
        // with the current element on failure, as with std::atomic --
        // confirm against the element type.
    } while (!agent->element.compare_exchange_weak(
                 n, _compress(num + 1, sum + complement)));
    return *this;
}
} // namespace bvar
#endif //BVAR_RECORDER_H