/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
32 33 34
#include "thread.hpp"
#include "err.hpp"

35 36
bool zmq::thread_t::get_started () const
{
37
    return _started;
38 39
}

Martin Sustrik's avatar
Martin Sustrik committed
40
#ifdef ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
41

42
extern "C" {
boris@boressoft.ru's avatar
boris@boressoft.ru committed
43
#if defined _WIN32_WCE
44
static DWORD thread_routine (LPVOID arg_)
boris@boressoft.ru's avatar
boris@boressoft.ru committed
45
#else
46
static unsigned int __stdcall thread_routine (void *arg_)
boris@boressoft.ru's avatar
boris@boressoft.ru committed
47
#endif
48 49
{
    zmq::thread_t *self = (zmq::thread_t *) arg_;
50
    self->_tfn (self->_arg);
51 52
    return 0;
}
53 54
}

55
//  Creates a new OS thread that executes tfn_ (arg_).
//
//  tfn_   routine to run in the new thread.
//  arg_   opaque argument forwarded to tfn_.
//  name_  ignored by this implementation.
//
//  Aborts via win_assert if the thread cannot be created.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
{
    LIBZMQ_UNUSED (name_);

    //  Stash the routine and its argument first: thread_routine reads them
    //  through the 'this' pointer it receives.
    _arg = arg_;
    _tfn = tfn_;

#if defined _WIN32_WCE
    _descriptor = static_cast<HANDLE> (
      CreateThread (NULL, 0, &::thread_routine, this, 0, NULL));
#else
    _descriptor = reinterpret_cast<HANDLE> (
      _beginthreadex (NULL, 0, &::thread_routine, this, 0, NULL));
#endif

    //  A NULL handle means thread creation failed.
    win_assert (_descriptor != NULL);
    _started = true;
}

71 72
bool zmq::thread_t::is_current_thread () const
{
73
    return GetCurrentThreadId () == GetThreadId (_descriptor);
74 75
}

Martin Sustrik's avatar
Martin Sustrik committed
76
void zmq::thread_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
77
{
78 79
    if (_started) {
        DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
80
        win_assert (rc != WAIT_FAILED);
81
        BOOL rc2 = CloseHandle (_descriptor);
82 83
        win_assert (rc2 != 0);
    }
Martin Sustrik's avatar
Martin Sustrik committed
84 85
}

