id.cpp 24.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

gejun's avatar
gejun committed
18 19 20 21 22
// bthread - A M:N threading library to make applications more concurrent.

// Date: Sun Aug  3 12:46:15 CST 2014

#include <deque>
23
#include "butil/logging.h"
gejun's avatar
gejun committed
24
#include "bthread/butex.h"                       // butex_*
25
#include "bthread/mutex.h"
gejun's avatar
gejun committed
26
#include "bthread/list_of_abafree_id.h"
27
#include "butil/resource_pool.h"
gejun's avatar
gejun committed
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
#include "bthread/bthread.h"

namespace bthread {

// A FIFO queue that stores up to N elements in an inline ring buffer and only
// falls back to a heap-allocated std::deque when the ring is full. This
// reduces the chance to allocate memory for short queues.
template <typename T, int N>
class SmallQueue {
public:
    SmallQueue() : _begin(0), _size(0), _full(NULL) {}

    ~SmallQueue() {
        delete _full;
        _full = NULL;
    }

    // Appends a copy of `val` to the back of the queue.
    void push(const T& val) {
        // pop() drains the ring before the deque, so as long as the deque
        // holds elements every newer element must be queued behind them.
        const bool overflow_in_use = (_full != NULL && !_full->empty());
        if (!overflow_in_use && _size < N) {
            // _begin and _size are both below N here, so one modulo wraps.
            const int slot = (_begin + _size) % N;
            _c[slot] = val;
            ++_size;
            return;
        }
        if (_full == NULL) {
            _full = new std::deque<T>;
        }
        _full->push_back(val);
    }

    // Copies the oldest element into `*val` and removes it.
    // Returns false (leaving `*val` untouched) when the queue is empty.
    bool pop(T* val) {
        if (_size > 0) {
            *val = _c[_begin];
            _begin = (_begin + 1 >= N) ? (_begin + 1 - N) : (_begin + 1);
            --_size;
            return true;
        }
        if (_full && !_full->empty()) {
            *val = _full->front();
            _full->pop_front();
            return true;
        }
        return false;
    }

    // True when neither the ring nor the overflow deque holds an element.
    bool empty() const {
        if (_size != 0) {
            return false;
        }
        return _full == NULL || _full->empty();
    }

    // Number of queued elements (ring plus overflow deque).
    size_t size() const {
        size_t total = _size;
        if (_full) {
            total += _full->size();
        }
        return total;
    }

    // Drops all elements. The overflow deque, if any, is emptied but kept.
    void clear() {
        _begin = 0;
        _size = 0;
        if (_full) {
            _full->clear();
        }
    }

private:
    DISALLOW_COPY_AND_ASSIGN(SmallQueue);

    int _begin;            // index of the oldest element in _c
    int _size;             // number of elements currently stored in _c
    T _c[N];               // inline ring buffer
    std::deque<T>* _full;  // overflow storage, allocated on first overflow
};

struct PendingError {
    bthread_id_t id;
    int error_code;
    std::string error_text;
    const char *location;

    PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {}
};

struct BAIDU_CACHELINE_ALIGNMENT Id {
    // first_ver ~ locked_ver - 1: unlocked versions
    // locked_ver: locked
    // unlockable_ver: locked and about to be destroyed
    // contended_ver: locked and contended
    uint32_t first_ver;
    uint32_t locked_ver;
117
    internal::FastPthreadMutex mutex;
gejun's avatar
gejun committed
118 119 120
    void* data;
    int (*on_error)(bthread_id_t, void*, int);
    int (*on_error2)(bthread_id_t, void*, int, const std::string&);
gejun's avatar
gejun committed
121
    const char *lock_location;
gejun's avatar
gejun committed
122 123
    uint32_t* butex;
    uint32_t* join_butex;
gejun's avatar
gejun committed
124 125 126 127 128
    SmallQueue<PendingError, 2> pending_q;
    
    Id() {
        // Although value of the butex(as version part of bthread_id_t)
        // does not matter, we set it to 0 to make program more deterministic.
gejun's avatar
gejun committed
129 130 131 132
        butex = bthread::butex_create_checked<uint32_t>();
        join_butex = bthread::butex_create_checked<uint32_t>();
        *butex = 0;
        *join_butex = 0;
gejun's avatar
gejun committed
133 134 135
    }

