Commit 43d2a990 authored by gejun's avatar gejun

Patch svn r35046 r35061 r35064 r35065

Change-Id: I6c7e7672d72097fac5f052fe3c3cd99638cdd510
parent 78630513
*
!*.*
*.o
*.a
*.pb.cc
*.pb.h
*.prof
/output
/test/output
include config.mk
CPPFLAGS=-D__const__= -D_GNU_SOURCE -DNDEBUG -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
CPPFLAGS=-DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DNDEBUG -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
CXXFLAGS=$(CPPFLAGS) -O2 -g -pipe -Wall -W -Werror -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -std=c++0x -include brpc/config.h
CFLAGS=$(CPPFLAGS) -O2 -g -pipe -Wall -W -Werror -fPIC -fstrict-aliasing -Wno-unused-parameter
......
......@@ -80,9 +80,10 @@ public:
// When a thread needs memory, it allocates a Block. To improve locality,
// items in the Block are only used by the thread.
// To support cache-aligned objects, align Block.items by cacheline.
struct BAIDU_CACHELINE_ALIGNMENT Block {
size_t nitem;
char items[sizeof(T) * BLOCK_NITEM];
size_t nitem;
Block() : nitem(0) {}
};
......
......@@ -96,9 +96,10 @@ public:
// When a thread needs memory, it allocates a Block. To improve locality,
// items in the Block are only used by the thread.
// To support cache-aligned objects, align Block.items by cacheline.
struct BAIDU_CACHELINE_ALIGNMENT Block {
size_t nitem;
char items[sizeof(T) * BLOCK_NITEM];
size_t nitem;
Block() : nitem(0) {}
};
......
......@@ -15,8 +15,8 @@ static int cast_nprocessing(void* arg) {
}
MethodStatus::MethodStatus()
: _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _max_concurrency(0)
: _max_concurrency(0)
, _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _nprocessing(0) {
}
......
......@@ -48,10 +48,10 @@ friend class ScopedMethodStatus;
DISALLOW_COPY_AND_ASSIGN(MethodStatus);
void OnError();
bvar::LatencyRecorder _latency_rec;
int _max_concurrency;
bvar::Adder<int64_t> _nerror;
bvar::LatencyRecorder _latency_rec;
bvar::PassiveStatus<int> _nprocessing_bvar;
int _max_concurrency;
base::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing;
};
......
......@@ -8,6 +8,8 @@
#define BRPC_SPARSE_MINUTE_COUNTER_H
#include "base/containers/bounded_queue.h"
namespace brpc {
// An utility to add up per-second value into per-minute value.
......
......@@ -6,9 +6,9 @@
#include <gflags/gflags.h> // DEFINE_int32
#include <sys/epoll.h> // epoll_create
#include "base/macros.h"
#include "base/fd_utility.h" // make_close_on_exec
#include "base/logging.h" // LOG
#include "base/third_party/murmurhash3/murmurhash3.h"// fmix32
#include "bthread/bthread.h" // bthread_start_background
#include "brpc/event_dispatcher.h"
#ifdef BRPC_SOCKET_HAS_EOF
......@@ -192,6 +192,7 @@ void EventDispatcher::Run() {
epoll_event e[32];
while (!_stop) {
#ifdef BRPC_ADDITIONAL_EPOLL
// Performance downgrades in examples.
int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
if (n == 0) {
n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
......@@ -213,12 +214,6 @@ void EventDispatcher::Run() {
PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
break;
}
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
// We don't care about the return value.
Socket::HandleEpollOut(e[i].data.u64);
}
}
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
#ifdef BRPC_SOCKET_HAS_EOF
......@@ -230,6 +225,12 @@ void EventDispatcher::Run() {
_consumer_thread_attr);
}
}
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
// We don't care about the return value.
Socket::HandleEpollOut(e[i].data.u64);
}
}
}
}
......@@ -254,21 +255,12 @@ void InitializeGlobalDispatchers() {
CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}
inline uint32_t fmix32 ( uint32_t h ) {
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
return h;
}
EventDispatcher& GetGlobalEventDispatcher(int fd) {
pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
if (FLAGS_event_dispatcher_num == 1) {
return g_edisp[0];
}
int index = fmix32(fd) % FLAGS_event_dispatcher_num;
int index = base::fmix32(fd) % FLAGS_event_dispatcher_num;
return g_edisp[index];
}
......
......@@ -111,7 +111,7 @@ private:
};
// Get the global InputMessenger at client-side.
inline InputMessenger* get_client_side_messenger() {
BASE_FORCE_INLINE InputMessenger* get_client_side_messenger() {
extern InputMessenger* g_messenger;
return g_messenger;
}
......
......@@ -18,7 +18,7 @@ namespace policy {
// Process() to maximize usage of ObjectPool<MostCommonMessage>, otherwise
// you have to new the messages or use a separate ObjectPool (which is likely
// to waste more memory)
struct MostCommonMessage : public InputMessageBase {
struct BAIDU_CACHELINE_ALIGNMENT MostCommonMessage : public InputMessageBase {
base::IOBuf meta;
base::IOBuf payload;
PipelinedInfo pi;
......@@ -31,7 +31,7 @@ struct MostCommonMessage : public InputMessageBase {
void DestroyImpl() {
meta.clear();
payload.clear();
pi = PipelinedInfo();
pi.reset();
base::return_object(this);
}
};
......
......@@ -1435,20 +1435,14 @@ void Server::ClearServices() {
google::protobuf::Service* Server::FindServiceByFullName(
const base::StringPiece& full_name) const {
if (BAIDU_LIKELY(status() != UNINITIALIZED)) {
ServiceProperty* ss = _fullname_service_map.seek(full_name);
return (ss ? ss->service : NULL);
}
return NULL;
ServiceProperty* ss = _fullname_service_map.seek(full_name);
return (ss ? ss->service : NULL);
}
google::protobuf::Service* Server::FindServiceByName(
const base::StringPiece& name) const {
if (BAIDU_LIKELY(status() != UNINITIALIZED)) {
ServiceProperty* ss = _service_map.seek(name);
return (ss ? ss->service : NULL);
}
return NULL;
ServiceProperty* ss = _service_map.seek(name);
return (ss ? ss->service : NULL);
}
void Server::GetStat(ServerStatistics* stat) const {
......@@ -1639,6 +1633,11 @@ bool IsDummyServerRunning() {
return g_dummy_server != NULL;
}
const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const base::StringPiece&fullname) const {
return _method_map.seek(fullname);
}
const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const base::StringPiece& service_name/*full*/,
const base::StringPiece& method_name) const {
......@@ -1675,6 +1674,16 @@ Server::FindMethodPropertyByNameAndIndex(const base::StringPiece& service_name,
return FindMethodPropertyByFullName(method->full_name());
}
const Server::ServiceProperty*
Server::FindServicePropertyByFullName(const base::StringPiece& fullname) const {
return _fullname_service_map.seek(fullname);
}
const Server::ServiceProperty*
Server::FindServicePropertyByName(const base::StringPiece& name) const {
return _service_map.seek(name);
}
int Server::AddCertificate(const CertInfo& cert) {
std::string cert_key(cert.certificate);
cert_key.append(cert.private_key);
......
......@@ -700,21 +700,6 @@ friend class Controller;
mutable int32_t BAIDU_CACHELINE_ALIGNMENT _concurrency;
};
inline const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const base::StringPiece&fullname) const {
return _method_map.seek(fullname);
}
inline const Server::ServiceProperty*
Server::FindServicePropertyByFullName(const base::StringPiece& fullname) const {
return _fullname_service_map.seek(fullname);
}
inline const Server::ServiceProperty*
Server::FindServicePropertyByName(const base::StringPiece& name) const {
return _service_map.seek(name);
}
// Get the data attached to current searching thread. The data is created by
// ServerOptions.thread_local_data_factory and reused between different threads.
// If ServerOptions.thread_local_data_factory is NULL, return NULL.
......
......@@ -1088,10 +1088,9 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
}
// Do not need to check addressable since it will be called by
// health checker which called `SetFailed' before
const int expected_val =
_epollout_butex->load(base::memory_order_relaxed);
EventDispatcher* const edisp = &GetGlobalEventDispatcher(fd);
if (edisp->AddEpollOut(id(), fd, pollin) != 0) {
const int expected_val = _epollout_butex->load(base::memory_order_relaxed);
EventDispatcher& edisp = GetGlobalEventDispatcher(fd);
if (edisp.AddEpollOut(id(), fd, pollin) != 0) {
return -1;
}
......@@ -1103,7 +1102,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
}
// Ignore return value since `fd' might have been removed
// by `RemoveConsumer' in `SetFailed'
edisp->RemoveEpollOut(id(), fd, pollin);
base::ignore_result(edisp.RemoveEpollOut(id(), fd, pollin));
errno = saved_errno;
// Could be writable or spurious wakeup (by former epollout)
return rc;
......@@ -1582,9 +1581,9 @@ void* Socket::KeepWrite(void* void_arg) {
}
// TODO(gejun): wait for epollout when we actually have written
// all the data. This weird heuristic reduces 30us delay...
// Update(12/22/2015):
// Not seem to work now. better switch to correct code.
// Update(12/22/2015): seem not working. better switch to correct code.
// Update(1/8/2016, r31823): Still working.
// Update(8/15/2017): Not working, performance downgraded.
//if (nw <= 0 || req->data.empty()/*note*/) {
if (nw <= 0) {
s_vars->nwaitepollout << 1;
......@@ -1840,7 +1839,7 @@ int Socket::StartInputEvent(SocketId id, uint32_t epoll_events,
// requires stronger memory fences, since reading the fd returns
// error as well, we don't pass the events.
if (s->_nevent.fetch_add(1, base::memory_order_acq_rel) == 0) {
// According to the stats, this fetch_add is very effective. In a
// According to the stats, above fetch_add is very effective. In a
// server processing 1 million requests per second, this counter
// is just 1500~1700/s
s_vars->neventthread << 1;
......@@ -1860,7 +1859,7 @@ int Socket::StartInputEvent(SocketId id, uint32_t epoll_events,
}
void DereferenceSocket(Socket* s) {
if (BAIDU_LIKELY(s != NULL)) {
if (s) {
s->Dereference();
}
}
......
......@@ -113,7 +113,11 @@ struct SocketStat {
};
struct PipelinedInfo {
PipelinedInfo() : count(0), id_wait(INVALID_BTHREAD_ID) {}
PipelinedInfo() { reset(); }
void reset() {
count = 0;
id_wait = INVALID_BTHREAD_ID;
}
uint32_t count;
bthread_id_t id_wait;
};
......
......@@ -13,29 +13,30 @@
namespace brpc {
// Utility functions to combine and extract SocketId.
inline SocketId MakeSocketId(uint32_t version, base::ResourceId<Socket> slot) {
BASE_FORCE_INLINE SocketId
MakeSocketId(uint32_t version, base::ResourceId<Socket> slot) {
return SocketId((((uint64_t)version) << 32) | slot.value);
}
inline base::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
BASE_FORCE_INLINE base::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
base::ResourceId<Socket> id = { (sid & 0xFFFFFFFFul) };
return id;
}
inline uint32_t VersionOfSocketId(SocketId sid) {
BASE_FORCE_INLINE uint32_t VersionOfSocketId(SocketId sid) {
return (uint32_t)(sid >> 32);
}
// Utility functions to combine and extract Socket::_versioned_ref
inline uint32_t VersionOfVRef(uint64_t vref) {
BASE_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
return (uint32_t)(vref >> 32);
}
inline int32_t NRefOfVRef(uint64_t vref) {
BASE_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
return (int32_t)(vref & 0xFFFFFFFFul);
}
inline uint64_t MakeVRef(uint32_t version, int32_t nref) {
BASE_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) {
// 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref;
}
......
......@@ -20,7 +20,7 @@
namespace brpc {
#ifdef BAIDU_INTERNAL
DEFINE_string(trackme_server, "http://brpc.baidu.com:8765",
DEFINE_string(trackme_server, "http://brpc.baidu.com:8877",
"Where the TrackMe requests are sent to");
#else
DEFINE_string(trackme_server, "", "Where the TrackMe requests are sent to");
......
......@@ -67,24 +67,13 @@ inline TaskControl* get_or_new_task_control() {
return c;
}
inline TaskGroup* get_task_group() {
TaskGroup* g = tls_task_group;
if (g) {
return g;
}
TaskControl* c = get_or_new_task_control();
if (c) {
return c->choose_one_group();
}
return NULL;
}
__thread TaskGroup* tls_task_group_nosignal = NULL;
inline int start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
BASE_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
......@@ -99,9 +88,10 @@ inline int start_from_non_worker(bthread_t* __restrict tid,
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
return g->start_background(tid, attr, fn, arg);
return g->start_background<true>(tid, attr, fn, arg);
}
return c->choose_one_group()->start_background(tid, attr, fn, arg);
return c->choose_one_group()->start_background<true>(
tid, attr, fn, arg);
}
int stop_butex_wait(bthread_t tid);
......@@ -149,7 +139,7 @@ int bthread_start_background(bthread_t* __restrict tid,
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// start from worker
return g->start_background(tid, attr, fn, arg);
return g->start_background<false>(tid, attr, fn, arg);
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
......@@ -157,14 +147,13 @@ int bthread_start_background(bthread_t* __restrict tid,
void bthread_flush() __THROW {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
g->flush_nosignal_tasks();
return;
return g->flush_nosignal_tasks();
}
g = bthread::tls_task_group_nosignal;
if (g) {
// NOSIGNAL tasks were created in this non-worker.
bthread::tls_task_group_nosignal = NULL;
g->flush_nosignal_tasks();
return g->flush_nosignal_tasks_remote();
}
}
......@@ -172,11 +161,15 @@ int bthread_stop(bthread_t tid) __THROW {
if (bthread::stop_butex_wait(tid) < 0) {
return errno;
}
bthread::TaskGroup* g = bthread::get_task_group();
if (g) {
return g->stop_usleep(tid);
bthread::TaskGroup* g = bthread::tls_task_group;
if (!g) {
bthread::TaskControl* c = bthread::get_or_new_task_control();
if (!c) {
return ENOMEM;
}
g = c->choose_one_group();
}
return EAGAIN;
return g->stop_usleep(tid);
}
int bthread_stopped(bthread_t tid) __THROW {
......
This diff is collapsed.
......@@ -54,7 +54,7 @@ int butex_requeue(void* butex1, void* butex2);
// Different from FUTEX_WAIT, butex_wait uses absolute time.
int butex_wait(void* butex, int expected_value, const timespec* abstime);
// Same with butex_wait except that this function can not be woken up by
// Same with butex_wait except that this function cannot be woken up by
// bthread_stop(), although this function still returns -1(ESTOP) after
// wake-up.
int butex_wait_uninterruptible(void* butex, int expected_value,
......@@ -63,4 +63,3 @@ int butex_wait_uninterruptible(void* butex, int expected_value,
} // namespace bthread
#endif // BAIDU_BTHREAD_BUTEX_H
// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2014 Baidu.com, Inc. All Rights Reserved
// Author: Ge,Jun (gejun@baidu.com)
// Date: Mon Sep 15 10:51:15 CST 2014
#ifndef PUBLIC_BTHREAD_BTHREAD_COMLOG_INITIALIZER_H
#define PUBLIC_BTHREAD_BTHREAD_COMLOG_INITIALIZER_H
#include <com_log.h> // com_openlog_r, com_closelog_r
#include <base/macros.h>
namespace bthread {
class ComlogInitializer {
public:
ComlogInitializer() {
if (com_logstatus() != LOG_NOT_DEFINED) {
com_openlog_r();
}
}
~ComlogInitializer() {
if (com_logstatus() != LOG_NOT_DEFINED) {
com_closelog_r();
}
}
private:
DISALLOW_COPY_AND_ASSIGN(ComlogInitializer);
};
}
#endif // PUBLIC_BTHREAD_BTHREAD_COMLOG_INITIALIZER_H
......@@ -45,20 +45,26 @@ int bthread_cond_destroy(bthread_cond_t* c) {
int bthread_cond_signal(bthread_cond_t* c) {
bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
ic->seq->fetch_add(1, base::memory_order_release);
bthread::butex_wake(ic->seq);
// ic is probably dereferenced after fetch_add, save required fields before
// this point
base::atomic<int>* const saved_seq = ic->seq;
saved_seq->fetch_add(1, base::memory_order_release);
// don't touch ic any more
bthread::butex_wake(saved_seq);
return 0;
}
int bthread_cond_broadcast(bthread_cond_t* c) {
bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
bthread_mutex_t* m = ic->m.load(base::memory_order_relaxed);
base::atomic<int>* const saved_seq = ic->seq;
if (!m) {
return 0;
}
void* const saved_butex = m->butex;
// Wakeup one thread and requeue the rest on the mutex.
ic->seq->fetch_add(1, base::memory_order_release);
bthread::butex_requeue(ic->seq, m->butex);
bthread::butex_requeue(saved_seq, saved_butex);
return 0;
}
......
......@@ -25,13 +25,17 @@ CountdownEvent::~CountdownEvent() {
}
void CountdownEvent::signal(int sig) {
// Have to save _butex, *this is probably defreferenced by the wait thread
// which sees fetch_sub
void* const saved_butex = _butex;
const int prev = ((base::atomic<int>*)_butex)
->fetch_sub(sig, base::memory_order_release);
// DON'T touch *this ever after
if (prev > sig) {
return;
}
LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
butex_wake_all(_butex);
butex_wake_all(saved_butex);
}
void CountdownEvent::wait() {
......
......@@ -7,6 +7,7 @@
#include <deque>
#include "base/logging.h"
#include "bthread/butex.h" // butex_*
#include "bthread/mutex.h"
#include "bthread/list_of_abafree_id.h"
#include "base/resource_pool.h"
#include "bthread/bthread.h"
......@@ -98,7 +99,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Id {
// contended_ver: locked and contended
uint32_t first_ver;
uint32_t locked_ver;
base::Mutex mutex;
internal::FastPthreadMutex mutex;
void* data;
int (*on_error)(bthread_id_t, void*, int);
int (*on_error2)(bthread_id_t, void*, int, const std::string&);
......@@ -178,7 +179,7 @@ static int default_bthread_id_on_error2(
void id_status(bthread_id_t id, std::ostream &os) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
os << "Invalid id=" << id.value << '\n';
return;
}
......@@ -311,7 +312,7 @@ static int id_create_impl(
int (*on_error2)(bthread_id_t, void*, int, const std::string&)) __THROW {
IdResourceId slot;
Id* const meta = get_resource(&slot);
if (__builtin_expect(meta != NULL, 1)) {
if (meta) {
meta->data = data;
meta->on_error = on_error;
meta->on_error2 = on_error2;
......@@ -336,7 +337,7 @@ static int id_create_ranged_impl(
int (*on_error)(bthread_id_t, void*, int),
int (*on_error2)(bthread_id_t, void*, int, const std::string&),
int range) __THROW {
if (__builtin_expect(range < 1 || range > ID_MAX_RANGE, 0)) {
if (range < 1 || range > ID_MAX_RANGE) {
LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range;
LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is "
<< ID_MAX_RANGE << ", actually " << range;
......@@ -344,7 +345,7 @@ static int id_create_ranged_impl(
}
IdResourceId slot;
Id* const meta = get_resource(&slot);
if (__builtin_expect(meta != NULL, 1)) {
if (meta) {
meta->data = data;
meta->on_error = on_error;
meta->on_error2 = on_error2;
......@@ -399,7 +400,7 @@ int bthread_id_create_ranged(bthread_id_t* id, void* data,
int bthread_id_lock_and_reset_range_verbose(
bthread_id_t id, void **pdata, int range, const char *location) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
......@@ -460,23 +461,24 @@ int bthread_id_error_verbose(bthread_id_t id, int error_code,
int bthread_id_about_to_destroy(bthread_id_t id) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
std::unique_lock<base::Mutex> mu(meta->mutex);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex == meta->first_ver) {
mu.unlock();
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
const bool contended = (*butex == meta->contended_ver());
*butex = meta->unlockable_ver();
mu.unlock();
meta->mutex.unlock();
if (contended) {
// wake up all waiting lockers.
bthread::butex_wake_except(butex, 0);
......@@ -486,22 +488,24 @@ int bthread_id_about_to_destroy(bthread_id_t id) __THROW {
int bthread_id_cancel(bthread_id_t id) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
const uint32_t id_ver = bthread::get_version(id);
std::unique_lock<base::Mutex> mu(meta->mutex);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex != meta->first_ver) {
meta->mutex.unlock();
return EPERM;
}
*butex = meta->end_ver();
meta->first_ver = *butex;
meta->locked_ver = *butex;
mu.unlock();
meta->mutex.unlock();
return_resource(bthread::get_slot(id));
return 0;
}
......@@ -509,7 +513,7 @@ int bthread_id_cancel(bthread_id_t id) __THROW {
int bthread_id_join(bthread_id_t id) __THROW {
const bthread::IdResourceId slot = bthread::get_slot(id);
bthread::Id* const meta = address_resource(slot);
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
......@@ -538,20 +542,22 @@ int bthread_id_join(bthread_id_t id) __THROW {
int bthread_id_trylock(bthread_id_t id, void** pdata) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
const uint32_t id_ver = bthread::get_version(id);
std::unique_lock<base::Mutex> mu(meta->mutex);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex != meta->first_ver) {
meta->mutex.unlock();
return EBUSY;
}
*butex = meta->locked_ver;
mu.unlock();
meta->mutex.unlock();
if (pdata != NULL) {
*pdata = meta->data;
}
......@@ -565,28 +571,28 @@ int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
int bthread_id_unlock(bthread_id_t id) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
// Release fence makes sure all changes made before signal visible to
// woken-up waiters.
const uint32_t id_ver = bthread::get_version(id);
std::unique_lock<base::Mutex> mu(meta->mutex);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
mu.unlock();
meta->mutex.unlock();
LOG(FATAL) << "Invalid bthread_id=" << id.value;
return EINVAL;
}
if (*butex == meta->first_ver) {
mu.unlock();
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
bthread::PendingError front;
if (meta->pending_q.pop(&front)) {
meta->lock_location = front.location;
mu.unlock();
meta->mutex.unlock();
if (meta->on_error) {
return meta->on_error(front.id, meta->data, front.error_code);
} else {
......@@ -596,7 +602,7 @@ int bthread_id_unlock(bthread_id_t id) __THROW {
} else {
const bool contended = (*butex == meta->contended_ver());
*butex = meta->first_ver;
mu.unlock();
meta->mutex.unlock();
if (contended) {
// We may wake up already-reused id, but that's OK.
bthread::butex_wake(butex);
......@@ -607,29 +613,30 @@ int bthread_id_unlock(bthread_id_t id) __THROW {
int bthread_id_unlock_and_destroy(bthread_id_t id) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
uint32_t* join_butex = meta->join_butex;
const uint32_t id_ver = bthread::get_version(id);
std::unique_lock<base::Mutex> mu(meta->mutex);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
mu.unlock();
meta->mutex.unlock();
LOG(FATAL) << "Invalid bthread_id=" << id.value;
return EINVAL;
}
if (*butex == meta->first_ver) {
mu.unlock();
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
*butex = meta->end_ver();
*join_butex = meta->end_ver();
meta->first_ver = *butex;
meta->locked_ver = *butex;
const uint32_t next_ver = meta->end_ver();
*butex = next_ver;
*join_butex = next_ver;
meta->first_ver = next_ver;
meta->locked_ver = next_ver;
meta->pending_q.clear();
mu.unlock();
meta->mutex.unlock();
// Notice that butex_wake* returns # of woken-up, not successful or not.
bthread::butex_wake_except(butex, 0);
bthread::butex_wake_all(join_butex);
......@@ -708,19 +715,20 @@ int bthread_id_error2_verbose(bthread_id_t id, int error_code,
const std::string& error_text,
const char *location) __THROW {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (__builtin_expect(meta == NULL, 0)) {
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
std::unique_lock<base::Mutex> mu(meta->mutex);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex == meta->first_ver) {
*butex = meta->locked_ver;
meta->lock_location = location;
mu.unlock();
meta->mutex.unlock();
if (meta->on_error) {
return meta->on_error(id, meta->data, error_code);
} else {
......@@ -733,6 +741,7 @@ int bthread_id_error2_verbose(bthread_id_t id, int error_code,
e.error_text = error_text;
e.location = location;
meta->pending_q.push(e);
meta->mutex.unlock();
return 0;
}
}
......
......@@ -4,11 +4,13 @@
// Author: Ge,Jun (gejun@baidu.com)
// Date: Mon Sep 15 10:51:15 CST 2014
#ifndef BAIDU_BTHREAD_CONFIG_H
#define BAIDU_BTHREAD_CONFIG_H
#ifndef BAIDU_BTHREAD_LOG_H
#define BAIDU_BTHREAD_LOG_H
#include "base/thread_local.h" // thread_local, thread_atexit
#ifdef BAIDU_INTERNAL
#include "bthread/comlog_initializer.h"
#endif
#define BT_VLOG VLOG(100)
#endif
#endif // BAIDU_BTHREAD_LOG_H
......@@ -8,7 +8,6 @@
#include <execinfo.h>
#include <dlfcn.h> // dlsym
#include <fcntl.h> // O_RDONLY
#include "bthread/config.h"
#include "base/atomicops.h"
#include "bvar/bvar.h"
#include "bvar/collector.h"
......@@ -27,6 +26,7 @@
#include "bthread/processor.h" // cpu_relax, barrier
#include "bthread/mutex.h" // bthread_mutex_t
#include "bthread/sys_futex.h"
#include "bthread/log.h"
extern "C" {
extern void* _dl_sym(void* handle, const char* symbol, void* caller);
......@@ -630,6 +630,44 @@ inline int mutex_timedlock_contended(
return 0;
}
#ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
namespace internal {
int FastPthreadMutex::lock_contended() {
base::atomic<unsigned>* whole = (base::atomic<unsigned>*)&_futex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
&& errno != EWOULDBLOCK) {
return errno;
}
}
return 0;
}
void FastPthreadMutex::lock() {
bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
if (split->locked.exchange(1, base::memory_order_acquire)) {
(void)lock_contended();
}
}
bool FastPthreadMutex::try_lock() {
bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
return !split->locked.exchange(1, base::memory_order_acquire);
}
void FastPthreadMutex::unlock() {
base::atomic<unsigned>* whole = (base::atomic<unsigned>*)&_futex;
const unsigned prev = whole->exchange(0, base::memory_order_release);
// CAUTION: the mutex may be destroyed, check comments before butex_create
if (prev != BTHREAD_MUTEX_LOCKED) {
futex_wake_private(whole, 1);
}
}
} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX
} // namespace bthread
extern "C" {
......
......@@ -42,6 +42,25 @@ private:
bthread_mutex_t _mutex;
};
namespace internal {
#ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
~FastPthreadMutex() {}
void lock();
void unlock();
bool try_lock();
private:
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
int lock_contended();
unsigned _futex;
};
#else
typedef base::Mutex FastPthreadMutex;
#endif
}
} // namespace bthread
// Specialize std::lock_guard and std::unique_lock for bthread_mutex_t
......
......@@ -4,10 +4,10 @@
// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 10 17:40:58 CST 2012
#include "bthread/config.h"
#include "base/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "base/errno.h" // berror
#include "base/logging.h"
#include "base/third_party/murmurhash3/murmurhash3.h"
#include "bthread/sys_futex.h" // futex_wake_private
#include "bthread/interrupt_pthread.h"
#include "bthread/processor.h" // cpu_relax
......@@ -15,9 +15,7 @@
#include "bthread/task_control.h"
#include "bthread/timer_thread.h" // global_timer_thread
#include <gflags/gflags.h>
#ifdef BAIDU_INTERNAL
#include "base/comlog_sink.h"
#endif
#include "bthread/log.h"
DEFINE_int32(task_group_delete_delay, 1,
"delay deletion of TaskGroup for so many seconds");
......@@ -121,7 +119,6 @@ TaskControl::TaskControl()
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _pending_signal(0)
{
// calloc shall set memory to zero
CHECK(_groups) << "Fail to create array of groups";
......@@ -216,8 +213,9 @@ void TaskControl::stop_and_join() {
_stop = true;
_ngroup.exchange(0, base::memory_order_relaxed);
}
_pending_signal.fetch_or(1, base::memory_order_relaxed);
futex_wake_private(&_pending_signal, 10000);
for (int i = 0; i < PARKING_LOT_NUM; ++i) {
_pl[i].stop();
}
// Interrupt blocking operations.
for (size_t i = 0; i < _workers.size(); ++i) {
interrupt_pthread(_workers[i]);
......@@ -329,9 +327,15 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g != NULL && g->_rq.steal(tid)) {
stolen = true;
break;
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
......@@ -342,23 +346,23 @@ void TaskControl::signal_task(int num_task) {
if (num_task <= 0) {
return;
}
_pending_signal.fetch_add((num_task << 1), base::memory_order_release);
// Update(2016/9/12): user may block the launched bthread indefinitely by
// calling pthread-blocking functions, especially when -usercode_in_pthread
// is on, we can no longer save the additional signaling.
futex_wake_private(&_pending_signal, num_task/*note*/);
}
int TaskControl::wait_task_once(bthread_t* tid, size_t* seed, size_t offset) {
const int nsig = _pending_signal.load(base::memory_order_acquire);
if (nsig & 1) { // stopped
return -1;
}
if (steal_task(tid, seed, offset)) {
return 0;
// TODO(gejun): Current algorithm does not guarantee enough threads will
// be created to match caller's requests. But in another side, there's also
// many useless signalings according to current impl. Capping the concurrency
// is a good balance between performance and timeliness of scheduling.
if (num_task > 2) {
num_task = 2;
}
int start_index = base::fmix32(pthread_self()) % PARKING_LOT_NUM;
num_task -= _pl[start_index].signal(1);
if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= _pl[start_index].signal(1);
}
}
futex_wait_private(&_pending_signal, nsig, NULL);
return 1;
}
void TaskControl::print_rq_sizes(std::ostream& os) {
......@@ -406,8 +410,9 @@ int64_t TaskControl::get_cumulated_signal_count() {
BAIDU_SCOPED_LOCK(_modify_group_mutex);
const size_t ngroup = _ngroup.load(base::memory_order_relaxed);
for (size_t i = 0; i < ngroup; ++i) {
if (_groups[i]) {
c += _groups[i]->_nsignaled;
TaskGroup* g = _groups[i];
if (g) {
c += g->_nsignaled + g->_remote_nsignaled;
}
}
return c;
......
......@@ -16,6 +16,7 @@
#include "bthread/task_meta.h" // TaskMeta
#include "base/resource_pool.h" // ResourcePool
#include "bthread/work_stealing_queue.h" // WorkStealingQueue
#include "bthread/parking_lot.h"
namespace bthread {
......@@ -41,9 +42,6 @@ public:
// Tell other groups that `n' tasks was just added to caller's runqueue
void signal_task(int num_task);
// Suspend caller pthread until a task is stolen.
int wait_task_once(bthread_t* tid, size_t* seed, size_t offset);
// Stop and join worker threads in TaskControl.
void stop_and_join();
......@@ -98,8 +96,8 @@ private:
bvar::PassiveStatus<std::string> _status;
bvar::Adder<int64_t> _nbthreads;
// higher 31 bits count futex, lowest bit stands for stopping.
base::atomic<int> BAIDU_CACHELINE_ALIGNMENT _pending_signal;
static const int PARKING_LOT_NUM = 4;
ParkingLot _pl[PARKING_LOT_NUM];
};
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
......
This diff is collapsed.
......@@ -8,12 +8,16 @@
#define BAIDU_BTHREAD_TASK_GROUP_H
#include "base/time.h" // cpuwide_time_ns
#include "bthread/task_control.h"
#include "bthread/task_meta.h" // bthread_t, TaskMeta
#include "bthread/work_stealing_queue.h" // WorkStealingQueue
#include "bthread/remote_task_queue.h" // RemoteTaskQueue
#include "base/resource_pool.h" // ResourceId
#include "bthread/parking_lot.h"
namespace bthread {
// For exiting a bthread.
class ExitException : public std::exception {
public:
explicit ExitException(void* value) : _value(value) {}
......@@ -28,20 +32,12 @@ private:
void* _value;
};
class TaskControl;
// Thread-local group of tasks.
// Notice that most methods involving context switching are static otherwise
// pointer `this' may change after wakeup. The **pg parameters in following
// function are updated before returning.
class TaskGroup {
friend class TaskControl;
public:
static const int STOP_INDEX = 1000000;
static const int NOHINT_INDEX = -1;
static const int IDLE_INDEX = -2;
// Create task `fn(arg)' with attributes `attr' in TaskGroup *pg and put
// the identifier into `tid'. Switch to the new task and schedule old task
// to run.
......@@ -54,7 +50,10 @@ public:
// Create task `fn(arg)' with attributes `attr' in this TaskGroup, put the
// identifier into `tid'. Schedule the new thread to run.
// Called from worker: start_background<false>
// Called from non-worker: start_background<true>
// Return 0 on success, errno otherwise.
template <bool REMOTE>
int start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
......@@ -69,19 +68,27 @@ public:
// runqueue and then calling sched(pg), which has similar effect but
// slower.
static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
inline static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid);
// The callback will be run in the beginning of next-run bthread.
// Can't be called by current bthread directly because it often needs
// the target to be suspended already.
void set_remained(void (*cb)(void*), void* arg) {
_last_context_remained = cb;
_last_context_remained_arg = arg;
}
inline static void exchange(TaskGroup** pg, bthread_t next_tid);
inline void set_remained(void (*last_context_remained)(void*), void* arg);
// Suspend caller for at least |timeout_us| microseconds.
// If |timeout_us| is 0, this function does nothing.
// If |group| is NULL or current thread is non-bthread, call usleep(3)
// instead. This function does not create thread-local TaskGroup.
// Returns: 0 when successful, -1 otherwise and errno is set.
// Returns: 0 on success, -1 otherwise and errno is set.
static int usleep(TaskGroup** pg, uint64_t timeout_us);
// Suspend caller and run another bthread. When the caller will resume
// is undefined.
// Returns 0 on success, -1 otherwise and errno is set.
static int yield(TaskGroup** pg);
// Suspend caller until bthread `tid' terminates.
......@@ -95,50 +102,72 @@ public:
// }
static bool exists(bthread_t tid);
// Put attribute associated with `tid' into `*attr'.
// Returns 0 on success, -1 otherwise and errno is set.
static int get_attr(bthread_t tid, bthread_attr_t* attr);
// Returns non-zero the `tid' is stopped, 0 otherwise.
static int stopped(bthread_t tid);
// Identifier/statistics of the bthread associated with
// TaskControl::worker_thread
// The bthread running run_main_task();
bthread_t main_tid() const { return _main_tid; }
TaskStatistics main_stat() const;
// Routine of the main task which should be called from a dedicated pthread.
void run_main_task();
// Meta/Identifier of current bthread in this group.
// Meta/Identifier of current task in this group.
TaskMeta* current_task() const { return _cur_meta; }
bthread_t current_tid() const { return _cur_meta->tid; }
// Uptime of current task in nanoseconds.
int64_t current_uptime_ns() const
{ return base::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns; }
inline bool is_current_main_task() const;
inline bool is_current_pthread_task() const;
// True iff current task is the one running run_main_task()
bool is_current_main_task() const { return current_tid() == _main_tid; }
// True iff current task is in pthread-mode.
bool is_current_pthread_task() const
{ return _cur_meta->stack_container == _main_stack_container; }
inline int64_t current_uptime_ns() const;
inline int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
// Active time in nanoseconds spent by this TaskGroup.
int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
// Push a bthread into the runqueue
void ready_to_run(bthread_t tid);
void ready_to_run(bthread_t tid, bool nosignal);
void ready_to_run_nosignal(bthread_t tid);
// Flush tasks pushed to rq but signalled.
void flush_nosignal_tasks();
// Flush tasks in rq but signalled.
// Returns #tasks flushed.
int flush_nosignal_tasks();
// Push a bthread into the runqueue from another non-worker thread.
void ready_to_run_remote(bthread_t tid, bool nosignal = false);
void flush_nosignal_tasks_remote_locked(base::Mutex& locked_mutex);
void flush_nosignal_tasks_remote();
// Automatically decide the caller is remote or local, and call
// the corresponding function.
void ready_to_run_general(bthread_t tid, bool nosignal = false);
void flush_nosignal_tasks_general();
// The TaskControl that this TaskGroup belongs to.
TaskControl* control() const { return _control; }
// Call this instead of delete.
void destroy_self();
// Wake up `tid' if it's sleeping.
// Returns 0 on success, error code otherwise.
int stop_usleep(bthread_t tid);
inline static TaskMeta* address_meta(bthread_t tid);
// Get the meta associate with the task.
static TaskMeta* address_meta(bthread_t tid);
// The pthread where this TaskGroup is constructed. With current
// implementation, the pthread is also the worker pthread or the user
// pthread launching bthreads. butex_wait() uses this fact to get
// pthread of TaskGroup without calling pthread_self() repeatly.
pthread_t creation_pthread() const { return _creation_pthread; }
// Push a task into _rq, if _rq is full, retry after some time. This
// process make go on indefinitely.
void push_rq(bthread_t tid);
private:
friend class TaskControl;
// You shall use TaskControl::create_group to create new instance.
explicit TaskGroup(TaskControl*);
......@@ -150,14 +179,27 @@ private:
static void task_runner(intptr_t skip_remained);
// Callbacks for set_remained()
static void _release_last_context(void*);
static void _add_sleep_event(void* arg);
static void _add_sleep_event(void*);
static void ready_to_run_in_worker(void*);
static void ready_to_run_in_worker_nosignal(void*);
bool wait_task(bthread_t* tid, size_t* seed, size_t offset);
static void ready_to_run_in_worker_ignoresignal(void*);
// Wait for a task to run.
// Returns true on success, false is treated as permanent error and the
// loop calling this function should end.
bool wait_task(bthread_t* tid);
bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
return true;
}
#ifdef BTHREAD_SAVE_PARKING_STATE
_last_pl_state = _pl->get_state();
#endif
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
#ifndef NDEBUG
int _sched_recursive_guard;
......@@ -177,13 +219,18 @@ private:
void (*_last_context_remained)(void*);
void* _last_context_remained_arg;
ParkingLot* _pl;
#ifdef BTHREAD_SAVE_PARKING_STATE
ParkingLot::State _last_pl_state;
#endif
size_t _steal_seed;
size_t _steal_offset;
StackContainer* _main_stack_container;
bthread_t _main_tid;
pthread_t _creation_pthread;
base::Mutex _rq_mutex;
WorkStealingQueue<bthread_t> _rq;
RemoteTaskQueue _remote_rq;
int _remote_num_nosignal;
int _remote_nsignaled;
};
} // namespace bthread
......
......@@ -22,7 +22,7 @@ inline uint32_t get_version(bthread_t tid) {
return (uint32_t)((tid >> 32) & 0xFFFFFFFFul);
}
TaskMeta* TaskGroup::address_meta(bthread_t tid) {
inline TaskMeta* TaskGroup::address_meta(bthread_t tid) {
// TaskMeta * m = address_resource<TaskMeta>(get_slot(tid));
// if (m != NULL && m->version == get_version(tid)) {
// return m;
......@@ -31,22 +31,19 @@ TaskMeta* TaskGroup::address_meta(bthread_t tid) {
return address_resource(get_slot(tid));
}
void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) {
inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) {
TaskGroup* g = *pg;
if (g->is_current_pthread_task()) {
return g->ready_to_run(next_tid);
}
if (!g->current_task()->about_to_quit) {
g->set_remained(ready_to_run_in_worker,
(void*)g->current_tid());
} else {
g->set_remained(ready_to_run_in_worker_nosignal,
(void*)g->current_tid());
}
g->set_remained((g->current_task()->about_to_quit
? ready_to_run_in_worker_ignoresignal
: ready_to_run_in_worker),
(void*)g->current_tid());
TaskGroup::sched_to(pg, next_tid);
}
void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack_container == NULL) {
StackContainer* sc = get_stack(next_meta->stack_type(), task_runner);
......@@ -65,22 +62,26 @@ void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
sched_to(pg, next_meta);
}
void TaskGroup::set_remained(void (*last_context_remained)(void*), void* arg) {
_last_context_remained = last_context_remained;
_last_context_remained_arg = arg;
}
bool TaskGroup::is_current_main_task() const {
return current_tid() == _main_tid;
}
bool TaskGroup::is_current_pthread_task() const {
return _cur_meta->stack_container == _main_stack_container;
inline void TaskGroup::push_rq(bthread_t tid) {
while (!_rq.push(tid)) {
// Created too many bthreads: a promising approach is to insert the
// task into another TaskGroup, but we don't use it because:
// * There're already many bthreads to run, inserting the bthread
// into other TaskGroup does not help.
// * Insertions into other TaskGroups perform worse when all workers
// are busy at creating bthreads (proved by test_input_messenger in
// baidu-rpc)
flush_nosignal_tasks();
LOG_EVERY_SECOND(ERROR) << "_rq is full, capacity=" << _rq.capacity();
::usleep(1000);
}
}
int64_t TaskGroup::current_uptime_ns() const {
return base::cpuwide_time_ns() - _cur_meta->cpuwide_start_ns;
inline void TaskGroup::flush_nosignal_tasks_remote() {
if (_remote_num_nosignal) {
_remote_rq._mutex.lock();
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
}
}
} // namespace bthread
......
......@@ -6,15 +6,12 @@
#include <queue> // heap functions
#include "base/scoped_lock.h"
#include "base/logging.h"
#include "base/third_party/murmurhash3/murmurhash3.h" // fmix64
#include "base/third_party/murmurhash3/murmurhash3.h" // fmix32
#include "base/resource_pool.h"
#include "bvar/bvar.h"
#include "bthread/sys_futex.h"
#include "bthread/timer_thread.h"
#include "bthread/config.h"
#ifdef BAIDU_INTERNAL
#include "base/comlog_sink.h"
#endif
#include "bthread/log.h"
namespace bthread {
......@@ -59,7 +56,8 @@ class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
public:
Bucket()
: _nearest_run_time(std::numeric_limits<int64_t>::max())
, _task_head(NULL) {}
, _task_head(NULL) {
}
~Bucket() {}
......@@ -70,14 +68,15 @@ public:
// Schedule a task into this bucket.
// Returns the TaskId and if it has the nearest run time.
ScheduleResult schedule(
void (*fn)(void*), void* arg, const timespec& abstime);
ScheduleResult schedule(void (*fn)(void*), void* arg,
const timespec& abstime);
// Pull all scheduled tasks.
// This function is called in timer thread.
Task* consume_tasks();
private:
base::Mutex _mutex;
internal::FastPthreadMutex _mutex;
int64_t _nearest_run_time;
Task* _task_head;
};
......@@ -152,15 +151,23 @@ int TimerThread::start(const TimerThreadOptions* options_in) {
TimerThread::Task* TimerThread::Bucket::consume_tasks() {
Task* head = NULL;
BAIDU_SCOPED_LOCK(_mutex);
head = _task_head;
_task_head = NULL;
_nearest_run_time = std::numeric_limits<int64_t>::max();
if (_task_head) { // NOTE: schedule() and consume_tasks() are sequenced
// by TimerThread._nearest_run_time and fenced by TimerThread._mutex.
// We can avoid touching the mutex and related cacheline when the
// bucket is actually empty.
BAIDU_SCOPED_LOCK(_mutex);
if (_task_head) {
head = _task_head;
_task_head = NULL;
_nearest_run_time = std::numeric_limits<int64_t>::max();
}
}
return head;
}
TimerThread::Bucket::ScheduleResult TimerThread::Bucket::schedule(
void (*fn)(void*), void* arg, const timespec& abstime) {
TimerThread::Bucket::ScheduleResult
TimerThread::Bucket::schedule(void (*fn)(void*), void* arg,
const timespec& abstime) {
base::ResourceId<Task> slot_id;
Task* task = base::get_resource<Task>(&slot_id);
if (task == NULL) {
......@@ -200,7 +207,7 @@ TimerThread::TaskId TimerThread::schedule(
}
// Hashing by pthread id is better for cache locality.
const Bucket::ScheduleResult result =
_buckets[base::fmix64(pthread_self()) % _options.num_buckets]
_buckets[base::fmix32(pthread_self()) % _options.num_buckets]
.schedule(fn, arg, abstime);
if (result.earlier) {
bool earlier = false;
......
......@@ -10,7 +10,7 @@
#include <pthread.h> // pthread_*
#include "base/atomicops.h"
#include "base/time.h" // time utilities
#include "base/synchronization/lock.h"// base::Mutex
#include "bthread/mutex.h"
namespace bthread {
......@@ -80,8 +80,7 @@ private:
TimerThreadOptions _options;
Bucket* _buckets; // list of tasks to be run
base::Mutex _mutex; // protect _nearest_run_time
internal::FastPthreadMutex _mutex; // protect _nearest_run_time
int64_t _nearest_run_time;
// the futex for wake up timer thread. can't use _nearest_run_time because
// it's 64-bit.
......
......@@ -57,7 +57,7 @@ public:
bool push(const T& x) {
const size_t b = _bottom.load(base::memory_order_relaxed);
const size_t t = _top.load(base::memory_order_acquire);
if (b - t >= _capacity) { // Full queue.
if (b >= t + _capacity) { // Full queue.
return false;
}
_buffer[b & (_capacity - 1)] = x;
......
......@@ -4,7 +4,7 @@
WORKROOT('../../../..')
CXXFLAGS('-g -pipe -Wall -W -fPIC -Wno-invalid-offsetof -fstrict-aliasing -Wno-unused-parameter -std=c++0x -include brpc/config.h')
CFLAGS('-g -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-unused-parameter')
CPPFLAGS('-D__const__= -DUSE_SYMBOLIZE -DNO_TCMALLOC -DUNIT_TEST -Dprivate=public -Dprotected=public -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBVAR_NOT_LINK_DEFAULT_VARIABLES')
CPPFLAGS('-DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -DUSE_SYMBOLIZE -DNO_TCMALLOC -DUNIT_TEST -Dprivate=public -Dprotected=public -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBVAR_NOT_LINK_DEFAULT_VARIABLES')
LDFLAGS('-lpthread -lrt -lssl -lcrypto -ldl -lz')
INCPATHS('..')
CONFIGS('third-64/gflags@base')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment