Commit f87d3ab2 authored by Simon Giesecke's avatar Simon Giesecke Committed by Luca Boccassi

Problem: pollers unnecessarily depend on whole ctx_t, but actually use only start_thread method

Solution: extract thread_ctx_t from ctx_t
parent 687f6a69
...@@ -76,9 +76,7 @@ zmq::ctx_t::ctx_t () : ...@@ -76,9 +76,7 @@ zmq::ctx_t::ctx_t () :
max_msgsz (INT_MAX), max_msgsz (INT_MAX),
io_thread_count (ZMQ_IO_THREADS_DFLT), io_thread_count (ZMQ_IO_THREADS_DFLT),
blocky (true), blocky (true),
ipv6 (false), ipv6 (false)
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{ {
#ifdef HAVE_FORK #ifdef HAVE_FORK
pid = getpid (); pid = getpid ();
...@@ -241,29 +239,6 @@ int zmq::ctx_t::set (int option_, int optval_) ...@@ -241,29 +239,6 @@ int zmq::ctx_t::set (int option_, int optval_)
} else if (option_ == ZMQ_IPV6 && optval_ >= 0) { } else if (option_ == ZMQ_IPV6 && optval_ >= 0) {
scoped_lock_t locker (opt_sync); scoped_lock_t locker (opt_sync);
ipv6 = (optval_ != 0); ipv6 = (optval_ != 0);
} else if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
scoped_lock_t locker (opt_sync);
thread_priority = optval_;
} else if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
scoped_lock_t locker (opt_sync);
thread_sched_policy = optval_;
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_ADD && optval_ >= 0) {
scoped_lock_t locker (opt_sync);
thread_affinity_cpus.insert (optval_);
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
scoped_lock_t locker (opt_sync);
std::set<int>::iterator it = thread_affinity_cpus.find (optval_);
if (it != thread_affinity_cpus.end ()) {
thread_affinity_cpus.erase (it);
} else {
errno = EINVAL;
rc = -1;
}
} else if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) {
std::ostringstream s;
s << optval_;
scoped_lock_t locker (opt_sync);
thread_name_prefix = s.str ();
} else if (option_ == ZMQ_BLOCKY && optval_ >= 0) { } else if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
scoped_lock_t locker (opt_sync); scoped_lock_t locker (opt_sync);
blocky = (optval_ != 0); blocky = (optval_ != 0);
...@@ -271,8 +246,7 @@ int zmq::ctx_t::set (int option_, int optval_) ...@@ -271,8 +246,7 @@ int zmq::ctx_t::set (int option_, int optval_)
scoped_lock_t locker (opt_sync); scoped_lock_t locker (opt_sync);
max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX; max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX;
} else { } else {
errno = EINVAL; rc = thread_ctx_t::set (option_, optval_);
rc = -1;
} }
return rc; return rc;
} }
...@@ -434,9 +408,15 @@ zmq::object_t *zmq::ctx_t::get_reaper () ...@@ -434,9 +408,15 @@ zmq::object_t *zmq::ctx_t::get_reaper ()
return reaper; return reaper;
} }
void zmq::ctx_t::start_thread (thread_t &thread_, zmq::thread_ctx_t::thread_ctx_t () :
thread_fn *tfn_, thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
void *arg_) const thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
}
void zmq::thread_ctx_t::start_thread (thread_t &thread_,
thread_fn *tfn_,
void *arg_) const
{ {
static unsigned int nthreads_started = 0; static unsigned int nthreads_started = 0;
...@@ -453,6 +433,39 @@ void zmq::ctx_t::start_thread (thread_t &thread_, ...@@ -453,6 +433,39 @@ void zmq::ctx_t::start_thread (thread_t &thread_,
nthreads_started++; nthreads_started++;
} }
//  Sets a thread-related context option. Returns 0 on success;
//  on an unknown option, a negative value, or removing a CPU that
//  was never added, sets errno to EINVAL and returns -1.
int zmq::thread_ctx_t::set (int option_, int optval_)
{
    //  Every thread option rejects negative values, so an unknown
    //  option and a negative value produce the same EINVAL result.
    if (optval_ < 0) {
        errno = EINVAL;
        return -1;
    }

    switch (option_) {
        case ZMQ_THREAD_SCHED_POLICY: {
            scoped_lock_t locker (opt_sync);
            thread_sched_policy = optval_;
            break;
        }
        case ZMQ_THREAD_AFFINITY_CPU_ADD: {
            scoped_lock_t locker (opt_sync);
            thread_affinity_cpus.insert (optval_);
            break;
        }
        case ZMQ_THREAD_AFFINITY_CPU_REMOVE: {
            scoped_lock_t locker (opt_sync);
            std::set<int>::iterator cpu = thread_affinity_cpus.find (optval_);
            //  Removing a CPU that is not in the affinity set is an error.
            if (cpu == thread_affinity_cpus.end ()) {
                errno = EINVAL;
                return -1;
            }
            thread_affinity_cpus.erase (cpu);
            break;
        }
        case ZMQ_THREAD_NAME_PREFIX: {
            //  Format the numeric prefix before taking the lock, so the
            //  critical section is just the string assignment.
            std::ostringstream buf;
            buf << optval_;
            scoped_lock_t locker (opt_sync);
            thread_name_prefix = buf.str ();
            break;
        }
        case ZMQ_THREAD_PRIORITY: {
            scoped_lock_t locker (opt_sync);
            thread_priority = optval_;
            break;
        }
        default:
            errno = EINVAL;
            return -1;
    }
    return 0;
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{ {
slots[tid_]->send (command_); slots[tid_]->send (command_);
......
...@@ -61,10 +61,32 @@ struct endpoint_t ...@@ -61,10 +61,32 @@ struct endpoint_t
options_t options; options_t options;
}; };
// Bundles the thread-configuration state of a context (scheduling
// policy, priority, CPU affinity set, thread-name prefix) together
// with the ability to start threads that honour it. Extracted from
// ctx_t so that the pollers can depend on this small class instead
// of the whole context.
class thread_ctx_t
{
public:
// Initialises the thread parameters to the ZMQ_* defaults.
thread_ctx_t ();
// Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Handles the ZMQ_THREAD_* context options; returns 0 on success,
// or -1 with errno set to EINVAL for anything it does not accept.
int set (int option_, int optval_);
protected:
// Synchronisation of access to context options.
mutex_t opt_sync;
private:
// Thread parameters.
int thread_priority;
int thread_sched_policy;
std::set<int> thread_affinity_cpus;
std::string thread_name_prefix;
};
// Context object encapsulates all the global state associated with // Context object encapsulates all the global state associated with
// the library. // the library.
class ctx_t class ctx_t : public thread_ctx_t
{ {
public: public:
// Create the context object. // Create the context object.
...@@ -96,9 +118,6 @@ class ctx_t ...@@ -96,9 +118,6 @@ class ctx_t
zmq::socket_base_t *create_socket (int type_); zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
// Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Send command to the destination thread. // Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_); void send_command (uint32_t tid_, const command_t &command_);
...@@ -215,15 +234,6 @@ class ctx_t ...@@ -215,15 +234,6 @@ class ctx_t
// Is IPv6 enabled on this context? // Is IPv6 enabled on this context?
bool ipv6; bool ipv6;
// Thread parameters.
int thread_priority;
int thread_sched_policy;
std::set<int> thread_affinity_cpus;
std::string thread_name_prefix;
// Synchronisation of access to context options.
mutex_t opt_sync;
ctx_t (const ctx_t &); ctx_t (const ctx_t &);
const ctx_t &operator= (const ctx_t &); const ctx_t &operator= (const ctx_t &);
......
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::devpoll_t::devpoll_t (const zmq::ctx_t &ctx_) : zmq::devpoll_t::devpoll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), ctx (ctx_),
stopping (false) stopping (false)
{ {
......
...@@ -52,7 +52,7 @@ class devpoll_t : public poller_base_t ...@@ -52,7 +52,7 @@ class devpoll_t : public poller_base_t
public: public:
typedef fd_t handle_t; typedef fd_t handle_t;
devpoll_t (const ctx_t &ctx_); devpoll_t (const thread_ctx_t &ctx_);
~devpoll_t (); ~devpoll_t ();
// "poller" concept. // "poller" concept.
...@@ -75,7 +75,7 @@ class devpoll_t : public poller_base_t ...@@ -75,7 +75,7 @@ class devpoll_t : public poller_base_t
void loop (); void loop ();
// Reference to ZMQ context. // Reference to ZMQ context.
const ctx_t &ctx; const thread_ctx_t &ctx;
// File descriptor referring to "/dev/poll" pseudo-device. // File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd; fd_t devpoll_fd;
......
...@@ -44,7 +44,9 @@ ...@@ -44,7 +44,9 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::epoll_t::epoll_t (const zmq::ctx_t &ctx_) : ctx (ctx_), stopping (false) zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_),
stopping (false)
{ {
#ifdef ZMQ_USE_EPOLL_CLOEXEC #ifdef ZMQ_USE_EPOLL_CLOEXEC
// Setting this option result in sane behaviour when exec() functions // Setting this option result in sane behaviour when exec() functions
......
...@@ -55,7 +55,7 @@ class epoll_t : public poller_base_t ...@@ -55,7 +55,7 @@ class epoll_t : public poller_base_t
public: public:
typedef void *handle_t; typedef void *handle_t;
epoll_t (const ctx_t &ctx_); epoll_t (const thread_ctx_t &ctx_);
~epoll_t (); ~epoll_t ();
// "poller" concept. // "poller" concept.
...@@ -78,7 +78,7 @@ class epoll_t : public poller_base_t ...@@ -78,7 +78,7 @@ class epoll_t : public poller_base_t
void loop (); void loop ();
// Reference to ZMQ context. // Reference to ZMQ context.
const ctx_t &ctx; const thread_ctx_t &ctx;
// Main epoll file descriptor // Main epoll file descriptor
fd_t epoll_fd; fd_t epoll_fd;
......
...@@ -54,7 +54,9 @@ ...@@ -54,7 +54,9 @@
#define kevent_udata_t void * #define kevent_udata_t void *
#endif #endif
zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) : ctx (ctx_), stopping (false) zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_),
stopping (false)
{ {
// Create event queue // Create event queue
kqueue_fd = kqueue (); kqueue_fd = kqueue ();
......
...@@ -54,7 +54,7 @@ class kqueue_t : public poller_base_t ...@@ -54,7 +54,7 @@ class kqueue_t : public poller_base_t
public: public:
typedef void *handle_t; typedef void *handle_t;
kqueue_t (const ctx_t &ctx_); kqueue_t (const thread_ctx_t &ctx_);
~kqueue_t (); ~kqueue_t ();
// "poller" concept. // "poller" concept.
...@@ -77,7 +77,7 @@ class kqueue_t : public poller_base_t ...@@ -77,7 +77,7 @@ class kqueue_t : public poller_base_t
void loop (); void loop ();
// Reference to ZMQ context. // Reference to ZMQ context.
const ctx_t &ctx; const thread_ctx_t &ctx;
// File descriptor referring to the kernel event queue. // File descriptor referring to the kernel event queue.
fd_t kqueue_fd; fd_t kqueue_fd;
......
...@@ -43,7 +43,7 @@ ...@@ -43,7 +43,7 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::poll_t::poll_t (const zmq::ctx_t &ctx_) : zmq::poll_t::poll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), ctx (ctx_),
retired (false), retired (false),
stopping (false) stopping (false)
......
...@@ -57,7 +57,7 @@ class poll_t : public poller_base_t ...@@ -57,7 +57,7 @@ class poll_t : public poller_base_t
public: public:
typedef fd_t handle_t; typedef fd_t handle_t;
poll_t (const ctx_t &ctx_); poll_t (const thread_ctx_t &ctx_);
~poll_t (); ~poll_t ();
// "poller" concept. // "poller" concept.
...@@ -82,7 +82,7 @@ class poll_t : public poller_base_t ...@@ -82,7 +82,7 @@ class poll_t : public poller_base_t
void cleanup_retired(); void cleanup_retired();
// Reference to ZMQ context. // Reference to ZMQ context.
const ctx_t &ctx; const thread_ctx_t &ctx;
struct fd_entry_t struct fd_entry_t
{ {
......
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::pollset_t::pollset_t (const zmq::ctx_t &ctx_) : zmq::pollset_t::pollset_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), ctx (ctx_),
stopping (false) stopping (false)
{ {
......
...@@ -55,7 +55,7 @@ class pollset_t : public poller_base_t ...@@ -55,7 +55,7 @@ class pollset_t : public poller_base_t
public: public:
typedef void *handle_t; typedef void *handle_t;
pollset_t (const ctx_t &ctx_); pollset_t (const thread_ctx_t &ctx_);
~pollset_t (); ~pollset_t ();
// "poller" concept. // "poller" concept.
...@@ -78,7 +78,7 @@ class pollset_t : public poller_base_t ...@@ -78,7 +78,7 @@ class pollset_t : public poller_base_t
void loop (); void loop ();
// Reference to ZMQ context. // Reference to ZMQ context.
const ctx_t &ctx; const thread_ctx_t &ctx;
// Main pollset file descriptor // Main pollset file descriptor
::pollset_t pollset_fd; ::pollset_t pollset_fd;
......
...@@ -49,7 +49,7 @@ ...@@ -49,7 +49,7 @@
#include <algorithm> #include <algorithm>
zmq::select_t::select_t (const zmq::ctx_t &ctx_) : zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), ctx (ctx_),
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
// Fine as long as map is not cleared. // Fine as long as map is not cleared.
......
...@@ -63,7 +63,7 @@ class select_t : public poller_base_t ...@@ -63,7 +63,7 @@ class select_t : public poller_base_t
public: public:
typedef fd_t handle_t; typedef fd_t handle_t;
select_t (const ctx_t &ctx_); select_t (const thread_ctx_t &ctx_);
~select_t (); ~select_t ();
// "poller" concept. // "poller" concept.
...@@ -86,7 +86,7 @@ class select_t : public poller_base_t ...@@ -86,7 +86,7 @@ class select_t : public poller_base_t
void loop (); void loop ();
// Reference to ZMQ context. // Reference to ZMQ context.
const ctx_t &ctx; const thread_ctx_t &ctx;
// Internal state. // Internal state.
struct fds_set_t struct fds_set_t
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment