condition_variable.hpp 9.16 KB
Newer Older
somdoron's avatar
somdoron committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
somdoron's avatar
somdoron committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
somdoron's avatar
somdoron committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
somdoron's avatar
somdoron committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
somdoron's avatar
somdoron committed
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__

#include "err.hpp"
#include "mutex.hpp"

//  Condition variable class encapsulates the OS condition variable in a
//  platform-independent way.

#ifdef ZMQ_HAVE_WINDOWS

#include "windows.hpp"
41 42 43 44 45 46 47 48 49 50 51 52 53
//  Decide whether the toolchain provides the C++11 <condition_variable>
//  header. _SUPPORT_CONDITION_VARIABLE is always defined afterwards, as 1
//  (available) or 0 (not available).
//  MSVC is judged by its compiler version, every other compiler by the
//  language level it reports.
#if defined(_MSC_VER)
#if _MSC_VER >= 1800
#define _SUPPORT_CONDITION_VARIABLE 1
#else
#define _SUPPORT_CONDITION_VARIABLE 0
#endif
#else
//  The standard language-level macro is __cplusplus (two leading
//  underscores). A misspelt name is silently evaluated as 0 by the
//  preprocessor, which would make this test fail on every compiler.
#if __cplusplus >= 201103L
#define _SUPPORT_CONDITION_VARIABLE 1
#else
#define _SUPPORT_CONDITION_VARIABLE 0
#endif
#endif
somdoron's avatar
somdoron committed
54

55
// Native condition variables are only available from Windows Vista onwards;
// to use them, define _WIN32_WINNT as 0x0600 or higher.
56
#if _WIN32_WINNT < 0x0600 && !_SUPPORT_CONDITION_VARIABLE
57 58 59

namespace zmq
{
60 61 62 63
class condition_variable_t
{
  public:
    inline condition_variable_t () { zmq_assert (false); }
64

65
    inline ~condition_variable_t () {}
66

67 68 69 70 71
    inline int wait (mutex_t *mutex_, int timeout_)
    {
        zmq_assert (false);
        return -1;
    }
72

73
    inline void broadcast () { zmq_assert (false); }
74

75 76 77 78 79
  private:
    //  Disable copy construction and assignment.
    condition_variable_t (const condition_variable_t &);
    void operator= (const condition_variable_t &);
};
80 81 82 83
}

#else

84
#if _SUPPORT_CONDITION_VARIABLE || defined(ZMQ_HAVE_WINDOWS_TARGET_XP)
85 86 87
#include <condition_variable>
#include <mutex>
#endif
Dmitriy-GH's avatar
Dmitriy-GH committed
88

somdoron's avatar
somdoron committed
89 90 91
namespace zmq
{

92
#if !defined(ZMQ_HAVE_WINDOWS_TARGET_XP) && _WIN32_WINNT >= 0x0600
93 94 95
class condition_variable_t
{
  public:
96
    inline condition_variable_t () { InitializeConditionVariable (&_cv); }
97 98 99 100

    inline ~condition_variable_t () {}

    inline int wait (mutex_t *mutex_, int timeout_)
somdoron's avatar
somdoron committed
101
    {
102
        int rc = SleepConditionVariableCS (&_cv, mutex_->get_cs (), timeout_);
somdoron's avatar
somdoron committed
103

104 105
        if (rc != 0)
            return 0;
106

107
        rc = GetLastError ();
somdoron's avatar
somdoron committed
108

109 110
        if (rc != ERROR_TIMEOUT)
            win_assert (rc);
somdoron's avatar
somdoron committed
111

112 113 114
        errno = EAGAIN;
        return -1;
    }
somdoron's avatar
somdoron committed
115

116
    inline void broadcast () { WakeAllConditionVariable (&_cv); }
somdoron's avatar
somdoron committed
117

118
  private:
119
    CONDITION_VARIABLE _cv;
somdoron's avatar
somdoron committed
120

121 122 123 124 125 126 127 128 129
    //  Disable copy construction and assignment.
    condition_variable_t (const condition_variable_t &);
    void operator= (const condition_variable_t &);
};
#else
//  Condition variable built on std::condition_variable.
//  The standard primitive waits on a std::unique_lock<std::mutex>, so an
//  internal std::mutex is used for the wait itself while the caller's
//  mutex_t is released before and re-acquired after it.
class condition_variable_t
{
  public:
    condition_variable_t () {}

