id.cpp 24.9 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2014 baidu-rpc authors.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18 19 20 21

// Author: Ge,Jun (gejun@baidu.com)
// Date: Sun Aug  3 12:46:15 CST 2014

#include <deque>
#include "base/logging.h"
#include "bthread/butex.h"                       // butex_*
22
#include "bthread/mutex.h"
gejun's avatar
gejun committed
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
#include "bthread/list_of_abafree_id.h"
#include "base/resource_pool.h"
#include "bthread/bthread.h"

namespace bthread {

// This queue reduces the chance to allocate memory for deque
template <typename T, int N>
class SmallQueue {
public:
    SmallQueue() : _begin(0), _size(0), _full(NULL) {}
    
    void push(const T& val) {
        if (_full != NULL && !_full->empty()) {
            _full->push_back(val);
        } else if (_size < N) {
            int tail = _begin + _size;
            if (tail >= N) {
                tail -= N;
            }
            _c[tail] = val;
            ++_size;
        } else {
            if (_full == NULL) {
                _full = new std::deque<T>;
            }
            _full->push_back(val);
        }
    }
    bool pop(T* val) {
        if (_size > 0) {
            *val = _c[_begin];
            ++_begin;
            if (_begin >= N) {
                _begin -= N;
            }
            --_size;
            return true;
        } else if (_full && !_full->empty()) {
            *val = _full->front();
            _full->pop_front();
            return true;
        }
        return false;
    }
    bool empty() const {
        return _size == 0 && (_full == NULL || _full->empty());
    }

    size_t size() const {
        return _size + (_full ? _full->size() : 0);
    }

    void clear() {
        _size = 0;
        _begin = 0;
        if (_full) {
            _full->clear();
        }
    }

    ~SmallQueue() {
        delete _full;
        _full = NULL;
    }
    
private:
    DISALLOW_COPY_AND_ASSIGN(SmallQueue);
    
    int _begin;
    int _size;
    T _c[N];
    std::deque<T>* _full;
};

struct PendingError {
    bthread_id_t id;
    int error_code;
    std::string error_text;
    const char *location;

    PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {}
};

struct BAIDU_CACHELINE_ALIGNMENT Id {
    // first_ver ~ locked_ver - 1: unlocked versions
    // locked_ver: locked
    // unlockable_ver: locked and about to be destroyed
    // contended_ver: locked and contended
    uint32_t first_ver;
    uint32_t locked_ver;
114
    internal::FastPthreadMutex mutex;
gejun's avatar
gejun committed
115 116 117
    void* data;
    int (*on_error)(bthread_id_t, void*, int);
    int (*on_error2)(bthread_id_t, void*, int, const std::string&);
gejun's avatar
gejun committed
118
    const char *lock_location;
gejun's avatar
gejun committed
119 120
    uint32_t* butex;
    uint32_t* join_butex;
gejun's avatar
gejun committed
121 122 123 124 125
    SmallQueue<PendingError, 2> pending_q;
    
    Id() {
        // Although value of the butex(as version part of bthread_id_t)
        // does not matter, we set it to 0 to make program more deterministic.
gejun's avatar
gejun committed
126 127 128 129
        butex = bthread::butex_create_checked<uint32_t>();
        join_butex = bthread::butex_create_checked<uint32_t>();
        *butex = 0;
        *join_butex = 0;
gejun's avatar
gejun committed
130 131 132
    }

    ~Id() {
gejun's avatar
gejun committed
133 134
        bthread::butex_destroy(butex);
        bthread::butex_destroy(join_butex);
gejun's avatar
gejun committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
    }

    inline bool has_version(uint32_t id_ver) const {
        return id_ver >= first_ver && id_ver < locked_ver;
    }
    inline uint32_t contended_ver() const { return locked_ver + 1; }
    inline uint32_t unlockable_ver() const { return locked_ver + 2; }
    inline uint32_t last_ver() const { return unlockable_ver(); }
    
    // also the next "first_ver"
    inline uint32_t end_ver() const { return last_ver() + 1; }
};

BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align);

