// Copyright (c) 2014 Baidu, Inc. // Author Zhangyi Chen (chenzhangyi01@baidu.com) // Date 2014/10/16 17:55:39 #include <limits> //std::numeric_limits #include "bvar/reducer.h" #include "butil/time.h" #include "butil/macros.h" #include "butil/string_printf.h" #include "butil/string_splitter.h" #include <gtest/gtest.h> namespace { class ReducerTest : public testing::Test { protected: void SetUp() {} void TearDown() {} }; TEST_F(ReducerTest, atomicity) { ASSERT_EQ(sizeof(int32_t), sizeof(bvar::detail::ElementContainer<int32_t>)); ASSERT_EQ(sizeof(int64_t), sizeof(bvar::detail::ElementContainer<int64_t>)); ASSERT_EQ(sizeof(float), sizeof(bvar::detail::ElementContainer<float>)); ASSERT_EQ(sizeof(double), sizeof(bvar::detail::ElementContainer<double>)); } TEST_F(ReducerTest, adder) { bvar::Adder<uint32_t> reducer1; ASSERT_TRUE(reducer1.valid()); reducer1 << 2 << 4; ASSERT_EQ(6ul, reducer1.get_value()); #ifdef BAIDU_INTERNAL boost::any v1; reducer1.get_value(&v1); ASSERT_EQ(6u, boost::any_cast<unsigned int>(v1)); #endif bvar::Adder<double> reducer2; ASSERT_TRUE(reducer2.valid()); reducer2 << 2.0 << 4.0; ASSERT_DOUBLE_EQ(6.0, reducer2.get_value()); bvar::Adder<int> reducer3; ASSERT_TRUE(reducer3.valid()); reducer3 << -9 << 1 << 0 << 3; ASSERT_EQ(-5, reducer3.get_value()); } const size_t OPS_PER_THREAD = 500000; static void *thread_counter(void *arg) { bvar::Adder<uint64_t> *reducer = (bvar::Adder<uint64_t> *)arg; butil::Timer timer; timer.start(); for (size_t i = 0; i < OPS_PER_THREAD; ++i) { (*reducer) << 2; } timer.stop(); return (void *)(timer.n_elapsed()); } void *add_atomic(void *arg) { butil::atomic<uint64_t> *counter = (butil::atomic<uint64_t> *)arg; butil::Timer timer; timer.start(); for (size_t i = 0; i < OPS_PER_THREAD / 100; ++i) { counter->fetch_add(2, butil::memory_order_relaxed); } timer.stop(); return (void *)(timer.n_elapsed()); } static long start_perf_test_with_atomic(size_t num_thread) { butil::atomic<uint64_t> counter(0); pthread_t threads[num_thread]; for (size_t i = 0; i < num_thread; ++i) { pthread_create(&threads[i], NULL, &add_atomic, (void *)&counter); } long totol_time = 0; for (size_t i = 0; i < num_thread; ++i) { void *ret; pthread_join(threads[i], &ret); totol_time += (long)ret; } long avg_time = totol_time / (OPS_PER_THREAD / 100 * num_thread); EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD / 100, counter.load()); return avg_time; } static long start_perf_test_with_adder(size_t num_thread) { bvar::Adder<uint64_t> reducer; EXPECT_TRUE(reducer.valid()); pthread_t threads[num_thread]; for (size_t i = 0; i < num_thread; ++i) { pthread_create(&threads[i], NULL, &thread_counter, (void *)&reducer); } long totol_time = 0; for (size_t i = 0; i < num_thread; ++i) { void *ret = NULL; pthread_join(threads[i], &ret); totol_time += (long)ret; } long avg_time = totol_time / (OPS_PER_THREAD * num_thread); EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD, reducer.get_value()); return avg_time; } TEST_F(ReducerTest, perf) { std::ostringstream oss; for (size_t i = 1; i <= 24; ++i) { oss << i << '\t' << start_perf_test_with_adder(i) << '\n'; } LOG(INFO) << "Adder performance:\n" << oss.str(); oss.str(""); for (size_t i = 1; i <= 24; ++i) { oss << i << '\t' << start_perf_test_with_atomic(i) << '\n'; } LOG(INFO) << "Atomic performance:\n" << oss.str(); } TEST_F(ReducerTest, Min) { bvar::Miner<uint64_t> reducer; ASSERT_EQ(std::numeric_limits<uint64_t>::max(), reducer.get_value()); reducer << 10 << 20; ASSERT_EQ(10ul, reducer.get_value()); reducer << 5; ASSERT_EQ(5ul, reducer.get_value()); reducer << std::numeric_limits<uint64_t>::max(); ASSERT_EQ(5ul, reducer.get_value()); reducer << 0; ASSERT_EQ(0ul, reducer.get_value()); bvar::Miner<int> reducer2; ASSERT_EQ(std::numeric_limits<int>::max(), reducer2.get_value()); reducer2 << 10 << 20; ASSERT_EQ(10, reducer2.get_value()); reducer2 << -5; ASSERT_EQ(-5, reducer2.get_value()); reducer2 << std::numeric_limits<int>::max(); ASSERT_EQ(-5, reducer2.get_value()); reducer2 << 0; ASSERT_EQ(-5, reducer2.get_value()); reducer2 << std::numeric_limits<int>::min(); ASSERT_EQ(std::numeric_limits<int>::min(), reducer2.get_value()); } TEST_F(ReducerTest, max) { bvar::Maxer<uint64_t> reducer; ASSERT_EQ(std::numeric_limits<uint64_t>::min(), reducer.get_value()); ASSERT_TRUE(reducer.valid()); reducer << 20 << 10; ASSERT_EQ(20ul, reducer.get_value()); reducer << 30; ASSERT_EQ(30ul, reducer.get_value()); reducer << 0; ASSERT_EQ(30ul, reducer.get_value()); bvar::Maxer<int> reducer2; ASSERT_EQ(std::numeric_limits<int>::min(), reducer2.get_value()); ASSERT_TRUE(reducer2.valid()); reducer2 << 20 << 10; ASSERT_EQ(20, reducer2.get_value()); reducer2 << 30; ASSERT_EQ(30, reducer2.get_value()); reducer2 << 0; ASSERT_EQ(30, reducer2.get_value()); reducer2 << std::numeric_limits<int>::max(); ASSERT_EQ(std::numeric_limits<int>::max(), reducer2.get_value()); } bvar::Adder<long> g_a; TEST_F(ReducerTest, global) { ASSERT_TRUE(g_a.valid()); g_a.get_value(); } void ReducerTest_window() { bvar::Adder<int> c1; bvar::Maxer<int> c2; bvar::Miner<int> c3; bvar::Window<bvar::Adder<int> > w1(&c1, 1); bvar::Window<bvar::Adder<int> > w2(&c1, 2); bvar::Window<bvar::Adder<int> > w3(&c1, 3); bvar::Window<bvar::Maxer<int> > w4(&c2, 1); bvar::Window<bvar::Maxer<int> > w5(&c2, 2); bvar::Window<bvar::Maxer<int> > w6(&c2, 3); bvar::Window<bvar::Miner<int> > w7(&c3, 1); bvar::Window<bvar::Miner<int> > w8(&c3, 2); bvar::Window<bvar::Miner<int> > w9(&c3, 3); #if !BRPC_WITH_GLOG logging::StringSink log_str; logging::LogSink* old_sink = logging::SetLogSink(&log_str); c2.get_value(); ASSERT_EQ(&log_str, logging::SetLogSink(old_sink)); ASSERT_NE(std::string::npos, log_str.find( "You should not call Reducer<int, bvar::detail::MaxTo<int>>" "::get_value() when a Window<> is used because the operator" " does not have inverse.")); #endif const int N = 6000; int count = 0; int total_count = 0; int64_t last_time = butil::gettimeofday_us(); for (int i = 1; i <= N; ++i) { c1 << 1; c2 << N - i; c3 << i; ++count; ++total_count; int64_t now = butil::gettimeofday_us(); if (now - last_time >= 1000000L) { last_time = now; ASSERT_EQ(total_count, c1.get_value()); LOG(INFO) << "c1=" << total_count << " count=" << count << " w1=" << w1 << " w2=" << w2 << " w3=" << w3 << " w4=" << w4 << " w5=" << w5 << " w6=" << w6 << " w7=" << w7 << " w8=" << w8 << " w9=" << w9; count = 0; } else { usleep(950); } } } TEST_F(ReducerTest, window) { #if !BRPC_WITH_GLOG ReducerTest_window(); logging::StringSink log_str; logging::LogSink* old_sink = logging::SetLogSink(&log_str); sleep(1); ASSERT_EQ(&log_str, logging::SetLogSink(old_sink)); if (log_str.find("Removed ") != std::string::npos) { ASSERT_NE(std::string::npos, log_str.find("Removed 3, sampled 0")) << log_str; } #endif } struct Foo { int x; Foo() : x(0) {} explicit Foo(int x2) : x(x2) {} void operator+=(const Foo& rhs) { x += rhs.x; } }; std::ostream& operator<<(std::ostream& os, const Foo& f) { return os << "Foo{" << f.x << "}"; } TEST_F(ReducerTest, non_primitive) { bvar::Adder<Foo> adder; adder << Foo(2) << Foo(3) << Foo(4); ASSERT_EQ(9, adder.get_value().x); } bool g_stop = false; struct StringAppenderResult { int count; }; static void* string_appender(void* arg) { bvar::Adder<std::string>* cater = (bvar::Adder<std::string>*)arg; int count = 0; std::string id = butil::string_printf("%lld", (long long)pthread_self()); std::string tmp = "a"; for (count = 0; !count || !g_stop; ++count) { *cater << id << ":"; for (char c = 'a'; c <= 'z'; ++c) { tmp[0] = c; *cater << tmp; } *cater << "."; } StringAppenderResult* res = new StringAppenderResult; res->count = count; LOG(INFO) << "Appended " << count; return res; } TEST_F(ReducerTest, non_primitive_mt) { bvar::Adder<std::string> cater; pthread_t th[8]; g_stop = false; for (size_t i = 0; i < arraysize(th); ++i) { pthread_create(&th[i], NULL, string_appender, &cater); } usleep(50000); g_stop = true; butil::hash_map<pthread_t, int> appended_count; for (size_t i = 0; i < arraysize(th); ++i) { StringAppenderResult* res = NULL; pthread_join(th[i], (void**)&res); appended_count[th[i]] = res->count; delete res; } butil::hash_map<pthread_t, int> got_count; std::string res = cater.get_value(); for (butil::StringSplitter sp(res.c_str(), '.'); sp; ++sp) { char* endptr = NULL; ++got_count[(pthread_t)strtoll(sp.field(), &endptr, 10)]; ASSERT_EQ(27LL, sp.field() + sp.length() - endptr) << butil::StringPiece(sp.field(), sp.length()); ASSERT_EQ(0, memcmp(":abcdefghijklmnopqrstuvwxyz", endptr, 27)); } ASSERT_EQ(appended_count.size(), got_count.size()); for (size_t i = 0; i < arraysize(th); ++i) { ASSERT_EQ(appended_count[th[i]], got_count[th[i]]); } } TEST_F(ReducerTest, simple_window) { bvar::Adder<int64_t> a; bvar::Window<bvar::Adder<int64_t> > w(&a, 10); a << 100; sleep(3); const int64_t v = w.get_value(); ASSERT_EQ(100, v) << "v=" << v; } } // namespace