86
void zmq::thread_t::setSchedulingParameters (
87
  int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
88 89
{
    // not implemented
90
    LIBZMQ_UNUSED (priority_);
91
    LIBZMQ_UNUSED (scheduling_policy_);
92
    LIBZMQ_UNUSED (affinity_cpus_);
93 94
}

95
void zmq::thread_t::setThreadName (const char *name_)
96 97 98 99 100
{
    // not implemented
    LIBZMQ_UNUSED (name_);
}

101 102 103 104 105 106 107
#elif defined ZMQ_HAVE_VXWORKS

extern "C" {
static void *thread_routine (void *arg_)
{
    zmq::thread_t *self = (zmq::thread_t *) arg_;
    self->applySchedulingParameters ();
108
    self->_tfn (self->_arg);
109 110 111 112
    return NULL;
}
}

113
//  Spawns a new task that executes tfn_ (arg_).
//
//  tfn_   routine to run in the new task.
//  arg_   opaque argument forwarded to tfn_.
//  name_  ignored by this implementation.
//
//  _started is set only when taskSpawn () yields a usable task ID.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
{
    LIBZMQ_UNUSED (name_);
    _tfn = tfn_;
    _arg = arg_;
    _descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS,
                             DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine,
                             (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0);
    //  taskSpawn () signals failure with ERROR (-1). The two checks must be
    //  combined with '&&': with '||' a descriptor of -1 satisfies the
    //  "!= NULL" half and a failed spawn would be recorded as started.
    if (_descriptor != NULL && _descriptor > 0)
        _started = true;
}

void zmq::thread_t::stop ()
{
127 128 129
    if (_started)
        while ((_descriptor != NULL || _descriptor > 0)
               && taskIdVerify (_descriptor) == 0) {
130 131 132 133 134
        }
}

bool zmq::thread_t::is_current_thread () const
{
135
    return taskIdSelf () == _descriptor;
136 137 138 139 140
}

void zmq::thread_t::setSchedulingParameters (
  int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
{
141 142 143
    _thread_priority = priority_;
    _thread_sched_policy = schedulingPolicy_;
    _thread_affinity_cpus = affinity_cpus_;
144 145 146 147 148
}

void zmq::thread_t::
  applySchedulingParameters () // to be called in secondary thread context
{
149 150
    int priority =
      (_thread_priority >= 0 ? _thread_priority : DEFAULT_PRIORITY);
151
    priority = (priority < UCHAR_MAX ? priority : DEFAULT_PRIORITY);
152 153
    if (_descriptor != NULL || _descriptor > 0) {
        taskPrioritySet (_descriptor, priority);
154 155 156 157 158 159 160 161 162
    }
}

//  Thread naming is not implemented in this branch; name_ is ignored.
void zmq::thread_t::setThreadName (const char *name_)
{
    //  Not implemented: only silence the unused-parameter warning.
    LIBZMQ_UNUSED (name_);
}

Martin Sustrik's avatar
Martin Sustrik committed
163 164 165
#else

#include <signal.h>
166
#include <unistd.h>
167 168
#include <sys/time.h>
#include <sys/resource.h>
Martin Sustrik's avatar
Martin Sustrik committed
169

170 171
extern "C" {
static void *thread_routine (void *arg_)
172
{
173
#if !defined ZMQ_HAVE_OPENVMS && !defined ZMQ_HAVE_ANDROID
174 175 176 177 178 179 180
    //  Following code will guarantee more predictable latencies as it'll
    //  disallow any signal handling in the I/O thread.
    sigset_t signal_set;
    int rc = sigfillset (&signal_set);
    errno_assert (rc == 0);
    rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
    posix_assert (rc);
181
#endif
182 183
    zmq::thread_t *self = (zmq::thread_t *) arg_;
    self->applySchedulingParameters ();
184
    self->setThreadName (self->_name.c_str ());
185
    self->_tfn (self->_arg);
186 187
    return NULL;
}
188 189
}

190
//  Creates a new POSIX thread that executes tfn_ (arg_).
//
//  tfn_   routine to run in the new thread.
//  arg_   opaque argument forwarded to tfn_.
//  name_  name to give the thread; may be NULL, in which case the stored
//         name is left empty.
//
//  Aborts via posix_assert if pthread_create () fails.
void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
{
    _tfn = tfn_;
    _arg = arg_;

    //  Assigning a null pointer to a std::string is undefined behaviour,
    //  so a missing name is stored as the empty string instead.
    if (name_)
        _name = name_;
    else
        _name.clear ();

    int rc = pthread_create (&_descriptor, NULL, thread_routine, this);
    posix_assert (rc);
    _started = true;
}

Martin Sustrik's avatar
Martin Sustrik committed
200
void zmq::thread_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
201
{
202 203
    if (_started) {
        int rc = pthread_join (_descriptor, NULL);
204 205
        posix_assert (rc);
    }
Martin Sustrik's avatar
Martin Sustrik committed
206 207
}

208 209
bool zmq::thread_t::is_current_thread () const
{
210
    return pthread_self () == _descriptor;
211 212
}

213 214
void zmq::thread_t::setSchedulingParameters (
  int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
215
{
216 217 218
    _thread_priority = priority_;
    _thread_sched_policy = schedulingPolicy_;
    _thread_affinity_cpus = affinity_cpus_;
219 220
}

221 222
void zmq::thread_t::
  applySchedulingParameters () // to be called in secondary thread context
223
{
224 225
#if defined _POSIX_THREAD_PRIORITY_SCHEDULING                                  \
  && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0
226 227 228
    int policy = 0;
    struct sched_param param;

229 230 231
#if _POSIX_THREAD_PRIORITY_SCHEDULING == 0                                     \
  && defined _SC_THREAD_PRIORITY_SCHEDULING
    if (sysconf (_SC_THREAD_PRIORITY_SCHEDULING) < 0) {
232 233 234
        return;
    }
#endif
ilue's avatar
ilue committed
235
    int rc = pthread_getschedparam (pthread_self (), &policy, &param);
236 237
    posix_assert (rc);

238 239
    if (_thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT) {
        policy = _thread_sched_policy;
240 241
    }

242 243 244 245 246
    /* Quoting docs:
       "Linux allows the static priority range 1 to 99 for the SCHED_FIFO and
       SCHED_RR policies, and the priority 0 for the remaining policies."
       Other policies may use the "nice value" in place of the priority:
    */
247 248
    bool use_nice_instead_priority =
      (policy != SCHED_FIFO) && (policy != SCHED_RR);
249

250
    if (_thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
251
        if (use_nice_instead_priority)
252 253
            param.sched_priority =
              0; // this is the only supported priority for most scheduling policies
254
        else
255
            param.sched_priority =
256
              _thread_priority; // user should provide a value between 1 and 99
257 258
    }

259
#ifdef __NetBSD__
260 261
    if (policy == SCHED_OTHER)
        param.sched_priority = -1;
262 263
#endif

ilue's avatar
ilue committed
264
    rc = pthread_setschedparam (pthread_self (), policy, &param);
265

266
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
267
    // If this feature is unavailable at run-time, don't abort.
268 269
    if (rc == ENOSYS)
        return;
270 271
#endif

272
    posix_assert (rc);
273

274
#if !defined ZMQ_HAVE_VXWORKS
275
    if (use_nice_instead_priority
276
        && _thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
277 278 279
        // assume the user wants to decrease the thread's nice value
        // i.e., increase the chance of this thread being scheduled: try setting that to
        // maximum priority.
280
        rc = nice (-20);
281 282 283 284 285

        errno_assert (rc != -1);
        // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
        //            CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
    }
286
#endif
287 288

#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
289
    if (!_thread_affinity_cpus.empty ()) {
290
        cpu_set_t cpuset;
291
        CPU_ZERO (&cpuset);
292 293
        for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin ();
             it != _thread_affinity_cpus.end (); it++) {
294
            CPU_SET ((int) (*it), &cpuset);
295
        }
296 297
        rc =
          pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
298 299 300
        posix_assert (rc);
    }
#endif
301
#endif
302 303
}

304
//  Best-effort attempt to give the calling thread the name name_, using
//  whichever pthread naming function the build configuration detected.
//  A NULL name_ is ignored, and so is any failure to set the name.
void zmq::thread_t::setThreadName (const char *name_)
{
    /* The thread name is purely cosmetic: it only makes multi-threaded
     * applications easier to debug. Failing to set it (for example with
     * "Permission denied" after the application changed its EUID) is
     * therefore not treated as an error. Where the call returns a status
     * it is kept in a local "rc" so that it can be inspected while stepping
     * through the code in a debugger; it is otherwise ignored.
     */
    if (name_ == NULL)
        return;

#if defined(ZMQ_HAVE_ANDROID)
    /* Deliberately a no-op: fails with permission denied on Android 5/6 */
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_1)
    const int rc = pthread_setname_np (name_);
    (void) rc;
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_2)
    const int rc = pthread_setname_np (pthread_self (), name_);
    (void) rc;
#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_3)
    const int rc = pthread_setname_np (pthread_self (), name_, NULL);
    (void) rc;
#elif defined(ZMQ_HAVE_PTHREAD_SET_NAME)
    pthread_set_name_np (pthread_self (), name_);
#endif
}

Martin Sustrik's avatar
Martin Sustrik committed
338
#endif