typedef base::ResourceId<Id> IdResourceId;

inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    const bthread_id_t tmp =
        { (((uint64_t)slot.value) << 32) | (uint64_t)version };
    return tmp;
}

inline IdResourceId get_slot(bthread_id_t id) {
    const IdResourceId tmp = { (id.value >> 32) };
    return tmp;
}

inline uint32_t get_version(bthread_id_t id) {
    return (uint32_t)(id.value & 0xFFFFFFFFul);
}

gejun's avatar
gejun committed
167
inline bool id_exists_with_true_negatives(bthread_id_t id) {
gejun's avatar
gejun committed
168 169 170 171 172
    Id* const meta = address_resource(get_slot(id));
    if (meta == NULL) {
        return false;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
173
    return id_ver >= meta->first_ver && id_ver <= meta->last_ver();
gejun's avatar
gejun committed
174 175 176 177 178
}
// required by unittest
uint32_t id_value(bthread_id_t id) {
    Id* const meta = address_resource(get_slot(id));
    if (meta != NULL) {
gejun's avatar
gejun committed
179
        return *meta->butex;
gejun's avatar
gejun committed
180 181 182 183 184 185 186 187 188 189 190 191 192 193
    }
    return 0;  // valid version never be zero
}

static int default_bthread_id_on_error(bthread_id_t id, void*, int) {
    return bthread_id_unlock_and_destroy(id);
}
static int default_bthread_id_on_error2(
    bthread_id_t id, void*, int, const std::string&) {
    return bthread_id_unlock_and_destroy(id);
}

void id_status(bthread_id_t id, std::ostream &os) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
194
    if (!meta) {
gejun's avatar
gejun committed
195 196 197 198
        os << "Invalid id=" << id.value << '\n';
        return;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
199
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
    bool valid = true;
    void* data = NULL;
    int (*on_error)(bthread_id_t, void*, int) = NULL;
    int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL;
    uint32_t first_ver = 0;
    uint32_t locked_ver = 0;
    uint32_t unlockable_ver = 0;
    uint32_t contended_ver = 0;
    const char *lock_location = NULL;
    SmallQueue<PendingError, 2> pending_q;
    uint32_t butex_value = 0;

    meta->mutex.lock();    
    if (meta->has_version(id_ver)) {
        data = meta->data;
        on_error = meta->on_error;
        on_error2 = meta->on_error2;
        first_ver = meta->first_ver;
        locked_ver = meta->locked_ver;
        unlockable_ver = meta->unlockable_ver();
        contended_ver = meta->contended_ver();
        lock_location = meta->lock_location;
        const size_t size = meta->pending_q.size();
        for (size_t i = 0; i < size; ++i) {
            PendingError front;
            meta->pending_q.pop(&front);
            meta->pending_q.push(front);
            pending_q.push(front);
        }
        butex_value = *butex;
    } else {
        valid = false;
    }
    meta->mutex.unlock();

    if (valid) {
        os << "First id: "
           << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n'
           << "Range: " << locked_ver - first_ver << '\n'
           << "Status: ";
        if (butex_value != first_ver) {
            os << "LOCKED at " << lock_location;
            if (butex_value == contended_ver) {
                os << " (CONTENDED)";
            } else if (butex_value == unlockable_ver) {
                os << " (ABOUT TO DESTROY)";
            } else {
                os << " (UNCONTENDED)";
            }
        } else {
            os << "UNLOCKED";
        }
        os << "\nPendingQ:";
        if (pending_q.empty()) {
            os << " EMPTY";
        } else {
            const size_t size = pending_q.size();
            for (size_t i = 0; i < size; ++i) {
                PendingError front;
                pending_q.pop(&front);
                os << " (" << front.location << "/E" << front.error_code
                   << '/' << front.error_text << ')';
            }
        }
        if (on_error) {
            if (on_error == default_bthread_id_on_error) {
                os << "\nOnError: unlock_and_destroy";
            } else {
                os << "\nOnError: " << (void*)on_error;
            }
        } else {
            if (on_error2 == default_bthread_id_on_error2) {
                os << "\nOnError2: unlock_and_destroy";
            } else {
                os << "\nOnError2: " << (void*)on_error2;
            }
        }
        os << "\nData: " << data;
    } else {
        os << "Invalid id=" << id.value;
    }
    os << '\n';
}

void id_pool_status(std::ostream &os) {
    os << base::describe_resources<Id>() << '\n';
}

struct IdTraits {
    static const size_t BLOCK_SIZE = 63;
    static const size_t MAX_ENTRIES = 100000;
    static const bthread_id_t ID_INIT;
gejun's avatar
gejun committed
292 293
    static bool exists(bthread_id_t id)
    { return bthread::id_exists_with_true_negatives(id); }
gejun's avatar
gejun committed
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
};
const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID;

typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList;

struct IdResetter {
    explicit IdResetter(int ec, const std::string& et)
        : _error_code(ec), _error_text(et) {}
    void operator()(bthread_id_t & id) const {
        bthread_id_error2_verbose(
            id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__));
        id = INVALID_BTHREAD_ID;
    }
private:
    int _error_code;
    const std::string& _error_text;
};

size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) {
    if (list->impl == NULL) {
        return 0;
    }
    return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n);
}

const int ID_MAX_RANGE = 1024;

static int id_create_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&)) __THROW {
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
327
    if (meta) {
gejun's avatar
gejun committed
328 329 330 331
        meta->data = data;
        meta->on_error = on_error;
        meta->on_error2 = on_error2;
        CHECK(meta->pending_q.empty());
gejun's avatar
gejun committed
332
        uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
333 334 335 336 337
        if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
            // Skip 0 so that bthread_id_t is never 0
            // avoid overflow to make comparisons simpler.
            *butex = 1;
        }
gejun's avatar
gejun committed
338
        *meta->join_butex = *butex;
gejun's avatar
gejun committed
339 340 341 342 343 344 345 346 347 348 349 350 351
        meta->first_ver = *butex;
        meta->locked_ver = *butex + 1;
        *id = make_id(*butex, slot);
        return 0;
    }
    return ENOMEM;
}

static int id_create_ranged_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&),
    int range) __THROW {
352
    if (range < 1 || range > ID_MAX_RANGE) {
gejun's avatar
gejun committed
353 354 355 356 357 358 359
        LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range;
        LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is " 
                << ID_MAX_RANGE << ", actually " << range;
        return EINVAL;
    }
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
360
    if (meta) {
gejun's avatar
gejun committed
361 362 363 364
        meta->data = data;
        meta->on_error = on_error;
        meta->on_error2 = on_error2;
        CHECK(meta->pending_q.empty());
gejun's avatar
gejun committed
365
        uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
366 367 368 369 370
        if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
            // Skip 0 so that bthread_id_t is never 0
            // avoid overflow to make comparisons simpler.
            *butex = 1;
        }
gejun's avatar
gejun committed
371
        *meta->join_butex = *butex;
gejun's avatar
gejun committed
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
        meta->first_ver = *butex;
        meta->locked_ver = *butex + range;
        *id = make_id(*butex, slot);
        return 0;
    }
    return ENOMEM;
}

}  // namespace bthread

extern "C" {

int bthread_id_create(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int)) __THROW {
    return bthread::id_create_impl(
        id, data,
        (on_error ? on_error : bthread::default_bthread_id_on_error), NULL);
}

int bthread_id_create_ranged(bthread_id_t* id, void* data,
                             int (*on_error)(bthread_id_t, void*, int),
                             int range) __THROW {
    return bthread::id_create_ranged_impl(
        id, data, 
        (on_error ? on_error : bthread::default_bthread_id_on_error),
        NULL, range);
}

int bthread_id_lock_and_reset_range_verbose(
    bthread_id_t id, void **pdata, int range, const char *location) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
