// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2014 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Sun Aug  3 12:46:15 CST 2014

#include <pthread.h>
#include <execinfo.h>
#include <dlfcn.h>                               // dlsym
#include <fcntl.h>                               // O_RDONLY
#include "butil/atomicops.h"
#include "bvar/bvar.h"
#include "bvar/collector.h"
#include "butil/macros.h"                         // BAIDU_CASSERT
#include "butil/containers/flat_map.h"
#include "butil/iobuf.h"
#include "butil/fd_guard.h"
#include "butil/files/file.h"
#include "butil/files/file_path.h"
#include "butil/file_util.h"
#include "butil/unique_ptr.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "butil/logging.h"
#include "butil/object_pool.h"
#include "bthread/butex.h"                       // butex_*
#include "bthread/processor.h"                   // cpu_relax, barrier
#include "bthread/mutex.h"                       // bthread_mutex_t
#include "bthread/sys_futex.h"
#include "bthread/log.h"

extern "C" {
extern void* _dl_sym(void* handle, const char* symbol, void* caller);
}

namespace bthread {
// Warm up backtrace before main().
void* dummy_buf[4];
const int ALLOW_UNUSED dummy_bt = backtrace(dummy_buf, arraysize(dummy_buf));

// For controlling contentions collected per second.
static bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;

const size_t MAX_CACHED_CONTENTIONS = 512;
// Skip frames which are always same: the unlock function and submit_contention()
const int SKIPPED_STACK_FRAMES = 2;

struct SampledContention : public bvar::Collected {
    // time taken by lock and unlock, normalized according to sampling_range
    int64_t duration_ns;
    // number of samples, normalized according to to sampling_range
    double count;
    int nframes;          // #elements in stack
    void* stack[26];      // backtrace.

    // Implement bvar::Collected
    void dump_and_destroy(size_t round);
    void destroy();
    bvar::CollectorSpeedLimit* speed_limit() { return &g_cp_sl; }

    // For combining samples with hashmap.
    size_t hash_code() const {
        if (nframes == 0) {
            return 0;
        }
        uint32_t code = 1;
        uint32_t seed = nframes;
        butil::MurmurHash3_x86_32(stack, sizeof(void*) * nframes, seed, &code);
        return code;
    }
};

BAIDU_CASSERT(sizeof(SampledContention) == 256, be_friendly_to_allocator);

// Functor to compare contentions.
struct ContentionEqual {
    bool operator()(const SampledContention* c1,
                    const SampledContention* c2) const {
        return c1->hash_code() == c2->hash_code() &&
            c1->nframes == c2->nframes &&
            memcmp(c1->stack, c2->stack, sizeof(void*) * c1->nframes) == 0;
    }
};

// Functor to hash contentions.
struct ContentionHash {
    size_t operator()(const SampledContention* c) const {
        return c->hash_code();
    }
};

// The global context for contention profiler.
class ContentionProfiler {
public:
    typedef butil::FlatMap<SampledContention*, SampledContention*,
                          ContentionHash, ContentionEqual> ContentionMap;

    explicit ContentionProfiler(const char* name);
    ~ContentionProfiler();
    
    void dump_and_destroy(SampledContention* c);

    // Write buffered data into resulting file. If `ending' is true, append
    // content of /proc/self/maps and retry writing until buffer is empty.
    void flush_to_disk(bool ending);

