id.cpp 24.7 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2
// Copyright (c) 2014 Baidu, Inc.
gejun's avatar
gejun committed
3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18 19

// Author: Ge,Jun (gejun@baidu.com)
// Date: Sun Aug  3 12:46:15 CST 2014

#include <deque>
20
#include "butil/logging.h"
gejun's avatar
gejun committed
21
#include "bthread/butex.h"                       // butex_*
22
#include "bthread/mutex.h"
gejun's avatar
gejun committed
23
#include "bthread/list_of_abafree_id.h"
24
#include "butil/resource_pool.h"
gejun's avatar
gejun committed
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
#include "bthread/bthread.h"

namespace bthread {

// This queue reduces the chance to allocate memory for deque
template <typename T, int N>
class SmallQueue {
public:
    SmallQueue() : _begin(0), _size(0), _full(NULL) {}
    
    void push(const T& val) {
        if (_full != NULL && !_full->empty()) {
            _full->push_back(val);
        } else if (_size < N) {
            int tail = _begin + _size;
            if (tail >= N) {
                tail -= N;
            }
            _c[tail] = val;
            ++_size;
        } else {
            if (_full == NULL) {
                _full = new std::deque<T>;
            }
            _full->push_back(val);
        }
    }
    bool pop(T* val) {
        if (_size > 0) {
            *val = _c[_begin];
            ++_begin;
            if (_begin >= N) {
                _begin -= N;
            }
            --_size;
            return true;
        } else if (_full && !_full->empty()) {
            *val = _full->front();
            _full->pop_front();
            return true;
        }
        return false;
    }
    bool empty() const {
        return _size == 0 && (_full == NULL || _full->empty());
    }

    size_t size() const {
        return _size + (_full ? _full->size() : 0);
    }

    void clear() {
        _size = 0;
        _begin = 0;
        if (_full) {
            _full->clear();
        }
    }

    ~SmallQueue() {
        delete _full;
        _full = NULL;
    }
    
private:
    DISALLOW_COPY_AND_ASSIGN(SmallQueue);
    
    int _begin;
    int _size;
    T _c[N];
    std::deque<T>* _full;
};

struct PendingError {
    bthread_id_t id;
    int error_code;
    std::string error_text;
    const char *location;

    PendingError() : id(INVALID_BTHREAD_ID), error_code(0), location(NULL) {}
};

struct BAIDU_CACHELINE_ALIGNMENT Id {
    // first_ver ~ locked_ver - 1: unlocked versions
    // locked_ver: locked
    // unlockable_ver: locked and about to be destroyed
    // contended_ver: locked and contended
    uint32_t first_ver;
    uint32_t locked_ver;
114
    internal::FastPthreadMutex mutex;
gejun's avatar
gejun committed
115 116 117
    void* data;
    int (*on_error)(bthread_id_t, void*, int);
    int (*on_error2)(bthread_id_t, void*, int, const std::string&);
gejun's avatar
gejun committed
118
    const char *lock_location;
gejun's avatar
gejun committed
119 120
    uint32_t* butex;
    uint32_t* join_butex;
gejun's avatar
gejun committed
121 122 123 124 125
    SmallQueue<PendingError, 2> pending_q;
    
    Id() {
        // Although value of the butex(as version part of bthread_id_t)
        // does not matter, we set it to 0 to make program more deterministic.
gejun's avatar
gejun committed
126 127 128 129
        butex = bthread::butex_create_checked<uint32_t>();
        join_butex = bthread::butex_create_checked<uint32_t>();
        *butex = 0;
        *join_butex = 0;
gejun's avatar
gejun committed
130 131 132
    }

    ~Id() {
gejun's avatar
gejun committed
133 134
        bthread::butex_destroy(butex);
        bthread::butex_destroy(join_butex);
gejun's avatar
gejun committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
    }

    inline bool has_version(uint32_t id_ver) const {
        return id_ver >= first_ver && id_ver < locked_ver;
    }
    inline uint32_t contended_ver() const { return locked_ver + 1; }
    inline uint32_t unlockable_ver() const { return locked_ver + 2; }
    inline uint32_t last_ver() const { return unlockable_ver(); }
    
    // also the next "first_ver"
    inline uint32_t end_ver() const { return last_ver() + 1; }
};

BAIDU_CASSERT(sizeof(Id) % 64 == 0, sizeof_Id_must_align);

150
typedef butil::ResourceId<Id> IdResourceId;
gejun's avatar
gejun committed
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166

inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
    const bthread_id_t tmp =
        { (((uint64_t)slot.value) << 32) | (uint64_t)version };
    return tmp;
}

inline IdResourceId get_slot(bthread_id_t id) {
    const IdResourceId tmp = { (id.value >> 32) };
    return tmp;
}

inline uint32_t get_version(bthread_id_t id) {
    return (uint32_t)(id.value & 0xFFFFFFFFul);
}

gejun's avatar
gejun committed
167
inline bool id_exists_with_true_negatives(bthread_id_t id) {
gejun's avatar
gejun committed
168 169 170 171 172
    Id* const meta = address_resource(get_slot(id));
    if (meta == NULL) {
        return false;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
173
    return id_ver >= meta->first_ver && id_ver <= meta->last_ver();
gejun's avatar
gejun committed
174 175 176 177 178
}
// required by unittest
uint32_t id_value(bthread_id_t id) {
    Id* const meta = address_resource(get_slot(id));
    if (meta != NULL) {
gejun's avatar
gejun committed
179
        return *meta->butex;
gejun's avatar
gejun committed
180 181 182 183 184 185 186 187 188 189 190 191 192 193
    }
    return 0;  // valid version never be zero
}

static int default_bthread_id_on_error(bthread_id_t id, void*, int) {
    return bthread_id_unlock_and_destroy(id);
}
static int default_bthread_id_on_error2(
    bthread_id_t id, void*, int, const std::string&) {
    return bthread_id_unlock_and_destroy(id);
}

void id_status(bthread_id_t id, std::ostream &os) {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
194
    if (!meta) {
gejun's avatar
gejun committed
195 196 197 198
        os << "Invalid id=" << id.value << '\n';
        return;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
199
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
    bool valid = true;
    void* data = NULL;
    int (*on_error)(bthread_id_t, void*, int) = NULL;
    int (*on_error2)(bthread_id_t, void*, int, const std::string&) = NULL;
    uint32_t first_ver = 0;
    uint32_t locked_ver = 0;
    uint32_t unlockable_ver = 0;
    uint32_t contended_ver = 0;
    const char *lock_location = NULL;
    SmallQueue<PendingError, 2> pending_q;
    uint32_t butex_value = 0;

    meta->mutex.lock();    
    if (meta->has_version(id_ver)) {
        data = meta->data;
        on_error = meta->on_error;
        on_error2 = meta->on_error2;
        first_ver = meta->first_ver;
        locked_ver = meta->locked_ver;
        unlockable_ver = meta->unlockable_ver();
        contended_ver = meta->contended_ver();
        lock_location = meta->lock_location;
        const size_t size = meta->pending_q.size();
        for (size_t i = 0; i < size; ++i) {
            PendingError front;
            meta->pending_q.pop(&front);
            meta->pending_q.push(front);
            pending_q.push(front);
        }
        butex_value = *butex;
    } else {
        valid = false;
    }
    meta->mutex.unlock();

    if (valid) {
        os << "First id: "
           << bthread::make_id(first_ver, bthread::get_slot(id)).value << '\n'
           << "Range: " << locked_ver - first_ver << '\n'
           << "Status: ";
        if (butex_value != first_ver) {
            os << "LOCKED at " << lock_location;
            if (butex_value == contended_ver) {
                os << " (CONTENDED)";
            } else if (butex_value == unlockable_ver) {
                os << " (ABOUT TO DESTROY)";
            } else {
                os << " (UNCONTENDED)";
            }
        } else {
            os << "UNLOCKED";
        }
        os << "\nPendingQ:";
        if (pending_q.empty()) {
            os << " EMPTY";
        } else {
            const size_t size = pending_q.size();
            for (size_t i = 0; i < size; ++i) {
                PendingError front;
                pending_q.pop(&front);
                os << " (" << front.location << "/E" << front.error_code
                   << '/' << front.error_text << ')';
            }
        }
        if (on_error) {
            if (on_error == default_bthread_id_on_error) {
                os << "\nOnError: unlock_and_destroy";
            } else {
                os << "\nOnError: " << (void*)on_error;
            }
        } else {
            if (on_error2 == default_bthread_id_on_error2) {
                os << "\nOnError2: unlock_and_destroy";
            } else {
                os << "\nOnError2: " << (void*)on_error2;
            }
        }
        os << "\nData: " << data;
    } else {
        os << "Invalid id=" << id.value;
    }
    os << '\n';
}

void id_pool_status(std::ostream &os) {
285
    os << butil::describe_resources<Id>() << '\n';
gejun's avatar
gejun committed
286 287 288 289 290 291
}

struct IdTraits {
    static const size_t BLOCK_SIZE = 63;
    static const size_t MAX_ENTRIES = 100000;
    static const bthread_id_t ID_INIT;
gejun's avatar
gejun committed
292 293
    static bool exists(bthread_id_t id)
    { return bthread::id_exists_with_true_negatives(id); }
gejun's avatar
gejun committed
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
};
const bthread_id_t IdTraits::ID_INIT = INVALID_BTHREAD_ID;

typedef ListOfABAFreeId<bthread_id_t, IdTraits> IdList;

struct IdResetter {
    explicit IdResetter(int ec, const std::string& et)
        : _error_code(ec), _error_text(et) {}
    void operator()(bthread_id_t & id) const {
        bthread_id_error2_verbose(
            id, _error_code, _error_text, __FILE__ ":" BAIDU_SYMBOLSTR(__LINE__));
        id = INVALID_BTHREAD_ID;
    }
private:
    int _error_code;
    const std::string& _error_text;
};

size_t get_sizes(const bthread_id_list_t* list, size_t* cnt, size_t n) {
    if (list->impl == NULL) {
        return 0;
    }
    return static_cast<bthread::IdList*>(list->impl)->get_sizes(cnt, n);
}

const int ID_MAX_RANGE = 1024;

static int id_create_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&)) __THROW {
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
327
    if (meta) {
gejun's avatar
gejun committed
328 329 330 331
        meta->data = data;
        meta->on_error = on_error;
        meta->on_error2 = on_error2;
        CHECK(meta->pending_q.empty());
gejun's avatar
gejun committed
332
        uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
333 334 335 336 337
        if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
            // Skip 0 so that bthread_id_t is never 0
            // avoid overflow to make comparisons simpler.
            *butex = 1;
        }
gejun's avatar
gejun committed
338
        *meta->join_butex = *butex;
gejun's avatar
gejun committed
339 340 341 342 343 344 345 346 347 348 349 350 351
        meta->first_ver = *butex;
        meta->locked_ver = *butex + 1;
        *id = make_id(*butex, slot);
        return 0;
    }
    return ENOMEM;
}

static int id_create_ranged_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&),
    int range) __THROW {
352
    if (range < 1 || range > ID_MAX_RANGE) {
gejun's avatar
gejun committed
353 354 355 356 357 358 359
        LOG_IF(FATAL, range < 1) << "range must be positive, actually " << range;
        LOG_IF(FATAL, range > ID_MAX_RANGE ) << "max of range is " 
                << ID_MAX_RANGE << ", actually " << range;
        return EINVAL;
    }
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
360
    if (meta) {
gejun's avatar
gejun committed
361 362 363 364
        meta->data = data;
        meta->on_error = on_error;
        meta->on_error2 = on_error2;
        CHECK(meta->pending_q.empty());
gejun's avatar
gejun committed
365
        uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
366 367 368 369 370
        if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
            // Skip 0 so that bthread_id_t is never 0
            // avoid overflow to make comparisons simpler.
            *butex = 1;
        }
gejun's avatar
gejun committed
371
        *meta->join_butex = *butex;
gejun's avatar
gejun committed
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
        meta->first_ver = *butex;
        meta->locked_ver = *butex + range;
        *id = make_id(*butex, slot);
        return 0;
    }
    return ENOMEM;
}

}  // namespace bthread

extern "C" {

int bthread_id_create(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int)) __THROW {
    return bthread::id_create_impl(
        id, data,
        (on_error ? on_error : bthread::default_bthread_id_on_error), NULL);
}

int bthread_id_create_ranged(bthread_id_t* id, void* data,
                             int (*on_error)(bthread_id_t, void*, int),
                             int range) __THROW {
    return bthread::id_create_ranged_impl(
        id, data, 
        (on_error ? on_error : bthread::default_bthread_id_on_error),
        NULL, range);
}

int bthread_id_lock_and_reset_range_verbose(
    bthread_id_t id, void **pdata, int range, const char *location) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