404
    if (!meta) {
gejun's avatar
gejun committed
405 406 407
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
408
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
    bool ever_contended = false;
    meta->mutex.lock();
    while (meta->has_version(id_ver)) {
        if (*butex == meta->first_ver) {
            // contended locker always wakes up the butex at unlock.
            meta->lock_location = location;
            if (range == 0) {
                // fast path
            } else if (range < 0 ||
                       range > bthread::ID_MAX_RANGE ||
                       range + meta->first_ver <= meta->locked_ver) {
                LOG_IF(FATAL, range < 0) << "range must be positive, actually "
                                         << range;
                LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
                    << "max range is " << bthread::ID_MAX_RANGE
                    << ", actually " << range;
            } else {
                meta->locked_ver = meta->first_ver + range;
            }
            *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
            meta->mutex.unlock();
            if (pdata) {
                *pdata = meta->data;
            }
            return 0;
        } else if (*butex != meta->unlockable_ver()) {
            *butex = meta->contended_ver();
            uint32_t expected_ver = *butex;
            meta->mutex.unlock();
            ever_contended = true;
            if (bthread::butex_wait(butex, expected_ver, NULL) < 0) {
                if (errno != EWOULDBLOCK && errno != ESTOP) {
                    return errno;
                }
            }
            meta->mutex.lock();
        } else { // bthread_id_about_to_destroy was called.
            meta->mutex.unlock();
gejun's avatar
gejun committed
447
            return EPERM;
gejun's avatar
gejun committed
448 449 450 451 452 453 454 455 456 457 458 459 460
        }
    }
    meta->mutex.unlock();
    return EINVAL;
}

int bthread_id_error_verbose(bthread_id_t id, int error_code, 
                             const char *location) __THROW {
    return bthread_id_error2_verbose(id, error_code, std::string(), location);
}

int bthread_id_about_to_destroy(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
461
    if (!meta) {
gejun's avatar
gejun committed
462 463 464
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
465
    uint32_t* butex = meta->butex;
466
    meta->mutex.lock();
gejun's avatar
gejun committed
467
    if (!meta->has_version(id_ver)) {
468
        meta->mutex.unlock();
gejun's avatar
gejun committed
469 470 471
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
472
        meta->mutex.unlock();
gejun's avatar
gejun committed
473 474 475 476 477
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    const bool contended = (*butex == meta->contended_ver());
    *butex = meta->unlockable_ver();
478
    meta->mutex.unlock();
gejun's avatar
gejun committed
479 480 481 482 483 484 485 486 487
    if (contended) {
        // wake up all waiting lockers.
        bthread::butex_wake_except(butex, 0);
    }
    return 0;
}

int bthread_id_cancel(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
488
    if (!meta) {
gejun's avatar
gejun committed
489 490
        return EINVAL;
    }
gejun's avatar
gejun committed
491
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
492
    const uint32_t id_ver = bthread::get_version(id);
493
    meta->mutex.lock();
gejun's avatar
gejun committed
494
    if (!meta->has_version(id_ver)) {
495
        meta->mutex.unlock();
gejun's avatar
gejun committed
496 497 498
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
499
        meta->mutex.unlock();
gejun's avatar
gejun committed
500 501 502 503 504
        return EPERM;
    }       
    *butex = meta->end_ver();
    meta->first_ver = *butex;
    meta->locked_ver = *butex;
505
    meta->mutex.unlock();
gejun's avatar
gejun committed
506 507 508 509 510 511 512
    return_resource(bthread::get_slot(id));
    return 0;
}

int bthread_id_join(bthread_id_t id) __THROW {
    const bthread::IdResourceId slot = bthread::get_slot(id);
    bthread::Id* const meta = address_resource(slot);
513
    if (!meta) {
gejun's avatar
gejun committed
514 515 516
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
517
    uint32_t* join_butex = meta->join_butex;
gejun's avatar
gejun committed
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
    bool stopped = false;
    while (1) {
        meta->mutex.lock();
        const bool has_ver = meta->has_version(id_ver);
        const uint32_t expected_ver = *join_butex;
        meta->mutex.unlock();
        if (has_ver) {
            if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0) {
                if (errno != EWOULDBLOCK && errno != ESTOP) {
                    return errno;
                }
                if (errno == ESTOP) {
                    stopped = true;
                }
            }
        } else {
            break;
        }
    }
    return stopped ? ESTOP : 0;
}

int bthread_id_trylock(bthread_id_t id, void** pdata) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
542
    if (!meta) {
gejun's avatar
gejun committed
543 544
        return EINVAL;
    }
gejun's avatar
gejun committed
545
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
546
    const uint32_t id_ver = bthread::get_version(id);
547
    meta->mutex.lock();
gejun's avatar
gejun committed
548
    if (!meta->has_version(id_ver)) {
549
        meta->mutex.unlock();
gejun's avatar
gejun committed
550 551 552
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
553
        meta->mutex.unlock();
gejun's avatar
gejun committed
554 555 556
        return EBUSY;
    }
    *butex = meta->locked_ver;
557
    meta->mutex.unlock();
gejun's avatar
gejun committed
558 559 560 561 562 563 564 565 566 567 568 569 570
    if (pdata != NULL) {
        *pdata = meta->data;
    }
    return 0;
}

int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
                            const char *location) __THROW {
    return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
}