    void init_if_needed();
private:
    bool _init;  // false before first dump_and_destroy is called
    bool _first_write;      // true if buffer was not written to file yet.
    std::string _filename;  // the file storing profiling result.
    butil::IOBuf _disk_buf;  // temp buf before saving the file.
    ContentionMap _dedup_map; // combining same samples to make result smaller.
};

ContentionProfiler::ContentionProfiler(const char* name)
    : _init(false)
    , _first_write(true)
    , _filename(name) {
}

ContentionProfiler::~ContentionProfiler() {
    if (!_init) {
        // Don't write file if dump_and_destroy was never called. We may create
        // such instances in ContentionProfilerStart.
        return;
    }
    flush_to_disk(true);
}

void ContentionProfiler::init_if_needed() {
    if (!_init) {
        // Already output nanoseconds, always set cycles/second to 1000000000.
        _disk_buf.append("--- contention\ncycles/second=1000000000\n");
        CHECK_EQ(0, _dedup_map.init(1024, 60));
        _init = true;
    }
}
    
void ContentionProfiler::dump_and_destroy(SampledContention* c) {
    init_if_needed();
    // Categorize the contention.
    SampledContention** p_c2 = _dedup_map.seek(c);
    if (p_c2) {
        // Most contentions are caused by several hotspots, this should be
        // the common branch.
        SampledContention* c2 = *p_c2;
        c2->duration_ns += c->duration_ns;
        c2->count += c->count;
        c->destroy();
    } else {
        _dedup_map.insert(c, c);
    }
    if (_dedup_map.size() > MAX_CACHED_CONTENTIONS) {
        flush_to_disk(false);
    }
}

void ContentionProfiler::flush_to_disk(bool ending) {
    BT_VLOG << "flush_to_disk(ending=" << ending << ")";
    
    // Serialize contentions in _dedup_map into _disk_buf.
    if (!_dedup_map.empty()) {
        BT_VLOG << "dedup_map=" << _dedup_map.size();
        butil::IOBufBuilder os;
        for (ContentionMap::const_iterator
                 it = _dedup_map.begin(); it != _dedup_map.end(); ++it) {
            SampledContention* c = it->second;
            os << c->duration_ns << ' ' << (size_t)ceil(c->count) << " @";
            for (int i = SKIPPED_STACK_FRAMES; i < c->nframes; ++i) {
                os << ' ' << (void*)c->stack[i];
            }
            os << '\n';
            c->destroy();
        }
        _dedup_map.clear();
        _disk_buf.append(os.buf());
    }

    // Append /proc/self/maps to the end of the contention file, required by
    // pprof.pl, otherwise the functions in sys libs are not interpreted.
    if (ending) {
        BT_VLOG << "Append /proc/self/maps";
        // Failures are not critical, don't return directly.
        butil::IOPortal mem_maps;
        const butil::fd_guard fd(open("/proc/self/maps", O_RDONLY));
        if (fd >= 0) {
            while (true) {
                ssize_t nr = mem_maps.append_from_file_descriptor(fd, 8192);
                if (nr < 0) {
                    if (errno == EINTR) {
                        continue;
                    }
                    PLOG(ERROR) << "Fail to read /proc/self/maps";
                    break;
                }
                if (nr == 0) {
                    _disk_buf.append(mem_maps);
                    break;
                }
            }
        } else {
            PLOG(ERROR) << "Fail to open /proc/self/maps";
        }
    }
    // Write _disk_buf into _filename
    butil::File::Error error;
    butil::FilePath path(_filename);
    butil::FilePath dir = path.DirName();
    if (!butil::CreateDirectoryAndGetError(dir, &error)) {
        LOG(ERROR) << "Fail to create directory=`" << dir.value()
                   << "', " << error;
        return;
    }
    // Truncate on first write, append on later writes.
    int flag = O_APPEND;
    if (_first_write) {
        _first_write = false;
        flag = O_TRUNC;
    }
    butil::fd_guard fd(open(_filename.c_str(), O_WRONLY|O_CREAT|flag, 0666));
    if (fd < 0) {
        PLOG(ERROR) << "Fail to open " << _filename;
        return;
    }
    // Write once normally, write until empty in the end.
    do {
        ssize_t nw = _disk_buf.cut_into_file_descriptor(fd);
        if (nw < 0) {
            if (errno == EINTR) {
                continue;
            }
            PLOG(ERROR) << "Fail to write into " << _filename;
            return;
        }
        BT_VLOG << "Write " << nw << " bytes into " << _filename;
    } while (!_disk_buf.empty() && ending);
}

// If contention profiler is on, this variable will be set with a valid
// instance. NULL otherwise.
static ContentionProfiler* BAIDU_CACHELINE_ALIGNMENT g_cp = NULL;
// Need this version to solve an issue that non-empty entries left by
// previous contention profilers should be detected and overwritten.
static uint64_t g_cp_version = 0;
// Protecting accesss to g_cp.
static pthread_mutex_t g_cp_mutex = PTHREAD_MUTEX_INITIALIZER;

// The map storing information for profiling pthread_mutex. Different from
// bthread_mutex, we can't save stuff into pthread_mutex, we neither can
// save the info in TLS reliably, since a mutex can be unlocked in a different
// thread from the one locked (although rare)
// This map must be very fast, since it's accessed inside the lock.
// Layout of the map:
//  * Align each entry by cacheline so that different threads do not collide.
//  * Hash the mutex into the map by its address. If the entry is occupied,
//    cancel sampling.
// The canceling rate should be small provided that programs are unlikely to
// lock a lot of mutexes simultaneously.
const size_t MUTEX_MAP_SIZE = 1024;
BAIDU_CASSERT((MUTEX_MAP_SIZE & (MUTEX_MAP_SIZE - 1)) == 0, must_be_power_of_2);
struct BAIDU_CACHELINE_ALIGNMENT MutexMapEntry {
    butil::static_atomic<uint64_t> versioned_mutex;
    bthread_contention_site_t csite;
};
static MutexMapEntry g_mutex_map[MUTEX_MAP_SIZE] = {}; // zero-initialize

void SampledContention::dump_and_destroy(size_t /*round*/) {
    if (g_cp) {
        // Must be protected with mutex to avoid race with deletion of ctx.
        // dump_and_destroy is called from dumping thread only so this mutex
        // is not contended at most of time.
        BAIDU_SCOPED_LOCK(g_cp_mutex);
        if (g_cp) {
            g_cp->dump_and_destroy(this);
            return;
        }
    }
    destroy();
}

void SampledContention::destroy() {
    butil::return_object(this);
}

// Remember the conflict hashes for troubleshooting, should be 0 at most of time.
static butil::static_atomic<int64_t> g_nconflicthash = BUTIL_STATIC_ATOMIC_INIT(0);
static int64_t get_nconflicthash(void*) {
    return g_nconflicthash.load(butil::memory_order_relaxed);
}

// Start profiling contention.
bool ContentionProfilerStart(const char* filename) {
    if (filename == NULL) {
        LOG(ERROR) << "Parameter [filename] is NULL";
        return false;
    }
    // g_cp is also the flag marking start/stop.
    if (g_cp) {
        return false;
    }

    // Create related global bvar lazily.
    static bvar::PassiveStatus<int64_t> g_nconflicthash_var
        ("contention_profiler_conflict_hash", get_nconflicthash, NULL);
    static bvar::DisplaySamplingRatio g_sampling_ratio_var(
        "contention_profiler_sampling_ratio", &g_cp_sl);
    
    // Optimistic locking. A not-used ContentionProfiler does not write file.
    std::unique_ptr<ContentionProfiler> ctx(new ContentionProfiler(filename));
    {
        BAIDU_SCOPED_LOCK(g_cp_mutex);
        if (g_cp) {
            return false;
        }
        g_cp = ctx.release();
        ++g_cp_version;  // invalidate non-empty entries that may exist.
    }
    return true;
}

// Stop contention profiler.
void ContentionProfilerStop() {
    ContentionProfiler* ctx = NULL;
    if (g_cp) {
        std::unique_lock<pthread_mutex_t> mu(g_cp_mutex);
        if (g_cp) {
            ctx = g_cp;
            g_cp = NULL;
            mu.unlock();

            // make sure it's initialiazed in case no sample was gathered,
            // otherwise nothing will be written and succeeding pprof will fail.
            ctx->init_if_needed();
            // Deletion is safe because usages of g_cp are inside g_cp_mutex.
            delete ctx;
            return;
        }
    }
    LOG(ERROR) << "Contention profiler is not started!";
}

BUTIL_FORCE_INLINE bool
is_contention_site_valid(const bthread_contention_site_t& cs) {
    return cs.sampling_range;
}

BUTIL_FORCE_INLINE void
make_contention_site_invalid(bthread_contention_site_t* cs) {
    cs->sampling_range = 0;
}

// Replace pthread_mutex_lock and pthread_mutex_unlock:
// First call to sys_pthread_mutex_lock sets sys_pthread_mutex_lock to the
// real function so that next calls go to the real function directly. This
// technique avoids calling pthread_once each time.
typedef int (*MutexOp)(pthread_mutex_t*);
int first_sys_pthread_mutex_lock(pthread_mutex_t* mutex);
int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex);
static MutexOp sys_pthread_mutex_lock = first_sys_pthread_mutex_lock;
static MutexOp sys_pthread_mutex_unlock = first_sys_pthread_mutex_unlock;
static pthread_once_t init_sys_mutex_lock_once = PTHREAD_ONCE_INIT;