    ~Id() {
gejun's avatar
gejun committed
136 137
        bthread::butex_destroy(butex);
        bthread::butex_destroy(join_butex);
gejun's avatar
gejun committed
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
    }

    inline bool has_version(uint32_t id_ver) const {
        return id_ver >= first_ver && id_ver < locked_ver;
    }
    inline uint32_t contended_ver() const { return locked_ver + 1; }
    inline uint32_t unlockable_ver() const { return locked_ver + 2; }
    inline uint32_t last_ver() const { return unlockable_ver(); }
    
    // also the next "first_ver"
    inline uint32_t end_ver() const { return last_ver() + 1; }
};

BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align);

153
typedef butil::ResourceId<Id> IdResourceId;
gejun's avatar
gejun committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169

// Packs a slot and a version into a bthread_id_t: the slot occupies the high
// 32 bits of the value and the version the low 32 bits.
inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    const uint64_t high_bits = ((uint64_t)slot.value) << 32;
    const uint64_t low_bits = (uint64_t)version;
    const bthread_id_t packed = { high_bits | low_bits };
    return packed;
}

// Extracts the resource slot (high 32 bits) from a bthread_id_t.
inline IdResourceId get_slot(bthread_id_t id) {
    const IdResourceId slot = { (id.value >> 32) };
    return slot;
}

// Extracts the version (low 32 bits) from a bthread_id_t.
inline uint32_t get_version(bthread_id_t id) {
    const uint64_t low_bits = id.value & 0xFFFFFFFFul;
    return (uint32_t)low_bits;
}

gejun's avatar
gejun committed
170
// Returns false when the slot of `id` cannot be addressed or when the version
// of `id` lies outside [first_ver, last_ver] of that slot; true otherwise.
// The fields are read without taking the slot's mutex.
inline bool id_exists_with_true_negatives(bthread_id_t id) {
    Id* const meta = address_resource(get_slot(id));
    if (meta == NULL) {
        return false;
    }
    const uint32_t id_ver = bthread::get_version(id);
    if (id_ver < meta->first_ver) {
        return false;
    }
    return id_ver <= meta->last_ver();
}
// required by unittest
// Returns the current value of the butex behind `id`, or 0 when the slot of
// `id` cannot be addressed.
uint32_t id_value(bthread_id_t id) {
    Id* const meta = address_resource(get_slot(id));
    if (meta == NULL) {
        return 0;  // valid version never be zero
    }
    return *meta->butex;
}

// Error callback installed when the creator passes no on_error: it ignores
// the data and the error code and simply unlocks and destroys the id.
static int default_bthread_id_on_error(
    bthread_id_t id, void* /*data*/, int /*error_code*/) {
    const int rc = bthread_id_unlock_and_destroy(id);
    return rc;
}
// Error callback (variant taking an error text) installed when the creator
// passes no on_error: it ignores all details and unlocks and destroys the id.
static int default_bthread_id_on_error2(
    bthread_id_t id, void* /*data*/, int /*error_code*/,
    const std::string& /*error_text*/) {
    const int rc = bthread_id_unlock_and_destroy(id);
    return rc;
}

// Writes a human-readable description of `id` to `os`: first id of the range,
// range size, lock status, pending errors, error callback and user data.
// Prints "Invalid id=<value>" when the slot cannot be addressed or the
// version of `id` is not currently valid.
//
// The state is snapshotted under the slot's mutex and formatted afterwards,
// so no stream I/O happens while the mutex is held.
void id_status(bthread_id_t id, std::ostream &os) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (!meta) {
        os << "Invalid id=" << id.value << '\n';
        return;
    }
    // Source locations are caller-supplied `const char*` values and
    // bthread_id_trylock() locks without recording one, so they may be NULL.
    // Streaming a null `const char*` is undefined behavior, hence this
    // placeholder is printed instead.
    const char* const unknown_location = "<unknown>";

    const uint32_t id_ver = bthread::get_version(id);
    uint32_t* butex = meta->butex;
    bool valid = true;
    void* data = NULL;
    int (*on_error)(bthread_id_t, void*, int) = NULL;
    int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL;
    uint32_t first_ver = 0;
    uint32_t locked_ver = 0;
    uint32_t unlockable_ver = 0;
    uint32_t contended_ver = 0;
    const char *lock_location = NULL;
    SmallQueue<PendingError, 2> pending_q;
    uint32_t butex_value = 0;

    meta->mutex.lock();
    if (meta->has_version(id_ver)) {
        data = meta->data;
        on_error = meta->on_error;
        on_error2 = meta->on_error2;
        first_ver = meta->first_ver;
        locked_ver = meta->locked_ver;
        unlockable_ver = meta->unlockable_ver();
        contended_ver = meta->contended_ver();
        lock_location = meta->lock_location;
        // SmallQueue has no iteration interface: rotate the queue once
        // (pop front, push back) to copy every element while leaving the
        // original order unchanged.
        const size_t size = meta->pending_q.size();
        for (size_t i = 0; i < size; ++i) {
            PendingError front;
            meta->pending_q.pop(&front);
            meta->pending_q.push(front);
            pending_q.push(front);
        }
        butex_value = *butex;
    } else {
        valid = false;
    }
    meta->mutex.unlock();

    if (valid) {
        os << "First id: "
           << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n'
           << "Range: " << locked_ver - first_ver << '\n'
           << "Status: ";
        if (butex_value != first_ver) {
            os << "LOCKED at "
               << (lock_location ? lock_location : unknown_location);
            if (butex_value == contended_ver) {
                os << " (CONTENDED)";
            } else if (butex_value == unlockable_ver) {
                os << " (ABOUT TO DESTROY)";
            } else {
                os << " (UNCONTENDED)";
            }
        } else {
            os << "UNLOCKED";
        }
        os << "\nPendingQ:";
        if (pending_q.empty()) {
            os << " EMPTY";
        } else {
            const size_t size = pending_q.size();
            for (size_t i = 0; i < size; ++i) {
                PendingError front;
                pending_q.pop(&front);
                os << " ("
                   << (front.location ? front.location : unknown_location)
                   << "/E" << front.error_code
                   << '/' << front.error_text << ')';
            }
        }
        if (on_error) {
            if (on_error == default_bthread_id_on_error) {
                os << "\nOnError: unlock_and_destroy";
            } else {
                os << "\nOnError: " << (void*)on_error;
            }
        } else {
            if (on_error2 == default_bthread_id_on_error2) {
                os << "\nOnError2: unlock_and_destroy";
            } else {
                os << "\nOnError2: " << (void*)on_error2;
            }
        }
        os << "\nData: " << data;
    } else {
        os << "Invalid id=" << id.value;
    }
    os << '\n';
}

void id_pool_status(std::ostream &os) {
288
    os << butil::describe_resources<Id>() << '\n';
gejun's avatar
gejun committed
289 290 291 292 293 294
}

struct IdTraits {
    static const size_t BLOCK_SIZE = 63;
    static const size_t MAX_ENTRIES = 100000;
    static const bthread_id_t ID_INIT;
gejun's avatar
gejun committed
295 296
    static bool exists(bthread_id_t id)
    { return bthread::id_exists_with_true_negatives(id); }
gejun's avatar
gejun committed
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
};
const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID;

typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList;

// Functor applied to every id of an IdList: reports the configured error to
// the id and then overwrites the list entry with INVALID_BTHREAD_ID.
//
// The error text is held by reference, so the string passed to the
// constructor must outlive the resetter.
struct IdResetter {
    explicit IdResetter(int ec, const std::string& et)
        : _error_code(ec)
        , _error_text(et) {}

    void operator()(bthread_id_t & id) const {
        bthread_id_error2_verbose(
            id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__));
        id = INVALID_BTHREAD_ID;
    }

private:
    int _error_code;
    const std::string& _error_text;
};

size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) {
    if (list->impl == NULL) {
        return 0;
    }
    return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n);
}

const int ID_MAX_RANGE = 1024;

// Shared body of id_create_impl() and id_create_ranged_impl(), which used to
// be two copies of the same code differing only in the range.
//
// Takes a slot from the resource pool, installs the user data and callbacks,
// and publishes an id whose valid versions are
// [*butex, *butex + range). `range` must already be validated by the caller.
// Returns 0 on success, ENOMEM when no slot could be obtained.
static int id_create_with_valid_range(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&),
    int range) {
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
    if (meta == NULL) {
        return ENOMEM;
    }
    meta->data = data;
    meta->on_error = on_error;
    meta->on_error2 = on_error2;
    CHECK(meta->pending_q.empty());
    uint32_t* butex = meta->butex;
    if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
        // Skip 0 so that bthread_id_t is never 0
        // avoid overflow to make comparisons simpler.
        *butex = 1;
    }
    *meta->join_butex = *butex;
    meta->first_ver = *butex;
    meta->locked_ver = *butex + range;
    *id = make_id(*butex, slot);
    return 0;
}

// Creates an id with exactly one valid version.
// Exactly one of on_error / on_error2 is expected to be non-NULL; callers in
// this file pass NULL for the other one.
// Returns 0 on success, ENOMEM when no slot could be obtained.
static int id_create_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
    return id_create_with_valid_range(id, data, on_error, on_error2, 1);
}

// Creates an id with `range` consecutive valid versions.
// Returns EINVAL (after logging FATAL) when `range` is outside
// [1, ID_MAX_RANGE], ENOMEM when no slot could be obtained, 0 on success.
static int id_create_ranged_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&),
    int range) {
    if (range < 1 || range > ID_MAX_RANGE) {
        LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range;
        LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is " 
                << ID_MAX_RANGE << ", actually " << range;
        return EINVAL;
    }
    return id_create_with_valid_range(id, data, on_error, on_error2, range);
}

}  // namespace bthread

extern "C" {

int bthread_id_create(
    bthread_id_t* id, void* data,
gejun's avatar
gejun committed
389
    int (*on_error)(bthread_id_t, void*, int)) {
gejun's avatar
gejun committed
390 391 392 393 394 395 396
    return bthread::id_create_impl(
        id, data,
        (on_error ? on_error : bthread::default_bthread_id_on_error), NULL);
}

// Creates an id with `range` valid versions whose errors are delivered to
// `on_error`. A NULL `on_error` selects the default callback, which unlocks
// and destroys the id. Returns whatever id_create_ranged_impl() returns.
int bthread_id_create_ranged(bthread_id_t* id, void* data,
                             int (*on_error)(bthread_id_t, void*, int),
                             int range) {
    if (on_error == NULL) {
        on_error = bthread::default_bthread_id_on_error;
    }
    return bthread::id_create_ranged_impl(id, data, on_error, NULL, range);
}

int bthread_id_lock_and_reset_range_verbose(
gejun's avatar
gejun committed
405
    bthread_id_t id, void **pdata, int range, const char *location) {
gejun's avatar
gejun committed
406
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
407
    if (!meta) {
gejun's avatar
gejun committed
408 409 410
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
411
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
    bool ever_contended = false;
    meta->mutex.lock();
    while (meta->has_version(id_ver)) {
        if (*butex == meta->first_ver) {
            // contended locker always wakes up the butex at unlock.
            meta->lock_location = location;
            if (range == 0) {
                // fast path
            } else if (range < 0 ||
                       range > bthread::ID_MAX_RANGE ||
                       range + meta->first_ver <= meta->locked_ver) {
                LOG_IF(FATAL, range < 0) << "range must be positive, actually "
                                         << range;
                LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
                    << "max range is " << bthread::ID_MAX_RANGE
                    << ", actually " << range;
            } else {
                meta->locked_ver = meta->first_ver + range;
            }
            *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
            meta->mutex.unlock();
            if (pdata) {
                *pdata = meta->data;
            }
            return 0;
        } else if (*butex != meta->unlockable_ver()) {
            *butex = meta->contended_ver();
            uint32_t expected_ver = *butex;
            meta->mutex.unlock();
            ever_contended = true;
442 443 444
            if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
                errno != EWOULDBLOCK && errno != EINTR) {
                return errno;
gejun's avatar
gejun committed
445 446 447 448
            }
            meta->mutex.lock();
        } else { // bthread_id_about_to_destroy was called.
            meta->mutex.unlock();
gejun's avatar
gejun committed
449
            return EPERM;
gejun's avatar
gejun committed
450 451 452 453 454 455 456
        }
    }
    meta->mutex.unlock();
    return EINVAL;
}

int bthread_id_error_verbose(bthread_id_t id, int error_code, 
gejun's avatar
gejun committed
457
                             const char *location) {
gejun's avatar
gejun committed
458 459 460
    return bthread_id_error2_verbose(id, error_code, std::string(), location);
}

gejun's avatar
gejun committed
461
// Marks a locked id as about to be destroyed and wakes waiting lockers if the
// lock was contended; bthread_id_lock_and_reset_range_verbose() returns EPERM
// once it observes this state.
//
// Returns 0 on success, EINVAL if the id is invalid, EPERM (after logging
// FATAL) if the id is not locked.
int bthread_id_about_to_destroy(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        meta->mutex.unlock();
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    const bool had_waiters = (*butex == meta->contended_ver());
    *butex = meta->unlockable_ver();
    meta->mutex.unlock();

    if (had_waiters) {
        // wake up all waiting lockers.
        bthread::butex_wake_except(butex, 0);
    }
    return 0;
}

gejun's avatar
gejun committed
488
// Destroys an id that is currently unlocked and returns its slot to the
// resource pool.
//
// Returns 0 on success, EINVAL if the id is invalid, EPERM if the id is
// locked.
//
// NOTE(review): unlike bthread_id_unlock_and_destroy(), join_butex is neither
// updated nor woken here - confirm that no bthread_id_join() caller can be
// waiting on an id that gets cancelled.
int bthread_id_cancel(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
        meta->mutex.unlock();
        return EPERM;
    }
    // Jump past every version of this id; with first_ver == locked_ver no
    // version is valid any more.
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
    meta->mutex.unlock();

    return_resource(bthread::get_slot(id));
    return 0;
}

gejun's avatar
gejun committed
512
// Blocks until the version of `id` is no longer valid for its slot.
//
// Returns 0 once the id is gone (or was already gone), EINVAL if the slot of
// the id cannot be addressed, or the errno of a failed butex_wait.
int bthread_id_join(bthread_id_t id) {
    const bthread::IdResourceId slot = bthread::get_slot(id);
    bthread::Id* const meta = address_resource(slot);
    if (meta == NULL) {
        // The id is not created yet, this join is definitely wrong.
        return EINVAL;
    }
    uint32_t* const join_butex = meta->join_butex;
    const uint32_t id_ver = bthread::get_version(id);

    for (;;) {
        // Sample validity and the butex value together under the mutex so
        // that the wait below only sleeps on the value seen while the id was
        // still valid.
        meta->mutex.lock();
        const bool still_valid = meta->has_version(id_ver);
        const uint32_t seen_ver = *join_butex;
        meta->mutex.unlock();
        if (!still_valid) {
            return 0;
        }
        const int wait_rc = bthread::butex_wait(join_butex, seen_ver, NULL);
        if (wait_rc < 0 && errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
}

gejun's avatar
gejun committed
537
// Locks `id` only if it is currently unlocked; never blocks on the id.
// If `pdata` is non-NULL it receives the user data of the id on success.
// The recorded lock location of the id is not updated by this function.
//
// Returns 0 on success, EINVAL if the id is invalid, EBUSY if it is locked.
int bthread_id_trylock(bthread_id_t id, void** pdata) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
        meta->mutex.unlock();
        return EBUSY;
    }
    *butex = meta->locked_ver;
    meta->mutex.unlock();

    if (pdata) {
        *pdata = meta->data;
    }
    return 0;
}

int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
gejun's avatar
gejun committed
562
                            const char *location) {
gejun's avatar
gejun committed
563 564 565
    return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
}

gejun's avatar
gejun committed
566
// Unlocks a locked id.
//
// If errors were queued while the id was locked, the oldest one is popped and
// delivered to the id's error callback instead of unlocking: the id stays
// locked and the callback's return value is returned. Otherwise the id
// becomes unlocked and, if the lock was contended, one waiter is woken.
//
// Returns EINVAL (after logging FATAL) if the id is invalid and EPERM (after
// logging FATAL) if it is not locked.
int bthread_id_unlock(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        meta->mutex.unlock();
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }

    bthread::PendingError pending;
    if (!meta->pending_q.pop(&pending)) {
        // Nothing queued: really release the lock.
        const bool had_waiters = (*butex == meta->contended_ver());
        *butex = meta->first_ver;
        meta->mutex.unlock();
        if (had_waiters) {
            // We may wake up already-reused id, but that's OK.
            bthread::butex_wake(butex);
        }
        return 0; 
    }

    // Hand the lock over to the queued error; the callback runs outside the
    // mutex.
    meta->lock_location = pending.location;
    meta->mutex.unlock();
    if (meta->on_error) {
        return meta->on_error(pending.id, meta->data, pending.error_code);
    }
    return meta->on_error2(pending.id, meta->data, pending.error_code,
                           pending.error_text);
}

gejun's avatar
gejun committed
608
// Destroys a locked id: invalidates all of its versions, drops queued errors,
// wakes lockers and joiners, and returns the slot to the resource pool.
//
// Returns 0 on success, EINVAL if the slot cannot be addressed or (after
// logging FATAL) the version is invalid, EPERM (after logging FATAL) if the
// id is not locked.
int bthread_id_unlock_and_destroy(bthread_id_t id) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    uint32_t* const join_butex = meta->join_butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        meta->mutex.unlock();
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    // Move both butexes and the version window past every version of this
    // id; with first_ver == locked_ver no version is valid any more.
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    *join_butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
    meta->pending_q.clear();
    meta->mutex.unlock();

    // Notice that butex_wake* returns # of woken-up, not successful or not.
    bthread::butex_wake_except(butex, 0);
    bthread::butex_wake_all(join_butex);
    return_resource(bthread::get_slot(id));
    return 0;
}

// Initializes `list` to an empty state. Both size arguments are ignored.
// Always returns 0.
int bthread_id_list_init(bthread_id_list_t* list,
                         unsigned /*size*/,
                         unsigned /*conflict_size*/) {
    // These fields are unused by this implementation; zero them anyway.
    list->head = 0;
    list->size = 0;
    list->conflict_head = 0;
    list->conflict_size = 0;
    // The implementation object is created on demand by
    // bthread_id_list_add().
    list->impl = NULL;
    return 0;
}

gejun's avatar
gejun committed
653
// Deletes the implementation object of `list` (if any) and resets the
// pointer to NULL.
void bthread_id_list_destroy(bthread_id_list_t* list) {
    bthread::IdList* const impl = static_cast<bthread::IdList*>(list->impl);
    list->impl = NULL;
    delete impl;
}

gejun's avatar
gejun committed
658
// Adds `id` to `list`, creating the implementation object on first use.
// Returns ENOMEM if that allocation fails, otherwise the result of
// IdList::add().
int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) {
    bthread::IdList* impl = static_cast<bthread::IdList*>(list->impl);
    if (impl == NULL) {
        impl = new (std::nothrow) bthread::IdList;
        if (impl == NULL) {
            return ENOMEM;
        }
        list->impl = impl;
    }
    return impl->add(id);
}

gejun's avatar
gejun committed
668
// Reports `error_code` with an empty error text to every id in `list`.
// Equivalent to bthread_id_list_reset2(list, error_code, "").
int bthread_id_list_reset(bthread_id_list_t* list, int error_code) {
    const std::string no_text;
    return bthread_id_list_reset2(list, error_code, no_text);
}

void bthread_id_list_swap(bthread_id_list_t* list1, 
gejun's avatar
gejun committed
673
                          bthread_id_list_t* list2) {
gejun's avatar
gejun committed
674 675 676 677
    std::swap(list1->impl, list2->impl);
}

int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code,
gejun's avatar
gejun committed
678
                                       pthread_mutex_t* mutex) {
gejun's avatar
gejun committed
679 680 681 682 683
    return bthread_id_list_reset2_pthreadsafe(
        list, error_code, std::string(), mutex);
}