int bthread_id_unlock(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
571
    if (!meta) {
gejun's avatar
gejun committed
572 573
        return EINVAL;
    }
gejun's avatar
gejun committed
574
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
575 576 577
    // Release fence makes sure all changes made before signal visible to
    // woken-up waiters.
    const uint32_t id_ver = bthread::get_version(id);
578
    meta->mutex.lock();
gejun's avatar
gejun committed
579
    if (!meta->has_version(id_ver)) {
580
        meta->mutex.unlock();
gejun's avatar
gejun committed
581 582 583 584
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
585
        meta->mutex.unlock();
gejun's avatar
gejun committed
586 587 588 589 590 591
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    bthread::PendingError front;
    if (meta->pending_q.pop(&front)) {
        meta->lock_location = front.location;
592
        meta->mutex.unlock();
gejun's avatar
gejun committed
593 594 595 596 597 598 599 600 601
        if (meta->on_error) {
            return meta->on_error(front.id, meta->data, front.error_code);
        } else {
            return meta->on_error2(front.id, meta->data, front.error_code,
                                   front.error_text);
        }
    } else {
        const bool contended = (*butex == meta->contended_ver());
        *butex = meta->first_ver;
602
        meta->mutex.unlock();
gejun's avatar
gejun committed
603 604 605 606 607 608 609 610 611 612
        if (contended) {
            // We may wake up already-reused id, but that's OK.
            bthread::butex_wake(butex);
        }
        return 0; 
    }
}

int bthread_id_unlock_and_destroy(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
613
    if (!meta) {
gejun's avatar
gejun committed
614 615
        return EINVAL;
    }
gejun's avatar
gejun committed
616 617
    uint32_t* butex = meta->butex;
    uint32_t* join_butex = meta->join_butex;
gejun's avatar
gejun committed
618
    const uint32_t id_ver = bthread::get_version(id);
619
    meta->mutex.lock();
gejun's avatar
gejun committed
620
    if (!meta->has_version(id_ver)) {
621
        meta->mutex.unlock();
gejun's avatar
gejun committed
622 623 624 625
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
626
        meta->mutex.unlock();
gejun's avatar
gejun committed
627 628 629
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
630 631 632 633 634
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    *join_butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
gejun's avatar
gejun committed
635
    meta->pending_q.clear();
636
    meta->mutex.unlock();
gejun's avatar
gejun committed
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
    // Notice that butex_wake* returns # of woken-up, not successful or not.
    bthread::butex_wake_except(butex, 0);
    bthread::butex_wake_all(join_butex);
    return_resource(bthread::get_slot(id));
    return 0;
}

int bthread_id_list_init(bthread_id_list_t* list,
                         unsigned /*size*/,
                         unsigned /*conflict_size*/) __THROW {
    list->impl = NULL;  // create on demand.
    // Set unused fields to zero as well.
    list->head = 0;
    list->size = 0;
    list->conflict_head = 0;
    list->conflict_size = 0;
    return 0;
}

void bthread_id_list_destroy(bthread_id_list_t* list) __THROW {
    delete static_cast<bthread::IdList*>(list->impl);
    list->impl = NULL;
}

int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) __THROW {
    if (list->impl == NULL) {
        list->impl = new (std::nothrow) bthread::IdList;
        if (NULL == list->impl) {
            return ENOMEM;
        }
    }
    return static_cast<bthread::IdList*>(list->impl)->add(id);
}

int bthread_id_list_reset(bthread_id_list_t* list, int error_code) __THROW {
    return bthread_id_list_reset2(list, error_code, std::string());
}

void bthread_id_list_swap(bthread_id_list_t* list1, 
                          bthread_id_list_t* list2) __THROW {
    std::swap(list1->impl, list2->impl);
}

int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code,
                                       pthread_mutex_t* mutex) __THROW {
    return bthread_id_list_reset2_pthreadsafe(
        list, error_code, std::string(), mutex);
}