// dlsym may call malloc to allocate space for dlerror and causes contention
// profiler to deadlock at boostraping when the program is linked with
// libunwind. The deadlock bt:
//   #0  0x00007effddc99b80 in __nanosleep_nocancel () at ../sysdeps/unix/syscall-template.S:81
//   #1  0x00000000004b4df7 in butil::internal::SpinLockDelay(int volatile*, int, int) ()
//   #2  0x00000000004b4d57 in SpinLock::SlowLock() ()
//   #3  0x00000000004b4a63 in tcmalloc::ThreadCache::InitModule() ()
//   #4  0x00000000004aa2b5 in tcmalloc::ThreadCache::GetCache() ()
//   #5  0x000000000040c6c5 in (anonymous namespace)::do_malloc_no_errno(unsigned long) [clone.part.16] ()
//   #6  0x00000000006fc125 in tc_calloc ()
//   #7  0x00007effdd245690 in _dlerror_run (operate=operate@entry=0x7effdd245130 <dlsym_doit>, args=args@entry=0x7fff483dedf0) at dlerror.c:141
//   #8  0x00007effdd245198 in __dlsym (handle=<optimized out>, name=<optimized out>) at dlsym.c:70
//   #9  0x0000000000666517 in bthread::init_sys_mutex_lock () at bthread/mutex.cpp:358
//   #10 0x00007effddc97a90 in pthread_once () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_once.S:103
//   #11 0x000000000066649f in bthread::first_sys_pthread_mutex_lock (mutex=0xbaf880 <_ULx86_64_lock>) at bthread/mutex.cpp:366
//   #12 0x00000000006678bc in pthread_mutex_lock_impl (mutex=0xbaf880 <_ULx86_64_lock>) at bthread/mutex.cpp:489
//   #13 pthread_mutex_lock (__mutex=__mutex@entry=0xbaf880 <_ULx86_64_lock>) at bthread/mutex.cpp:751
//   #14 0x00000000004c6ea1 in _ULx86_64_init () at x86_64/Gglobal.c:83
//   #15 0x00000000004c44fb in _ULx86_64_init_local (cursor=0x7fff483df340, uc=0x7fff483def90) at x86_64/Ginit_local.c:47
//   #16 0x00000000004b5012 in GetStackTrace(void**, int, int) ()
//   #17 0x00000000004b2095 in tcmalloc::PageHeap::GrowHeap(unsigned long) ()
//   #18 0x00000000004b23a3 in tcmalloc::PageHeap::New(unsigned long) ()
//   #19 0x00000000004ad457 in tcmalloc::CentralFreeList::Populate() ()
//   #20 0x00000000004ad628 in tcmalloc::CentralFreeList::FetchFromSpansSafe() ()
//   #21 0x00000000004ad6a3 in tcmalloc::CentralFreeList::RemoveRange(void**, void**, int) ()
//   #22 0x00000000004b3ed3 in tcmalloc::ThreadCache::FetchFromCentralCache(unsigned long, unsigned long) ()
//   #23 0x00000000006fbb9a in tc_malloc ()
// Call _dl_sym which is a private function in glibc to workaround the malloc
// causing deadlock temporarily. This fix is hardly portable.
static void init_sys_mutex_lock() {
    // TODO: may need dlvsym when GLIBC has multiple versions of a same symbol.
    // http://blog.fesnel.com/blog/2009/08/25/preloading-with-multiple-symbol-versions
    sys_pthread_mutex_lock = (MutexOp)_dl_sym(RTLD_NEXT, "pthread_mutex_lock", (void*)init_sys_mutex_lock);
    sys_pthread_mutex_unlock = (MutexOp)_dl_sym(RTLD_NEXT, "pthread_mutex_unlock", (void*)init_sys_mutex_lock);
}

