Commit 219310b4 authored by Joel Lauener's avatar Joel Lauener

Thread scheduling parameters: Use ZMQ context options instead of

environment variables.
parent 00fe56c4
...@@ -179,10 +179,14 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); ...@@ -179,10 +179,14 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define ZMQ_IO_THREADS 1 #define ZMQ_IO_THREADS 1
#define ZMQ_MAX_SOCKETS 2 #define ZMQ_MAX_SOCKETS 2
#define ZMQ_SOCKET_LIMIT 3 #define ZMQ_SOCKET_LIMIT 3
#define ZMQ_THREAD_PRIORITY 3
#define ZMQ_THREAD_SCHED_POLICY 4
/* Default for new contexts */ /* Default for new contexts */
#define ZMQ_IO_THREADS_DFLT 1 #define ZMQ_IO_THREADS_DFLT 1
#define ZMQ_MAX_SOCKETS_DFLT 1023 #define ZMQ_MAX_SOCKETS_DFLT 1023
#define ZMQ_THREAD_PRIORITY_DFLT -1
#define ZMQ_THREAD_SCHED_POLICY_DFLT -1
ZMQ_EXPORT void *zmq_ctx_new (void); ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context); ZMQ_EXPORT int zmq_ctx_term (void *context);
......
...@@ -57,7 +57,9 @@ zmq::ctx_t::ctx_t () : ...@@ -57,7 +57,9 @@ zmq::ctx_t::ctx_t () :
slots (NULL), slots (NULL),
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
io_thread_count (ZMQ_IO_THREADS_DFLT), io_thread_count (ZMQ_IO_THREADS_DFLT),
ipv6 (false) ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{ {
#ifdef HAVE_FORK #ifdef HAVE_FORK
pid = getpid(); pid = getpid();
...@@ -194,6 +196,18 @@ int zmq::ctx_t::set (int option_, int optval_) ...@@ -194,6 +196,18 @@ int zmq::ctx_t::set (int option_, int optval_)
ipv6 = (optval_ != 0); ipv6 = (optval_ != 0);
opt_sync.unlock (); opt_sync.unlock ();
} }
else
if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
opt_sync.lock();
thread_priority = optval_;
opt_sync.unlock();
}
else
if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
opt_sync.lock();
thread_sched_policy = optval_;
opt_sync.unlock();
}
else { else {
errno = EINVAL; errno = EINVAL;
rc = -1; rc = -1;
...@@ -324,6 +338,12 @@ zmq::object_t *zmq::ctx_t::get_reaper () ...@@ -324,6 +338,12 @@ zmq::object_t *zmq::ctx_t::get_reaper ()
return reaper; return reaper;
} }
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
thread_.start(tfn_, arg_);
thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{ {
slots [tid_]->send (command_); slots [tid_]->send (command_);
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "options.hpp" #include "options.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "thread.hpp"
namespace zmq namespace zmq
{ {
...@@ -87,6 +88,9 @@ namespace zmq ...@@ -87,6 +88,9 @@ namespace zmq
zmq::socket_base_t *create_socket (int type_); zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
// Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Send command to the destination thread. // Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_); void send_command (uint32_t tid_, const command_t &command_);
...@@ -185,6 +189,10 @@ namespace zmq ...@@ -185,6 +189,10 @@ namespace zmq
// Is IPv6 enabled on this context? // Is IPv6 enabled on this context?
bool ipv6; bool ipv6;
// Thread scheduling parameters.
int thread_priority;
int thread_sched_policy;
// Synchronisation of access to context options. // Synchronisation of access to context options.
mutex_t opt_sync; mutex_t opt_sync;
......
...@@ -35,7 +35,8 @@ ...@@ -35,7 +35,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::devpoll_t::devpoll_t () : zmq::devpoll_t::devpoll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false) stopping (false)
{ {
devpoll_fd = open ("/dev/poll", O_RDWR); devpoll_fd = open ("/dev/poll", O_RDWR);
...@@ -125,7 +126,7 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_) ...@@ -125,7 +126,7 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
void zmq::devpoll_t::start () void zmq::devpoll_t::start ()
{ {
worker.start (worker_routine, this); ctx.start_thread (worker, worker_routine, this);
} }
void zmq::devpoll_t::stop () void zmq::devpoll_t::stop ()
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <vector> #include <vector>
#include "ctx.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "poller_base.hpp" #include "poller_base.hpp"
...@@ -43,7 +44,7 @@ namespace zmq ...@@ -43,7 +44,7 @@ namespace zmq
typedef fd_t handle_t; typedef fd_t handle_t;
devpoll_t (); devpoll_t (const ctx_t &ctx_);
~devpoll_t (); ~devpoll_t ();
// "poller" concept. // "poller" concept.
...@@ -66,6 +67,9 @@ namespace zmq ...@@ -66,6 +67,9 @@ namespace zmq
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// File descriptor referring to "/dev/poll" pseudo-device. // File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd; fd_t devpoll_fd;
......
...@@ -32,7 +32,8 @@ ...@@ -32,7 +32,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::epoll_t::epoll_t () : zmq::epoll_t::epoll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false) stopping (false)
{ {
epoll_fd = epoll_create (1); epoll_fd = epoll_create (1);
...@@ -118,7 +119,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_) ...@@ -118,7 +119,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
void zmq::epoll_t::start () void zmq::epoll_t::start ()
{ {
worker.start (worker_routine, this); ctx.start_thread (worker, worker_routine, this);
} }
void zmq::epoll_t::stop () void zmq::epoll_t::stop ()
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include <vector> #include <vector>
#include <sys/epoll.h> #include <sys/epoll.h>
#include "ctx.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "poller_base.hpp" #include "poller_base.hpp"
...@@ -45,7 +46,7 @@ namespace zmq ...@@ -45,7 +46,7 @@ namespace zmq
typedef void* handle_t; typedef void* handle_t;
epoll_t (); epoll_t (const ctx_t &ctx_);
~epoll_t (); ~epoll_t ();
// "poller" concept. // "poller" concept.
...@@ -68,6 +69,9 @@ namespace zmq ...@@ -68,6 +69,9 @@ namespace zmq
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// Main epoll file descriptor // Main epoll file descriptor
fd_t epoll_fd; fd_t epoll_fd;
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_) object_t (ctx_, tid_)
{ {
poller = new (std::nothrow) poller_t; poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller); alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this); mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
......
...@@ -42,7 +42,8 @@ ...@@ -42,7 +42,8 @@
#define kevent_udata_t void * #define kevent_udata_t void *
#endif #endif
zmq::kqueue_t::kqueue_t () : zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false) stopping (false)
{ {
// Create event queue // Create event queue
...@@ -144,7 +145,7 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) ...@@ -144,7 +145,7 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
void zmq::kqueue_t::start () void zmq::kqueue_t::start ()
{ {
worker.start (worker_routine, this); ctx.start_thread (worker, worker_routine, this);
} }
void zmq::kqueue_t::stop () void zmq::kqueue_t::stop ()
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include <vector> #include <vector>
#include <unistd.h> #include <unistd.h>
#include "ctx.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "poller_base.hpp" #include "poller_base.hpp"
...@@ -45,7 +46,7 @@ namespace zmq ...@@ -45,7 +46,7 @@ namespace zmq
typedef void* handle_t; typedef void* handle_t;
kqueue_t (); kqueue_t (const ctx_t &ctx_);
~kqueue_t (); ~kqueue_t ();
// "poller" concept. // "poller" concept.
...@@ -68,6 +69,9 @@ namespace zmq ...@@ -68,6 +69,9 @@ namespace zmq
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// File descriptor referring to the kernel event queue. // File descriptor referring to the kernel event queue.
fd_t kqueue_fd; fd_t kqueue_fd;
......
...@@ -30,7 +30,8 @@ ...@@ -30,7 +30,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::poll_t::poll_t () : zmq::poll_t::poll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
retired (false), retired (false),
stopping (false) stopping (false)
{ {
...@@ -106,7 +107,7 @@ void zmq::poll_t::reset_pollout (handle_t handle_) ...@@ -106,7 +107,7 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
void zmq::poll_t::start () void zmq::poll_t::start ()
{ {
worker.start (worker_routine, this); ctx.start_thread (worker, worker_routine, this);
} }
void zmq::poll_t::stop () void zmq::poll_t::stop ()
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <stddef.h> #include <stddef.h>
#include <vector> #include <vector>
#include "ctx.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "poller_base.hpp" #include "poller_base.hpp"
...@@ -46,7 +47,7 @@ namespace zmq ...@@ -46,7 +47,7 @@ namespace zmq
typedef fd_t handle_t; typedef fd_t handle_t;
poll_t (); poll_t (const ctx_t &ctx_);
~poll_t (); ~poll_t ();
// "poller" concept. // "poller" concept.
...@@ -69,6 +70,9 @@ namespace zmq ...@@ -69,6 +70,9 @@ namespace zmq
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
struct fd_entry_t struct fd_entry_t
{ {
fd_t index; fd_t index;
......
...@@ -26,7 +26,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : ...@@ -26,7 +26,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
sockets (0), sockets (0),
terminating (false) terminating (false)
{ {
poller = new (std::nothrow) poller_t; poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller); alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this); mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
......
...@@ -41,7 +41,8 @@ ...@@ -41,7 +41,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::select_t::select_t () : zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
maxfd (retired_fd), maxfd (retired_fd),
retired (false), retired (false),
stopping (false) stopping (false)
...@@ -136,7 +137,7 @@ void zmq::select_t::reset_pollout (handle_t handle_) ...@@ -136,7 +137,7 @@ void zmq::select_t::reset_pollout (handle_t handle_)
void zmq::select_t::start () void zmq::select_t::start ()
{ {
worker.start (worker_routine, this); ctx.start_thread (worker, worker_routine, this);
} }
void zmq::select_t::stop () void zmq::select_t::stop ()
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include <sys/select.h> #include <sys/select.h>
#endif #endif
#include "ctx.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "poller_base.hpp" #include "poller_base.hpp"
...@@ -56,7 +57,7 @@ namespace zmq ...@@ -56,7 +57,7 @@ namespace zmq
typedef fd_t handle_t; typedef fd_t handle_t;
select_t (); select_t (const ctx_t &ctx_);
~select_t (); ~select_t ();
// "poller" concept. // "poller" concept.
...@@ -79,6 +80,9 @@ namespace zmq ...@@ -79,6 +80,9 @@ namespace zmq
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
struct fd_entry_t struct fd_entry_t
{ {
fd_t fd; fd_t fd;
......
...@@ -59,10 +59,14 @@ void zmq::thread_t::stop () ...@@ -59,10 +59,14 @@ void zmq::thread_t::stop ()
win_assert (rc2 != 0); win_assert (rc2 != 0);
} }
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_)
{
// not implemented
}
#else #else
#include <signal.h> #include <signal.h>
#include <sstream>
extern "C" extern "C"
{ {
...@@ -84,33 +88,12 @@ extern "C" ...@@ -84,33 +88,12 @@ extern "C"
} }
} }
bool getenvi (const char *env_, int &result_)
{
char *str = getenv (env_);
if (str == NULL)
return false;
std::stringstream ss(str);
return ss >> result_;
}
void zmq::thread_t::start (thread_fn *tfn_, void *arg_) void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{ {
tfn = tfn_; tfn = tfn_;
arg = arg_; arg = arg_;
int rc = pthread_create (&descriptor, NULL, thread_routine, this); int rc = pthread_create (&descriptor, NULL, thread_routine, this);
posix_assert (rc); posix_assert (rc);
int prio;
if (getenvi ("ZMQ_THREAD_PRIO", prio)) {
int policy = SCHED_RR;
getenvi ("ZMQ_THREAD_POLICY", policy);
struct sched_param param;
param.sched_priority = prio;
rc = pthread_setschedparam (descriptor, policy, &param);
posix_assert (rc);
}
} }
void zmq::thread_t::stop () void zmq::thread_t::stop ()
...@@ -119,6 +102,28 @@ void zmq::thread_t::stop () ...@@ -119,6 +102,28 @@ void zmq::thread_t::stop ()
posix_assert (rc); posix_assert (rc);
} }
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_)
{
int policy = 0;
struct sched_param param;
int rc = pthread_getschedparam(descriptor, &policy, &param);
posix_assert (rc);
if(priority_ != -1)
{
param.sched_priority = priority_;
}
if(schedulingPolicy_ != -1)
{
policy = schedulingPolicy_;
}
rc = pthread_setschedparam(descriptor, policy, &param);
posix_assert (rc);
}
#endif #endif
......
...@@ -55,6 +55,10 @@ namespace zmq ...@@ -55,6 +55,10 @@ namespace zmq
// Waits for thread termination. // Waits for thread termination.
void stop (); void stop ();
// Sets the thread scheduling parameters. Only implemented for
// pthread. Has no effect on other platforms.
void setSchedulingParameters(int priority_, int schedulingPolicy_);
// These are internal members. They should be private, however then // These are internal members. They should be private, however then
// they would not be accessible from the main C routine of the thread. // they would not be accessible from the main C routine of the thread.
thread_fn *tfn; thread_fn *tfn;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment