Commit 13ed7114 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1117 from jlauenercern/master

Thread scheduling parameters: Use ZMQ context options instead of environment variables.
parents 00fe56c4 219310b4
......@@ -179,10 +179,14 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define ZMQ_IO_THREADS 1
#define ZMQ_MAX_SOCKETS 2
#define ZMQ_SOCKET_LIMIT 3
#define ZMQ_THREAD_PRIORITY 3
#define ZMQ_THREAD_SCHED_POLICY 4
/* Default for new contexts */
#define ZMQ_IO_THREADS_DFLT 1
#define ZMQ_MAX_SOCKETS_DFLT 1023
#define ZMQ_THREAD_PRIORITY_DFLT -1
#define ZMQ_THREAD_SCHED_POLICY_DFLT -1
ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context);
......
......@@ -57,7 +57,9 @@ zmq::ctx_t::ctx_t () :
slots (NULL),
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
io_thread_count (ZMQ_IO_THREADS_DFLT),
ipv6 (false)
ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
#ifdef HAVE_FORK
pid = getpid();
......@@ -194,6 +196,18 @@ int zmq::ctx_t::set (int option_, int optval_)
ipv6 = (optval_ != 0);
opt_sync.unlock ();
}
else
if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
opt_sync.lock();
thread_priority = optval_;
opt_sync.unlock();
}
else
if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
opt_sync.lock();
thread_sched_policy = optval_;
opt_sync.unlock();
}
else {
errno = EINVAL;
rc = -1;
......@@ -324,6 +338,12 @@ zmq::object_t *zmq::ctx_t::get_reaper ()
return reaper;
}
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
thread_.start(tfn_, arg_);
thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
......
......@@ -32,6 +32,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "atomic_counter.hpp"
#include "thread.hpp"
namespace zmq
{
......@@ -87,6 +88,9 @@ namespace zmq
zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_);
// Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_);
......@@ -185,6 +189,10 @@ namespace zmq
// Is IPv6 enabled on this context?
bool ipv6;
// Thread scheduling parameters.
int thread_priority;
int thread_sched_policy;
// Synchronisation of access to context options.
mutex_t opt_sync;
......
......@@ -35,7 +35,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::devpoll_t::devpoll_t () :
zmq::devpoll_t::devpoll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
devpoll_fd = open ("/dev/poll", O_RDWR);
......@@ -125,7 +126,7 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
void zmq::devpoll_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::devpoll_t::stop ()
......
......@@ -26,6 +26,7 @@
#include <vector>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -43,7 +44,7 @@ namespace zmq
typedef fd_t handle_t;
devpoll_t ();
devpoll_t (const ctx_t &ctx_);
~devpoll_t ();
// "poller" concept.
......@@ -66,6 +67,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd;
......
......@@ -32,7 +32,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::epoll_t::epoll_t () :
zmq::epoll_t::epoll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
epoll_fd = epoll_create (1);
......@@ -118,7 +119,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
void zmq::epoll_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::epoll_t::stop ()
......
......@@ -27,6 +27,7 @@
#include <vector>
#include <sys/epoll.h>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -45,7 +46,7 @@ namespace zmq
typedef void* handle_t;
epoll_t ();
epoll_t (const ctx_t &ctx_);
~epoll_t ();
// "poller" concept.
......@@ -68,6 +69,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// Main epoll file descriptor
fd_t epoll_fd;
......
......@@ -27,7 +27,7 @@
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
poller = new (std::nothrow) poller_t;
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
......
......@@ -42,7 +42,8 @@
#define kevent_udata_t void *
#endif
zmq::kqueue_t::kqueue_t () :
zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
// Create event queue
......@@ -144,7 +145,7 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
void zmq::kqueue_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::kqueue_t::stop ()
......
......@@ -27,6 +27,7 @@
#include <vector>
#include <unistd.h>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -45,7 +46,7 @@ namespace zmq
typedef void* handle_t;
kqueue_t ();
kqueue_t (const ctx_t &ctx_);
~kqueue_t ();
// "poller" concept.
......@@ -68,6 +69,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// File descriptor referring to the kernel event queue.
fd_t kqueue_fd;
......
......@@ -30,7 +30,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::poll_t::poll_t () :
zmq::poll_t::poll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
retired (false),
stopping (false)
{
......@@ -106,7 +107,7 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
void zmq::poll_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::poll_t::stop ()
......
......@@ -28,6 +28,7 @@
#include <stddef.h>
#include <vector>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -46,7 +47,7 @@ namespace zmq
typedef fd_t handle_t;
poll_t ();
poll_t (const ctx_t &ctx_);
~poll_t ();
// "poller" concept.
......@@ -69,6 +70,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
struct fd_entry_t
{
fd_t index;
......
......@@ -26,7 +26,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
sockets (0),
terminating (false)
{
poller = new (std::nothrow) poller_t;
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
......
......@@ -41,7 +41,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::select_t::select_t () :
zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
maxfd (retired_fd),
retired (false),
stopping (false)
......@@ -136,7 +137,7 @@ void zmq::select_t::reset_pollout (handle_t handle_)
void zmq::select_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::select_t::stop ()
......
......@@ -38,6 +38,7 @@
#include <sys/select.h>
#endif
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -56,7 +57,7 @@ namespace zmq
typedef fd_t handle_t;
select_t ();
select_t (const ctx_t &ctx_);
~select_t ();
// "poller" concept.
......@@ -79,6 +80,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
struct fd_entry_t
{
fd_t fd;
......
......@@ -59,10 +59,14 @@ void zmq::thread_t::stop ()
win_assert (rc2 != 0);
}
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_)
{
// not implemented
}
#else
#include <signal.h>
#include <sstream>
extern "C"
{
......@@ -84,33 +88,12 @@ extern "C"
}
}
bool getenvi (const char *env_, int &result_)
{
char *str = getenv (env_);
if (str == NULL)
return false;
std::stringstream ss(str);
return ss >> result_;
}
void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{
tfn = tfn_;
arg = arg_;
int rc = pthread_create (&descriptor, NULL, thread_routine, this);
posix_assert (rc);
int prio;
if (getenvi ("ZMQ_THREAD_PRIO", prio)) {
int policy = SCHED_RR;
getenvi ("ZMQ_THREAD_POLICY", policy);
struct sched_param param;
param.sched_priority = prio;
rc = pthread_setschedparam (descriptor, policy, &param);
posix_assert (rc);
}
}
void zmq::thread_t::stop ()
......@@ -119,6 +102,28 @@ void zmq::thread_t::stop ()
posix_assert (rc);
}
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_)
{
int policy = 0;
struct sched_param param;
int rc = pthread_getschedparam(descriptor, &policy, &param);
posix_assert (rc);
if(priority_ != -1)
{
param.sched_priority = priority_;
}
if(schedulingPolicy_ != -1)
{
policy = schedulingPolicy_;
}
rc = pthread_setschedparam(descriptor, policy, &param);
posix_assert (rc);
}
#endif
......
......@@ -55,6 +55,10 @@ namespace zmq
// Waits for thread termination.
void stop ();
// Sets the thread scheduling parameters. Only implemented for
// pthread. Has no effect on other platforms.
void setSchedulingParameters(int priority_, int schedulingPolicy_);
// These are internal members. They should be private, however then
// they would not be accessible from the main C routine of the thread.
thread_fn *tfn;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment