thread.cpp 9.44 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
32 33 34
#include "thread.hpp"
#include "err.hpp"

35 36
bool zmq::thread_t::get_started () const
{
37
    return _started;
38 39
}

Martin Sustrik's avatar
Martin Sustrik committed
40
#ifdef ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
41

42
extern "C" {
boris@boressoft.ru's avatar
boris@boressoft.ru committed
43
#if defined _WIN32_WCE
44
static DWORD thread_routine (LPVOID arg_)
boris@boressoft.ru's avatar
boris@boressoft.ru committed
45
#else
46
static unsigned int __stdcall thread_routine (void *arg_)
boris@boressoft.ru's avatar
boris@boressoft.ru committed
47
#endif
48 49
{
    zmq::thread_t *self = (zmq::thread_t *) arg_;
50
    self->_tfn (self->_arg);
51 52
    return 0;
}
53 54
}

Martin Sustrik's avatar
Martin Sustrik committed
55
void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
Martin Sustrik's avatar
Martin Sustrik committed
56
{
57 58
    _tfn = tfn_;
    _arg = arg_;
59
#if defined _WIN32_WCE
60
    _descriptor =
61
      (HANDLE) CreateThread (NULL, 0, &::thread_routine, this, 0, NULL);
boris@boressoft.ru's avatar
boris@boressoft.ru committed
62
#else
63
    _descriptor =
64
      (HANDLE) _beginthreadex (NULL, 0, &::thread_routine, this, 0, NULL);
boris@boressoft.ru's avatar
boris@boressoft.ru committed
65
#endif
66 67
    win_assert (_descriptor != NULL);
    _started = true;
Martin Sustrik's avatar
Martin Sustrik committed
68 69
}

70 71
bool zmq::thread_t::is_current_thread () const
{
72
    return GetCurrentThreadId () == GetThreadId (_descriptor);
73 74
}

Martin Sustrik's avatar
Martin Sustrik committed
75
void zmq::thread_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
76
{
77 78
    if (_started) {
        DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
79
        win_assert (rc != WAIT_FAILED);
80
        BOOL rc2 = CloseHandle (_descriptor);
81 82
        win_assert (rc2 != 0);
    }
Martin Sustrik's avatar
Martin Sustrik committed
83 84
}

85
void zmq::thread_t::setSchedulingParameters (
86
  int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
87 88
{
    // not implemented
89
    LIBZMQ_UNUSED (priority_);
90
    LIBZMQ_UNUSED (scheduling_policy_);
91
    LIBZMQ_UNUSED (affinity_cpus_);
92 93
}

94
void zmq::thread_t::setThreadName (const char *name_)
95 96 97 98 99
{
    // not implemented
    LIBZMQ_UNUSED (name_);
}

100 101 102 103 104 105 106
#elif defined ZMQ_HAVE_VXWORKS

extern "C" {
static void *thread_routine (void *arg_)
{
    zmq::thread_t *self = (zmq::thread_t *) arg_;
    self->applySchedulingParameters ();
107
    self->_tfn (self->_arg);
108 109 110 111 112 113
    return NULL;
}
}

void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{
114 115 116 117 118 119 120
    _tfn = tfn_;
    _arg = arg_;
    _descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS,
                             DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine,
                             (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0);
    if (_descriptor != NULL || _descriptor > 0)
        _started = true;
121 122 123 124
}

void zmq::thread_t::stop ()
{
125 126 127
    if (_started)
        while ((_descriptor != NULL || _descriptor > 0)
               && taskIdVerify (_descriptor) == 0) {
128 129 130 131 132
        }
}

bool zmq::thread_t::is_current_thread () const
{
133
    return taskIdSelf () == _descriptor;
134 135 136 137 138
}

void zmq::thread_t::setSchedulingParameters (
  int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
{
139 140 141
    _thread_priority = priority_;
    _thread_sched_policy = schedulingPolicy_;
    _thread_affinity_cpus = affinity_cpus_;
142 143 144 145 146
}

void zmq::thread_t::
  applySchedulingParameters () // to be called in secondary thread context
{
147 148
    int priority =
      (_thread_priority >= 0 ? _thread_priority : DEFAULT_PRIORITY);
149
    priority = (priority < UCHAR_MAX ? priority : DEFAULT_PRIORITY);
150 151
    if (_descriptor != NULL || _descriptor > 0) {
        taskPrioritySet (_descriptor, priority);
152 153 154 155 156 157 158 159 160
    }
}

void zmq::thread_t::setThreadName (const char *name_)
{
    // not implemented
    LIBZMQ_UNUSED (name_);
}

Martin Sustrik's avatar
Martin Sustrik committed
161 162 163
#else

#include <signal.h>
164
#include <unistd.h>
165 166
#include <sys/time.h>
#include <sys/resource.h>
Martin Sustrik's avatar
Martin Sustrik committed
167

168 169
extern "C" {
static void *thread_routine (void *arg_)
170
{
171
#if !defined ZMQ_HAVE_OPENVMS && !defined ZMQ_HAVE_ANDROID
172 173 174 175 176 177 178
    //  Following code will guarantee more predictable latencies as it'll
    //  disallow any signal handling in the I/O thread.
    sigset_t signal_set;
    int rc = sigfillset (&signal_set);
    errno_assert (rc == 0);
    rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
    posix_assert (rc);
179
#endif
180 181
    zmq::thread_t *self = (zmq::thread_t *) arg_;
    self->applySchedulingParameters ();
182
    self->_tfn (self->_arg);
183 184
    return NULL;
}
185 186
}

Martin Sustrik's avatar
Martin Sustrik committed
187
void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
Martin Sustrik's avatar
Martin Sustrik committed
188
{
189 190 191
    _tfn = tfn_;
    _arg = arg_;
    int rc = pthread_create (&_descriptor, NULL, thread_routine, this);
192
    posix_assert (rc);
193
    _started = true;
Martin Sustrik's avatar
Martin Sustrik committed
194 195
}

Martin Sustrik's avatar
Martin Sustrik committed
196
void zmq::thread_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
197
{
198 199
    if (_started) {
        int rc = pthread_join (_descriptor, NULL);
200 201
        posix_assert (rc);
    }
Martin Sustrik's avatar
Martin Sustrik committed
202 203
}

204 205
bool zmq::thread_t::is_current_thread () const
{
206
    return pthread_self () == _descriptor;
207 208
}

209 210
void zmq::thread_t::setSchedulingParameters (
  int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
211
{
212 213 214
    _thread_priority = priority_;
    _thread_sched_policy = schedulingPolicy_;
    _thread_affinity_cpus = affinity_cpus_;
215 216
}

217 218
void zmq::thread_t::
  applySchedulingParameters () // to be called in secondary thread context
219
{
220 221
#if defined _POSIX_THREAD_PRIORITY_SCHEDULING                                  \
  && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0
222 223 224
    int policy = 0;
    struct sched_param param;

225 226 227
#if _POSIX_THREAD_PRIORITY_SCHEDULING == 0                                     \
  && defined _SC_THREAD_PRIORITY_SCHEDULING
    if (sysconf (_SC_THREAD_PRIORITY_SCHEDULING) < 0) {
228 229 230
        return;
    }
#endif
ilue's avatar
ilue committed
231
    int rc = pthread_getschedparam (pthread_self (), &policy, &param);
232 233
    posix_assert (rc);

234 235
    if (_thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT) {
        policy = _thread_sched_policy;
236 237
    }

238 239 240 241 242
    /* Quoting docs:
       "Linux allows the static priority range 1 to 99 for the SCHED_FIFO and
       SCHED_RR policies, and the priority 0 for the remaining policies."
       Other policies may use the "nice value" in place of the priority:
    */
243 244
    bool use_nice_instead_priority =
      (policy != SCHED_FIFO) && (policy != SCHED_RR);
245

246
    if (_thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
247
        if (use_nice_instead_priority)
248 249
            param.sched_priority =
              0; // this is the only supported priority for most scheduling policies
250
        else
251
            param.sched_priority =
252
              _thread_priority; // user should provide a value between 1 and 99
253 254
    }

255
#ifdef __NetBSD__
256 257
    if (policy == SCHED_OTHER)
        param.sched_priority = -1;
258 259
#endif

ilue's avatar
ilue committed
260
    rc = pthread_setschedparam (pthread_self (), policy, &param);
261

262
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
263
    // If this feature is unavailable at run-time, don't abort.
264 265
    if (rc == ENOSYS)
        return;
266 267
#endif

268
    posix_assert (rc);
269

270
#if !defined ZMQ_HAVE_VXWORKS
271
    if (use_nice_instead_priority
272
        && _thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
273 274 275
        // assume the user wants to decrease the thread's nice value
        // i.e., increase the chance of this thread being scheduled: try setting that to
        // maximum priority.
276
        rc = nice (-20);
277 278 279 280 281

        errno_assert (rc != -1);
        // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
        //            CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
    }
282
#endif
283 284

#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
285
    if (!_thread_affinity_cpus.empty ()) {
286
        cpu_set_t cpuset;
287
        CPU_ZERO (&cpuset);
288 289
        for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin ();
             it != _thread_affinity_cpus.end (); it++) {
290
            CPU_SET ((int) (*it), &cpuset);
291
        }
292 293
        rc =
          pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
294 295 296
        posix_assert (rc);
    }
#endif
297
#endif
298 299
}

300
void zmq::thread_t::setThreadName (const char *name_)
301
{
302
    /* The thread name is a cosmetic string, added to ease debugging of
303 304 305 306 307 308
 * multi-threaded applications. It is not a big issue if this value
 * can not be set for any reason (such as Permission denied in some
 * cases where the application changes its EUID, etc.) The value of
 * "int rc" is retained where available, to help debuggers stepping
 * through code to see its value - but otherwise it is ignored.
 */
309 310 311
    if (!name_)
        return;

312
#if defined(ZMQ_HAVE_PTHREAD_SETNAME_1)
313 314 315
    int rc = pthread_setname_np (name_);
    if (rc)
        return;
316
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_2)
317
    int rc = pthread_setname_np (_descriptor, name_);
318 319
    if (rc)
        return;
320
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_3)
321
    int rc = pthread_setname_np (_descriptor, name_, NULL);
322 323
    if (rc)
        return;
324
#elif defined(ZMQ_HAVE_PTHREAD_SET_NAME)
325
    pthread_set_name_np (_descriptor, name_);
326 327 328
#endif
}

Martin Sustrik's avatar
Martin Sustrik committed
329
#endif