404
    if (!meta) {
gejun's avatar
gejun committed
405 406 407
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
408
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
    bool ever_contended = false;
    meta->mutex.lock();
    while (meta->has_version(id_ver)) {
        if (*butex == meta->first_ver) {
            // contended locker always wakes up the butex at unlock.
            meta->lock_location = location;
            if (range == 0) {
                // fast path
            } else if (range < 0 ||
                       range > bthread::ID_MAX_RANGE ||
                       range + meta->first_ver <= meta->locked_ver) {
                LOG_IF(FATAL, range < 0) << "range must be positive, actually "
                                         << range;
                LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
                    << "max range is " << bthread::ID_MAX_RANGE
                    << ", actually " << range;
            } else {
                meta->locked_ver = meta->first_ver + range;
            }
            *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
            meta->mutex.unlock();
            if (pdata) {
                *pdata = meta->data;
            }
            return 0;
        } else if (*butex != meta->unlockable_ver()) {
            *butex = meta->contended_ver();
            uint32_t expected_ver = *butex;
            meta->mutex.unlock();
            ever_contended = true;
439 440 441
            if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
                errno != EWOULDBLOCK && errno != EINTR) {
                return errno;
gejun's avatar
gejun committed
442 443 444 445
            }
            meta->mutex.lock();
        } else { // bthread_id_about_to_destroy was called.
            meta->mutex.unlock();
gejun's avatar
gejun committed
446
            return EPERM;
gejun's avatar
gejun committed
447 448 449 450 451 452 453 454 455 456 457 458 459
        }
    }
    meta->mutex.unlock();
    return EINVAL;
}

int bthread_id_error_verbose(bthread_id_t id, int error_code, 
                             const char *location) __THROW {
    return bthread_id_error2_verbose(id, error_code, std::string(), location);
}

int bthread_id_about_to_destroy(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
460
    if (!meta) {
gejun's avatar
gejun committed
461 462 463
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
464
    uint32_t* butex = meta->butex;
465
    meta->mutex.lock();
gejun's avatar
gejun committed
466
    if (!meta->has_version(id_ver)) {
467
        meta->mutex.unlock();
gejun's avatar
gejun committed
468 469 470
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
471
        meta->mutex.unlock();
gejun's avatar
gejun committed
472 473 474 475 476
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    const bool contended = (*butex == meta->contended_ver());
    *butex = meta->unlockable_ver();
477
    meta->mutex.unlock();
gejun's avatar
gejun committed
478 479 480 481 482 483 484 485 486
    if (contended) {
        // wake up all waiting lockers.
        bthread::butex_wake_except(butex, 0);
    }
    return 0;
}

int bthread_id_cancel(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
487
    if (!meta) {
gejun's avatar
gejun committed
488 489
        return EINVAL;
    }
gejun's avatar
gejun committed
490
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
491
    const uint32_t id_ver = bthread::get_version(id);
492
    meta->mutex.lock();
gejun's avatar
gejun committed
493
    if (!meta->has_version(id_ver)) {
494
        meta->mutex.unlock();
gejun's avatar
gejun committed
495 496 497
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
498
        meta->mutex.unlock();
gejun's avatar
gejun committed
499 500 501 502 503
        return EPERM;
    }       
    *butex = meta->end_ver();
    meta->first_ver = *butex;
    meta->locked_ver = *butex;
504
    meta->mutex.unlock();
gejun's avatar
gejun committed
505 506 507 508 509 510 511
    return_resource(bthread::get_slot(id));
    return 0;
}

int bthread_id_join(bthread_id_t id) __THROW {
    const bthread::IdResourceId slot = bthread::get_slot(id);
    bthread::Id* const meta = address_resource(slot);
512
    if (!meta) {
513
        // The id is not created yet, this join is definitely wrong.
gejun's avatar
gejun committed
514 515 516
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
517
    uint32_t* join_butex = meta->join_butex;
gejun's avatar
gejun committed
518 519 520 521 522
    while (1) {
        meta->mutex.lock();
        const bool has_ver = meta->has_version(id_ver);
        const uint32_t expected_ver = *join_butex;
        meta->mutex.unlock();
523
        if (!has_ver) {
gejun's avatar
gejun committed
524 525
            break;
        }
526 527 528 529
        if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
gejun's avatar
gejun committed
530
    }
531
    return 0;
gejun's avatar
gejun committed
532 533 534 535
}

int bthread_id_trylock(bthread_id_t id, void** pdata) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
536
    if (!meta) {
gejun's avatar
gejun committed
537 538
        return EINVAL;
    }
gejun's avatar
gejun committed
539
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
540
    const uint32_t id_ver = bthread::get_version(id);
541
    meta->mutex.lock();
gejun's avatar
gejun committed
542
    if (!meta->has_version(id_ver)) {
543
        meta->mutex.unlock();
gejun's avatar
gejun committed
544 545 546
        return EINVAL;
    }
    if (*butex != meta->first_ver) {
547
        meta->mutex.unlock();
gejun's avatar
gejun committed
548 549 550
        return EBUSY;
    }
    *butex = meta->locked_ver;
551
    meta->mutex.unlock();
gejun's avatar
gejun committed
552 553 554 555 556 557 558 559 560 561 562 563 564
    if (pdata != NULL) {
        *pdata = meta->data;
    }
    return 0;
}

int bthread_id_lock_verbose(bthread_id_t id, void** pdata,
                            const char *location) __THROW {
    return bthread_id_lock_and_reset_range_verbose(id, pdata, 0, location);
}

int bthread_id_unlock(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
565
    if (!meta) {
gejun's avatar
gejun committed
566 567
        return EINVAL;
    }
gejun's avatar
gejun committed
568
    uint32_t* butex = meta->butex;
gejun's avatar
gejun committed
569 570 571
    // Release fence makes sure all changes made before signal visible to
    // woken-up waiters.
    const uint32_t id_ver = bthread::get_version(id);
572
    meta->mutex.lock();
gejun's avatar
gejun committed
573
    if (!meta->has_version(id_ver)) {
574
        meta->mutex.unlock();
gejun's avatar
gejun committed
575 576 577 578
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
579
        meta->mutex.unlock();
gejun's avatar
gejun committed
580 581 582 583 584 585
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
    bthread::PendingError front;
    if (meta->pending_q.pop(&front)) {
        meta->lock_location = front.location;
586
        meta->mutex.unlock();
gejun's avatar
gejun committed
587 588 589 590 591 592 593 594 595
        if (meta->on_error) {
            return meta->on_error(front.id, meta->data, front.error_code);
        } else {
            return meta->on_error2(front.id, meta->data, front.error_code,
                                   front.error_text);
        }
    } else {
        const bool contended = (*butex == meta->contended_ver());
        *butex = meta->first_ver;
596
        meta->mutex.unlock();
gejun's avatar
gejun committed
597 598 599 600 601 602 603 604 605 606
        if (contended) {
            // We may wake up already-reused id, but that's OK.
            bthread::butex_wake(butex);
        }
        return 0; 
    }
}

int bthread_id_unlock_and_destroy(bthread_id_t id) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
607
    if (!meta) {
gejun's avatar
gejun committed
608 609
        return EINVAL;
    }
gejun's avatar
gejun committed
610 611
    uint32_t* butex = meta->butex;
    uint32_t* join_butex = meta->join_butex;
gejun's avatar
gejun committed
612
    const uint32_t id_ver = bthread::get_version(id);
613
    meta->mutex.lock();
gejun's avatar
gejun committed
614
    if (!meta->has_version(id_ver)) {
615
        meta->mutex.unlock();
gejun's avatar
gejun committed
616 617 618 619
        LOG(FATAL) << "Invalid bthread_id=" << id.value;
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
620
        meta->mutex.unlock();
gejun's avatar
gejun committed
621 622 623
        LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
        return EPERM;
    }
624 625 626 627 628
    const uint32_t next_ver = meta->end_ver();
    *butex = next_ver;
    *join_butex = next_ver;
    meta->first_ver = next_ver;
    meta->locked_ver = next_ver;
gejun's avatar
gejun committed
629
    meta->pending_q.clear();
630
    meta->mutex.unlock();
gejun's avatar
gejun committed
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
    // Notice that butex_wake* returns # of woken-up, not successful or not.
    bthread::butex_wake_except(butex, 0);
    bthread::butex_wake_all(join_butex);
    return_resource(bthread::get_slot(id));
    return 0;
}

int bthread_id_list_init(bthread_id_list_t* list,
                         unsigned /*size*/,
                         unsigned /*conflict_size*/) __THROW {
    list->impl = NULL;  // create on demand.
    // Set unused fields to zero as well.
    list->head = 0;
    list->size = 0;
    list->conflict_head = 0;
    list->conflict_size = 0;
    return 0;
}

void bthread_id_list_destroy(bthread_id_list_t* list) __THROW {
    delete static_cast<bthread::IdList*>(list->impl);
    list->impl = NULL;
}

int bthread_id_list_add(bthread_id_list_t* list, bthread_id_t id) __THROW {
    if (list->impl == NULL) {
        list->impl = new (std::nothrow) bthread::IdList;
        if (NULL == list->impl) {
            return ENOMEM;
        }
    }
    return static_cast<bthread::IdList*>(list->impl)->add(id);
}

int bthread_id_list_reset(bthread_id_list_t* list, int error_code) __THROW {
    return bthread_id_list_reset2(list, error_code, std::string());
}

void bthread_id_list_swap(bthread_id_list_t* list1, 
                          bthread_id_list_t* list2) __THROW {
    std::swap(list1->impl, list2->impl);
}

int bthread_id_list_reset_pthreadsafe(bthread_id_list_t* list, int error_code,
                                       pthread_mutex_t* mutex) __THROW {
    return bthread_id_list_reset2_pthreadsafe(
        list, error_code, std::string(), mutex);
}

int bthread_id_list_reset_bthreadsafe(bthread_id_list_t* list, int error_code,
                                      bthread_mutex_t* mutex) __THROW {
    return bthread_id_list_reset2_bthreadsafe(
        list, error_code, std::string(), mutex);
}

}  // extern "C"

int bthread_id_create2(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&)) __THROW {
    return bthread::id_create_impl(
        id, data, NULL,
        (on_error ? on_error : bthread::default_bthread_id_on_error2));
}