    ~condition_variable_t () {}

    //  Releases 'mutex_', waits for broadcast () and locks 'mutex_' again
    //  before returning.
    //  'timeout_' is in milliseconds; -1 means wait without a time limit.
    //  Returns 0 after a wake-up, or -1 with errno set to EAGAIN when the
    //  timeout expired first.
    int wait (mutex_t *mutex_, int timeout_)
    {
        //  Take the internal mutex before letting go of the caller's one.
        //  broadcast () needs the internal mutex too, so it cannot run
        //  between the release below and the start of the wait.
        std::unique_lock<std::mutex> guard (_mtx);
        mutex_->unlock ();

        int rc = 0;
        if (timeout_ == -1) {
            //  Unlimited wait: releases the internal mutex while blocked
            //  and holds it again once woken up.
            _cv.wait (guard);
        } else {
            const std::cv_status status =
              _cv.wait_for (guard, std::chrono::milliseconds (timeout_));
            if (status == std::cv_status::timeout) {
                //  Time expired without a notification.
                errno = EAGAIN;
                rc = -1;
            }
        }

        //  Drop the internal mutex first, then re-acquire the caller's.
        guard.unlock ();
        mutex_->lock ();
        return rc;
    }

    //  Wakes every thread blocked in wait ().
    void broadcast ()
    {
        std::lock_guard<std::mutex> guard (_mtx);
        _cv.notify_all ();
    }

  private:
    std::condition_variable _cv;
    std::mutex _mtx;

    //  Disable copy construction and assignment.
    condition_variable_t (const condition_variable_t &);
    void operator= (const condition_variable_t &);
};
somdoron's avatar
somdoron committed
166

Dmitriy-GH's avatar
Dmitriy-GH committed
167
#endif
somdoron's avatar
somdoron committed
168 169
}

170 171
#endif

172 173 174 175 176 177 178 179 180 181 182 183 184
#elif defined ZMQ_HAVE_VXWORKS

#include <sysLib.h>

namespace zmq
{
class condition_variable_t
{
  public:
    inline condition_variable_t () {}

    inline ~condition_variable_t ()
    {
185 186 187
        scoped_lock_t l (_listenersMutex);
        for (size_t i = 0; i < _listeners.size (); i++) {
            semDelete (_listeners[i]);
188 189 190 191 192 193 194 195 196 197 198 199 200
        }
    }

    inline int wait (mutex_t *mutex_, int timeout_)
    {
        //Atomically releases lock, blocks the current executing thread,
        //and adds it to the list of threads waiting on *this. The thread
        //will be unblocked when broadcast() is executed.
        //It may also be unblocked spuriously. When unblocked, regardless
        //of the reason, lock is reacquired and wait exits.

        SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY);
        {
201 202
            scoped_lock_t l (_listenersMutex);
            _listeners.push_back (sem);
203 204 205 206 207 208 209 210 211 212 213 214 215
        }
        mutex_->unlock ();

        int rc;
        if (timeout_ < 0)
            rc = semTake (sem, WAIT_FOREVER);
        else {
            int ticksPerSec = sysClkRateGet ();
            int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1;
            rc = semTake (sem, timeoutTicks);
        }

        {
216
            scoped_lock_t l (_listenersMutex);
217
            // remove sem from listeners
218 219 220
            for (size_t i = 0; i < _listeners.size (); i++) {
                if (_listeners[i] == sem) {
                    _listeners.erase (_listeners.begin () + i);
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
                    break;
                }
            }
            semDelete (sem);
        }
        mutex_->lock ();

        if (rc == 0)
            return 0;

        if (rc == S_objLib_OBJ_TIMEOUT) {
            errno = EAGAIN;
            return -1;
        }

        return -1;
    }

    inline void broadcast ()
    {
241 242 243
        scoped_lock_t l (_listenersMutex);
        for (size_t i = 0; i < _listeners.size (); i++) {
            semGive (_listeners[i]);
244 245 246 247
        }
    }