int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code,
gejun's avatar
gejun committed
684
                                      bthread_mutex_t* mutex) {
gejun's avatar
gejun committed
685 686 687 688 689 690 691 692
    return bthread_id_list_reset2_bthreadsafe(
        list, error_code, std::string(), mutex);
}

}  // extern "C"

int bthread_id_create2(
    bthread_id_t* id, void* data,
gejun's avatar
gejun committed
693
    int (*on_error)(bthread_id_t, void*, int, const std::string&)) {
gejun's avatar
gejun committed
694 695 696 697 698 699 700 701
    return bthread::id_create_impl(
        id, data, NULL,
        (on_error ? on_error : bthread::default_bthread_id_on_error2));
}

// Creates an id with `range` valid versions whose errors (code plus text) are
// delivered to `on_error`. A NULL `on_error` selects the default callback,
// which unlocks and destroys the id. Returns whatever
// id_create_ranged_impl() returns.
int bthread_id_create2_ranged(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&),
    int range) {
    if (on_error == NULL) {
        on_error = bthread::default_bthread_id_on_error2;
    }
    return bthread::id_create_ranged_impl(id, data, NULL, on_error, range);
}

// Reports an error to `id`.
//
// If the id is unlocked, it is locked on behalf of the caller (recording
// `location` as the lock location) and the error callback is invoked
// immediately; its return value is returned. If the id is locked, the error
// is queued and delivered when the current holder unlocks; 0 is returned.
//
// Returns EINVAL if the id is invalid.
int bthread_id_error2_verbose(bthread_id_t id, int error_code,
                              const std::string& error_text,
                              const char *location) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
    if (meta == NULL) {
        return EINVAL;
    }
    uint32_t* const butex = meta->butex;
    const uint32_t id_ver = bthread::get_version(id);

    meta->mutex.lock();
    if (!meta->has_version(id_ver)) {
        meta->mutex.unlock();
        return EINVAL;
    }

    if (*butex != meta->first_ver) {
        // Locked by someone else: queue the error for the holder's unlock.
        bthread::PendingError pending;
        pending.id = id;
        pending.error_code = error_code;
        pending.error_text = error_text;
        pending.location = location;
        meta->pending_q.push(pending);
        meta->mutex.unlock();
        return 0;
    }

    // Unlocked: take the lock and run the callback outside the mutex.
    *butex = meta->locked_ver;
    meta->lock_location = location;
    meta->mutex.unlock();
    if (meta->on_error) {
        return meta->on_error(id, meta->data, error_code);
    }
    return meta->on_error2(id, meta->data, error_code, error_text);
}

int bthread_id_list_reset2(bthread_id_list_t* list,
                           int error_code,
gejun's avatar
gejun committed
745
                           const std::string& error_text) {
gejun's avatar
gejun committed
746 747 748 749 750 751 752 753 754 755
    if (list->impl != NULL) {
        static_cast<bthread::IdList*>(list->impl)->apply(
            bthread::IdResetter(error_code, error_text));
    }
    return 0;
}

int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
gejun's avatar
gejun committed
756
                                       pthread_mutex_t* mutex) {
gejun's avatar
gejun committed
757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779
    if (mutex == NULL) {
        return EINVAL;
    }
    if (list->impl == NULL) {
        return 0;
    }
    bthread_id_list_t tmplist;
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
    if (rc != 0) {
        return rc;
    }
    // Swap out the list then reset. The critical section is very small.
    pthread_mutex_lock(mutex);
    std::swap(list->impl, tmplist.impl);
    pthread_mutex_unlock(mutex);
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
    bthread_id_list_destroy(&tmplist);
    return rc2;
}

int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
gejun's avatar
gejun committed
780
                                       bthread_mutex_t* mutex) {
gejun's avatar
gejun committed
781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
    if (mutex == NULL) {
        return EINVAL;
    }
    if (list->impl == NULL) {
        return 0;
    }
    bthread_id_list_t tmplist;
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
    if (rc != 0) {
        return rc;
    }
    // Swap out the list then reset. The critical section is very small.
    bthread_mutex_lock(mutex);
    std::swap(list->impl, tmplist.impl);
    bthread_mutex_unlock(mutex);
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
    bthread_id_list_destroy(&tmplist);
    return rc2;
}