int bthread_id_create2_ranged(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int, const std::string&),
    int range) __THROW {
    return bthread::id_create_ranged_impl(
        id, data, NULL,
        (on_error ? on_error : bthread::default_bthread_id_on_error2), range);
}

int bthread_id_error2_verbose(bthread_id_t id, int error_code,
                              const std::string& error_text,
                              const char *location) __THROW {
    bthread::Id* const meta = address_resource(bthread::get_slot(id));
709
    if (!meta) {
gejun's avatar
gejun committed
710 711 712
        return EINVAL;
    }
    const uint32_t id_ver = bthread::get_version(id);
gejun's avatar
gejun committed
713
    uint32_t* butex = meta->butex;
714
    meta->mutex.lock();
gejun's avatar
gejun committed
715
    if (!meta->has_version(id_ver)) {
716
        meta->mutex.unlock();
gejun's avatar
gejun committed
717 718 719 720 721
        return EINVAL;
    }
    if (*butex == meta->first_ver) {
        *butex = meta->locked_ver;
        meta->lock_location = location;
722
        meta->mutex.unlock();
gejun's avatar
gejun committed
723 724 725 726 727 728 729 730 731 732 733 734
        if (meta->on_error) {
            return meta->on_error(id, meta->data, error_code);
        } else {
            return meta->on_error2(id, meta->data, error_code, error_text);
        }
    } else {
        bthread::PendingError e;
        e.id = id;
        e.error_code = error_code;
        e.error_text = error_text;
        e.location = location;
        meta->pending_q.push(e);
735
        meta->mutex.unlock();
gejun's avatar
gejun committed
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
        return 0;
    }
}

int bthread_id_list_reset2(bthread_id_list_t* list,
                           int error_code,
                           const std::string& error_text) __THROW {
    if (list->impl != NULL) {
        static_cast<bthread::IdList*>(list->impl)->apply(
            bthread::IdResetter(error_code, error_text));
    }
    return 0;
}

int bthread_id_list_reset2_pthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
                                       pthread_mutex_t* mutex) __THROW {
    if (mutex == NULL) {
        return EINVAL;
    }
    if (list->impl == NULL) {
        return 0;
    }
    bthread_id_list_t tmplist;
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
    if (rc != 0) {
        return rc;
    }
    // Swap out the list then reset. The critical section is very small.
    pthread_mutex_lock(mutex);
    std::swap(list->impl, tmplist.impl);
    pthread_mutex_unlock(mutex);
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
    bthread_id_list_destroy(&tmplist);
    return rc2;
}

int bthread_id_list_reset2_bthreadsafe(bthread_id_list_t* list,
                                       int error_code,
                                       const std::string& error_text,
                                       bthread_mutex_t* mutex) __THROW {
    if (mutex == NULL) {
        return EINVAL;
    }
    if (list->impl == NULL) {
        return 0;
    }
    bthread_id_list_t tmplist;
    const int rc = bthread_id_list_init(&tmplist, 0, 0);
    if (rc != 0) {
        return rc;
    }
    // Swap out the list then reset. The critical section is very small.
    bthread_mutex_lock(mutex);
    std::swap(list->impl, tmplist.impl);
    bthread_mutex_unlock(mutex);
    const int rc2 = bthread_id_list_reset2(&tmplist, error_code, error_text);
    bthread_id_list_destroy(&tmplist);
    return rc2;
}