// Make sure pthread functions are ready before main().
const int ALLOW_UNUSED dummy = pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);

int first_sys_pthread_mutex_lock(pthread_mutex_t* mutex) {
    pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
    return sys_pthread_mutex_lock(mutex);
}
int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
    pthread_once(&init_sys_mutex_lock_once, init_sys_mutex_lock);
    return sys_pthread_mutex_unlock(mutex);
}

inline uint64_t hash_mutex_ptr(const pthread_mutex_t* m) {
    return butil::fmix64((uint64_t)m);
}

// Mark being inside locking so that pthread_mutex calls inside collecting
// code are never sampled, otherwise deadlock may occur.
static __thread bool tls_inside_lock = false;

// Speed up with TLS:
//   Most pthread_mutex are locked and unlocked in the same thread. Putting
//   contention information in TLS avoids collisions that may occur in
//   g_mutex_map. However when user unlocks in another thread, the info cached
//   in the locking thread is not removed, making the space bloated. We use a
//   simple strategy to solve the issue: If a thread has enough thread-local
//   space to store the info, save it, otherwise save it in g_mutex_map. For
//   a program that locks and unlocks in the same thread and does not lock a
//   lot of mutexes simulateneously, this strategy always uses the TLS.
#ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
const int TLS_MAX_COUNT = 3;
struct MutexAndContentionSite {
    pthread_mutex_t* mutex;
    bthread_contention_site_t csite;
};
struct TLSPthreadContentionSites {
    int count;
    uint64_t cp_version;
    MutexAndContentionSite list[TLS_MAX_COUNT];
};
static __thread TLSPthreadContentionSites tls_csites = {0,0,{}};
#endif  // DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS

// Guaranteed in linux/win.
const int PTR_BITS = 48;

inline bthread_contention_site_t*
add_pthread_contention_site(pthread_mutex_t* mutex) {
    MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
    butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
    uint64_t expected = m.load(butil::memory_order_relaxed);
    // If the entry is not used or used by previous profiler, try to CAS it.
    if (expected == 0 ||
        (expected >> PTR_BITS) != (g_cp_version & ((1 << (64 - PTR_BITS)) - 1))) {
        uint64_t desired = (g_cp_version << PTR_BITS) | (uint64_t)mutex;
        if (m.compare_exchange_strong(
                expected, desired, butil::memory_order_acquire)) {
            return &entry.csite;
        }
    }
    g_nconflicthash.fetch_add(1, butil::memory_order_relaxed);
    return NULL;
}

inline bool remove_pthread_contention_site(
    pthread_mutex_t* mutex, bthread_contention_site_t* saved_csite) {
    MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
    butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
    if ((m.load(butil::memory_order_relaxed) & ((((uint64_t)1) << PTR_BITS) - 1))
        != (uint64_t)mutex) {
        // This branch should be the most common case since most locks are
        // neither contended nor sampled. We have one memory indirection and
        // several bitwise operations here, the cost should be ~ 5-50ns
        return false;
    }
    // Although this branch is inside a contended lock, we should also make it
    // as simple as possible because altering the critical section too much
    // may make unpredictable impact to thread interleaving status, which
    // makes profiling result less accurate.
    *saved_csite = entry.csite;
    make_contention_site_invalid(&entry.csite);
    m.store(0, butil::memory_order_release);
    return true;
}

// Submit the contention along with the callsite('s stacktrace)
void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {
    tls_inside_lock = true;
    SampledContention* sc = butil::get_object<SampledContention>();
    // Normalize duration_us and count so that they're addable in later
    // processings. Notice that sampling_range is adjusted periodically by
    // collecting thread.
    sc->duration_ns = csite.duration_ns * bvar::COLLECTOR_SAMPLING_BASE
        / csite.sampling_range;
    sc->count = bvar::COLLECTOR_SAMPLING_BASE / (double)csite.sampling_range;
    sc->nframes = backtrace(sc->stack, arraysize(sc->stack)); // may lock
    sc->submit(now_ns / 1000);  // may lock
    tls_inside_lock = false;
}

BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
    // Don't change behavior of lock when profiler is off.
    if (!g_cp ||
        // collecting code including backtrace() and submit() may call
        // pthread_mutex_lock and cause deadlock. Don't sample.
        tls_inside_lock) {
        return sys_pthread_mutex_lock(mutex);
    }
    // Don't slow down non-contended locks.
    int rc = pthread_mutex_trylock(mutex);
    if (rc != EBUSY) {
        return rc;
    }
    // Ask bvar::Collector if this (contended) locking should be sampled
    const size_t sampling_range = bvar::is_collectable(&g_cp_sl);

    bthread_contention_site_t* csite = NULL;
#ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
    TLSPthreadContentionSites& fast_alt = tls_csites;
    if (fast_alt.cp_version != g_cp_version) {
        fast_alt.cp_version = g_cp_version;
        fast_alt.count = 0;
    }
    if (fast_alt.count < TLS_MAX_COUNT) {
        MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++];
        entry.mutex = mutex;
        csite = &entry.csite;
        if (!sampling_range) {
            make_contention_site_invalid(&entry.csite);
            return sys_pthread_mutex_lock(mutex);
        }
    }
#endif
    if (!sampling_range) {  // don't sample
        return sys_pthread_mutex_lock(mutex);
    }
    // Lock and monitor the waiting time.
    const int64_t start_ns = butil::cpuwide_time_ns();
    rc = sys_pthread_mutex_lock(mutex);
    if (!rc) { // Inside lock
        if (!csite) {
            csite = add_pthread_contention_site(mutex);
            if (csite == NULL) {
                return rc;
            }
        }
        csite->duration_ns = butil::cpuwide_time_ns() - start_ns;
        csite->sampling_range = sampling_range;
    } // else rare
    return rc;
}

BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
    // Don't change behavior of unlock when profiler is off.
    if (!g_cp || tls_inside_lock) {
        // This branch brings an issue that an entry created by
        // add_pthread_contention_site may not be cleared. Thus we add a 
        // 16-bit rolling version in the entry to find out such entry.
        return sys_pthread_mutex_unlock(mutex);
    }
    int64_t unlock_start_ns = 0;
    bool miss_in_tls = true;
    bthread_contention_site_t saved_csite = {0,0};
#ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
    TLSPthreadContentionSites& fast_alt = tls_csites;
    for (int i = fast_alt.count - 1; i >= 0; --i) {
        if (fast_alt.list[i].mutex == mutex) {
            if (is_contention_site_valid(fast_alt.list[i].csite)) {
                saved_csite = fast_alt.list[i].csite;
                unlock_start_ns = butil::cpuwide_time_ns();
            }
            fast_alt.list[i] = fast_alt.list[--fast_alt.count];
            miss_in_tls = false;
            break;
        }
    }
#endif
    // Check the map to see if the lock is sampled. Notice that we're still
    // inside critical section.
    if (miss_in_tls) {
        if (remove_pthread_contention_site(mutex, &saved_csite)) {
            unlock_start_ns = butil::cpuwide_time_ns();
        }
    }
    const int rc = sys_pthread_mutex_unlock(mutex);
    // [Outside lock]
    if (unlock_start_ns) {
        const int64_t unlock_end_ns = butil::cpuwide_time_ns();
        saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
        submit_contention(saved_csite, unlock_end_ns);
    }
    return rc;
}

// Implement bthread_mutex_t related functions
struct MutexInternal {
    butil::static_atomic<unsigned char> locked;
    butil::static_atomic<unsigned char> contended;
    unsigned short padding;
};

const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};
const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
#define BTHREAD_MUTEX_CONTENDED (*(const unsigned*)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned*)&bthread::MUTEX_LOCKED_RAW)

BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
              sizeof_mutex_internal_must_equal_unsigned);

inline int mutex_lock_contended(bthread_mutex_t* m) {
    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
            // a mutex lock should ignore interrruptions in general since
            // user code is unlikely to check the return value.
            return errno;
        }
    }
    return 0;
}

inline int mutex_timedlock_contended(
    bthread_mutex_t* m, const struct timespec* __restrict abstime) {
    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
        if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR/*note*/) {
            // a mutex lock should ignore interrruptions in general since
            // user code is unlikely to check the return value.
            return errno;
        }
    }
    return 0;
}

#ifdef BTHREAD_USE_FAST_PTHREAD_MUTEX
namespace internal {

int FastPthreadMutex::lock_contended() {
    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
    while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
        if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0
            && errno != EWOULDBLOCK) {
            return errno;
        }
    }
    return 0;
}

void FastPthreadMutex::lock() {
    bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
    if (split->locked.exchange(1, butil::memory_order_acquire)) {
        (void)lock_contended();
    }
}

bool FastPthreadMutex::try_lock() {
    bthread::MutexInternal* split = (bthread::MutexInternal*)&_futex;
    return !split->locked.exchange(1, butil::memory_order_acquire);
}

void FastPthreadMutex::unlock() {
    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)&_futex;
    const unsigned prev = whole->exchange(0, butil::memory_order_release);
    // CAUTION: the mutex may be destroyed, check comments before butex_create
    if (prev != BTHREAD_MUTEX_LOCKED) {
        futex_wake_private(whole, 1);
    }
}

} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX

} // namespace bthread

extern "C" {

int bthread_mutex_init(bthread_mutex_t* __restrict m,
                       const bthread_mutexattr_t* __restrict) __THROW {
    bthread::make_contention_site_invalid(&m->csite);
    m->butex = bthread::butex_create_checked<unsigned>();
    if (!m->butex) {
        return ENOMEM;
    }
    *m->butex = 0;
    return 0;
}

int bthread_mutex_destroy(bthread_mutex_t* m) __THROW {
    bthread::butex_destroy(m->butex);
    return 0;
}

int bthread_mutex_trylock(bthread_mutex_t* m) __THROW {
    bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
    if (!split->locked.exchange(1, butil::memory_order_acquire)) {
        return 0;
    }
    return EBUSY;
}

int bthread_mutex_lock_contended(bthread_mutex_t* m) {
    return bthread::mutex_lock_contended(m);
}

int bthread_mutex_lock(bthread_mutex_t* m) __THROW {
    bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
    if (!split->locked.exchange(1, butil::memory_order_acquire)) {
        return 0;
    }
    // Don't sample when contention profiler is off.
    if (!bthread::g_cp) {
        return bthread::mutex_lock_contended(m);
    }
    // Ask Collector if this (contended) locking should be sampled.
    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
    if (!sampling_range) { // Don't sample
        return bthread::mutex_lock_contended(m);
    }
    // Start sampling.
    const int64_t start_ns = butil::cpuwide_time_ns();
    // NOTE: Don't modify m->csite outside lock since multiple threads are
    // still contending with each other.
    const int rc = bthread::mutex_lock_contended(m);
    if (!rc) { // Inside lock
        m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
        m->csite.sampling_range = sampling_range;
    } // else rare
    return rc;
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
                            const struct timespec* __restrict abstime) __THROW {
    bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
    if (!split->locked.exchange(1, butil::memory_order_acquire)) {
        return 0;
    }
    // Don't sample when contention profiler is off.
    if (!bthread::g_cp) {
        return bthread::mutex_timedlock_contended(m, abstime);
    }
    // Ask Collector if this (contended) locking should be sampled.
    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
    if (!sampling_range) { // Don't sample
        return bthread::mutex_timedlock_contended(m, abstime);
    }
    // Start sampling.
    const int64_t start_ns = butil::cpuwide_time_ns();
    // NOTE: Don't modify m->csite outside lock since multiple threads are
    // still contending with each other.
    const int rc = bthread::mutex_timedlock_contended(m, abstime);
    if (!rc) { // Inside lock
        m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
        m->csite.sampling_range = sampling_range;
    } else if (rc == ETIMEDOUT) {
        // Failed to lock due to ETIMEDOUT, submit the elapse directly.
        const int64_t end_ns = butil::cpuwide_time_ns();
        const bthread_contention_site_t csite = {end_ns - start_ns, sampling_range};
        bthread::submit_contention(csite, end_ns);
    }
    return rc;
}

int bthread_mutex_unlock(bthread_mutex_t* m) __THROW {
    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
    bthread_contention_site_t saved_csite = {0, 0};
    if (bthread::is_contention_site_valid(m->csite)) {
        saved_csite = m->csite;
        bthread::make_contention_site_invalid(&m->csite);
    }
    const unsigned prev = whole->exchange(0, butil::memory_order_release);
    // CAUTION: the mutex may be destroyed, check comments before butex_create
    if (prev == BTHREAD_MUTEX_LOCKED) {
        return 0;
    }
    // Wakeup one waiter
    if (!bthread::is_contention_site_valid(saved_csite)) {
        bthread::butex_wake(whole);
        return 0;
    }
    const int64_t unlock_start_ns = butil::cpuwide_time_ns();
    bthread::butex_wake(whole);
    const int64_t unlock_end_ns = butil::cpuwide_time_ns();
    saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
    bthread::submit_contention(saved_csite, unlock_end_ns);
    return 0;
}

int pthread_mutex_lock (pthread_mutex_t *__mutex) __THROW {
    return bthread::pthread_mutex_lock_impl(__mutex);
}
int pthread_mutex_unlock (pthread_mutex_t *__mutex) __THROW {
    return bthread::pthread_mutex_unlock_impl(__mutex);
}

}  // extern "C"