  private:
248 249
    mutex_t _listenersMutex;
    std::vector<SEM_ID> _listeners;
250 251 252 253 254 255

    // Disable copy construction and assignment.
    condition_variable_t (const condition_variable_t &);
    const condition_variable_t &operator= (const condition_variable_t &);
};
}
somdoron's avatar
somdoron committed
256 257 258 259
#else

#include <pthread.h>

260 261 262 263 264 265 266
//  Android API levels below 21 are flagged as ANDROID_LEGACY. On such
//  builds the condition variable below does not select CLOCK_MONOTONIC
//  through pthread_condattr_setclock and performs its timed waits with
//  pthread_cond_timedwait_monotonic_np instead, which is declared here.
#if defined(__ANDROID_API__) && __ANDROID_API__ < 21
#define ANDROID_LEGACY
extern "C" int pthread_cond_timedwait_monotonic_np (pthread_cond_t *,
                                                    pthread_mutex_t *,
                                                    const struct timespec *);
#endif

somdoron's avatar
somdoron committed
267 268
namespace zmq
{
269 270 271 272
class condition_variable_t
{
  public:
    inline condition_variable_t ()
somdoron's avatar
somdoron committed
273
    {
274 275 276 277 278
        pthread_condattr_t attr;
        pthread_condattr_init (&attr);
#if !defined(ZMQ_HAVE_OSX) && !defined(ANDROID_LEGACY)
        pthread_condattr_setclock (&attr, CLOCK_MONOTONIC);
#endif
279
        int rc = pthread_cond_init (&_cond, &attr);
280 281
        posix_assert (rc);
    }
somdoron's avatar
somdoron committed
282

283 284
    inline ~condition_variable_t ()
    {
285
        int rc = pthread_cond_destroy (&_cond);
286 287
        posix_assert (rc);
    }
somdoron's avatar
somdoron committed
288

289 290 291
    inline int wait (mutex_t *mutex_, int timeout_)
    {
        int rc;
somdoron's avatar
somdoron committed
292

293 294
        if (timeout_ != -1) {
            struct timespec timeout;
295

296 297 298
#ifdef ZMQ_HAVE_OSX
            timeout.tv_sec = 0;
            timeout.tv_nsec = 0;
299
#else
300
            clock_gettime (CLOCK_MONOTONIC, &timeout);
301
#endif
302

303 304
            timeout.tv_sec += timeout_ / 1000;
            timeout.tv_nsec += (timeout_ % 1000) * 1000000;
305

306 307 308
            if (timeout.tv_nsec > 1000000000) {
                timeout.tv_sec++;
                timeout.tv_nsec -= 1000000000;
somdoron's avatar
somdoron committed
309
            }
310 311
#ifdef ZMQ_HAVE_OSX
            rc = pthread_cond_timedwait_relative_np (
312
              &_cond, mutex_->get_mutex (), &timeout);
313 314
#elif defined(ANDROID_LEGACY)
            rc = pthread_cond_timedwait_monotonic_np (
315
              &_cond, mutex_->get_mutex (), &timeout);
316
#else
317 318
            rc =
              pthread_cond_timedwait (&_cond, mutex_->get_mutex (), &timeout);
319
#endif
320
        } else
321
            rc = pthread_cond_wait (&_cond, mutex_->get_mutex ());
somdoron's avatar
somdoron committed
322

323 324
        if (rc == 0)
            return 0;
somdoron's avatar
somdoron committed
325

326 327
        if (rc == ETIMEDOUT) {
            errno = EAGAIN;
somdoron's avatar
somdoron committed
328 329 330
            return -1;
        }

331 332 333
        posix_assert (rc);
        return -1;
    }
somdoron's avatar
somdoron committed
334

335 336
    inline void broadcast ()
    {
337
        int rc = pthread_cond_broadcast (&_cond);
338 339
        posix_assert (rc);
    }
somdoron's avatar
somdoron committed
340

341
  private:
342
    pthread_cond_t _cond;
somdoron's avatar
somdoron committed
343

344 345 346 347
    // Disable copy construction and assignment.
    condition_variable_t (const condition_variable_t &);
    const condition_variable_t &operator= (const condition_variable_t &);
};
somdoron's avatar
somdoron committed
348 349 350 351 352 353
}

#endif


#endif