int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code,
                                      bthread_mutex_t* mutex) __THROW {
    return bthread_id_list_reset2_bthreadsafe(
        list, error_code, std::string(), mutex);
}

}  // extern "C"

int bthread_id_create2(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&)) __THROW {
    return bthread::id_create_impl(
        id, data, NULL,
        (on_error ? on_error : bthread::default_bthread_id_on_error2));
}

int bthread_id_create2_ranged(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&),
    int range) __THROW {
    return bthread::id_create_ranged_impl(
        id, data, NULL,
        (on_error ? on_error : bthread::default_bthread_id_on_error2), range);
}

int bthread_id_error2_verbose(bthread_id_t id, int error_code,
                              const std::string& error_text,
                              const char *location) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
715
    if (!meta) {
gejun's avatar
gejun committed
716 717 718
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
719
    uint32_t* butex = meta->butex;
720
    meta->mutex.lock();
gejun's avatar
gejun committed
721
    if (!meta->has_version(id_ver)) {
722
        meta->mutex.unlock();
gejun's avatar
gejun committed
723 724 725 726 727
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        *butex = meta->locked_ver;
        meta->lock_location = location;
728
        meta->mutex.unlock();
gejun's avatar
gejun committed
729 730 731 732 733 734 735 736 737 738 739 740
        if (meta->on_error) {
            return meta->on_error(id, meta->data, error_code);
        } else {
            return meta->on_error2(id, meta->data, error_code, error_text);
        }
    } else {
        bthread::PendingError e;
        e.id = id;
        e.error_code = error_code;
        e.error_text = error_text;
        e.location = location;
        meta->pending_q.push(e);
741
        meta->mutex.unlock();
gejun's avatar
gejun committed
742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
        return 0;
    }
}

int bthread_id_list_reset2(bthread_id_list_t* list,
                           int error_code,
                           const std::string& error_text) __THROW {
    if (list->impl != NULL) {
        static_cast<bthread::IdList*>(list->impl)->apply(
            bthread::IdResetter(error_code, error_text));
    }
    return 0;
}

int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
                                       pthread_mutex_t* mutex) __THROW {
    if (mutex == NULL) {
        return EINVAL;
    }
    if (list->impl == NULL) {
        return 0;
    }
    bthread_id_list_t tmplist;
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
    if (rc != 0) {
        return rc;
    }
    // Swap out the list then reset. The critical section is very small.
    pthread_mutex_lock(mutex);
    std::swap(list->impl, tmplist.impl);
    pthread_mutex_unlock(mutex);
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
    bthread_id_list_destroy(&tmplist);
    return rc2;
}

int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
                                       bthread_mutex_t* mutex) __THROW {
    if (mutex == NULL) {
        return EINVAL;
    }
    if (list->impl == NULL) {
        return 0;
    }
    bthread_id_list_t tmplist;
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
    if (rc != 0) {
        return rc;
    }
    // Swap out the list then reset. The critical section is very small.
    bthread_mutex_lock(mutex);
    std::swap(list->impl, tmplist.impl);
    bthread_mutex_unlock(mutex);
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
    bthread_id_list_destroy(&tmplist);
    return rc2;
}