Commit 2aa0e6fd authored by f18m's avatar f18m

Change ZMQ_THREAD_AFFINITY to…

Change ZMQ_THREAD_AFFINITY to ZMQ_THREAD_AFFINITY_CPU_ADD/ZMQ_THREAD_AFFINITY_CPU_REMOVE. Avoid prefix thread names when no prefix was set.
parent cb266ee0
......@@ -76,15 +76,25 @@ This option only applies before creating any sockets on the context.
Default value:: -1
ZMQ_THREAD_AFFINITY: Set CPU affinity for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_AFFINITY' argument sets CPU affinity for the internal
ZMQ_THREAD_AFFINITY_CPU_ADD: Add a CPU to list of affinity for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_AFFINITY_CPU_ADD' argument adds a specific CPU to the affinity list for the internal
context's thread pool. This option is only supported on Linux.
On Linux, each bit of the 'option_value' argument will represent an enabled
CPU in the corresponding cpu_set_t; for example the value 0x1 will set affinity
of all context's thread pool to CPU 0; the value 0x110 will set affinity
of all context's thread pool to CPU 1 and 2.
This option only applies before creating any sockets on the context.
The default affinity list is empty and means that no explicit CPU-affinity will be set on
internal context's threads.
[horizontal]
Default value:: -1
ZMQ_THREAD_AFFINITY_CPU_REMOVE: Remove a CPU to list of affinity for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_AFFINITY_CPU_REMOVE' argument removes a specific CPU to the affinity list for the internal
context's thread pool. This option is only supported on Linux.
This option only applies before creating any sockets on the context.
The default affinity list is empty and means that no explicit CPU-affinity will be set on
internal context's threads.
[horizontal]
Default value:: -1
......
......@@ -610,9 +610,9 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
/* DRAFT Context options */
#define ZMQ_MSG_T_SIZE 6
#define ZMQ_THREAD_AFFINITY 7
#define ZMQ_THREAD_AFFINITY_DFLT -1
#define ZMQ_THREAD_NAME_PREFIX 8
#define ZMQ_THREAD_AFFINITY_CPU_ADD 7
#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
#define ZMQ_THREAD_NAME_PREFIX 9
/* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group);
......
......@@ -77,8 +77,7 @@ zmq::ctx_t::ctx_t () :
blocky (true),
ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT),
thread_affinity (ZMQ_THREAD_AFFINITY_DFLT)
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
#ifdef HAVE_FORK
pid = getpid();
......@@ -252,9 +251,20 @@ int zmq::ctx_t::set (int option_, int optval_)
thread_sched_policy = optval_;
}
else
if (option_ == ZMQ_THREAD_AFFINITY && optval_ >= 0) {
if (option_ == ZMQ_THREAD_AFFINITY_CPU_ADD && optval_ >= 0) {
scoped_lock_t locker(opt_sync);
thread_affinity = optval_;
thread_affinity_cpus.insert( optval_ );
}
else
if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
scoped_lock_t locker(opt_sync);
std::set<int>::iterator it = thread_affinity_cpus.find( optval_ );
if (it != thread_affinity_cpus.end()) {
thread_affinity_cpus.erase( it );
} else {
errno = EINVAL;
rc = -1;
}
}
else
if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) {
......@@ -411,11 +421,13 @@ void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) c
{
static unsigned int nthreads_started = 0;
thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity);
thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity_cpus);
thread_.start(tfn_, arg_);
#ifndef ZMQ_HAVE_ANDROID
std::ostringstream s;
s << thread_name_prefix << "/ZMQbg/" << nthreads_started;
if (!thread_name_prefix.empty())
s << thread_name_prefix << "/";
s << "ZMQbg/" << nthreads_started;
thread_.setThreadName (s.str().c_str());
#endif
nthreads_started++;
......
......@@ -214,7 +214,7 @@ namespace zmq
// Thread parameters.
int thread_priority;
int thread_sched_policy;
int thread_affinity;
std::set<int> thread_affinity_cpus;
std::string thread_name_prefix;
// Synchronisation of access to context options.
......
......@@ -70,12 +70,12 @@ void zmq::thread_t::stop ()
win_assert (rc2 != 0);
}
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_)
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set<int>& affinity_cpus_)
{
// not implemented
LIBZMQ_UNUSED (priority_);
LIBZMQ_UNUSED (schedulingPolicy_);
LIBZMQ_UNUSED (affinity_);
LIBZMQ_UNUSED (affinity_cpus_);
}
void zmq::thread_t::setThreadName(const char *name_)
......@@ -125,11 +125,11 @@ void zmq::thread_t::stop ()
posix_assert (rc);
}
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_)
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set<int>& affinity_cpus_)
{
thread_priority=priority_;
thread_sched_policy=schedulingPolicy_;
thread_affinity=affinity_;
thread_affinity_cpus=affinity_cpus_;
}
void zmq::thread_t::applySchedulingParameters() // to be called in secondary thread context
......@@ -194,15 +194,13 @@ void zmq::thread_t::applySchedulingParameters() // to be called in secon
}
#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
if (thread_affinity != ZMQ_THREAD_AFFINITY_DFLT)
if (!thread_affinity_cpus.empty())
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (unsigned int cpuidx=0; cpuidx<sizeof(int)*8; cpuidx++)
for (std::set<int>::const_iterator it = thread_affinity_cpus.begin(); it != thread_affinity_cpus.end(); it++)
{
int cpubit = (1 << cpuidx);
if ( (thread_affinity & cpubit) != 0 )
CPU_SET( cpuidx , &cpuset );
CPU_SET( (int)(*it) , &cpuset );
}
rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
posix_assert (rc);
......
......@@ -33,6 +33,7 @@
#ifndef ZMQ_HAVE_WINDOWS
#include <pthread.h>
#endif
#include <set>
namespace zmq
{
......@@ -55,7 +56,6 @@ namespace zmq
, arg(NULL)
, thread_priority(ZMQ_THREAD_PRIORITY_DFLT)
, thread_sched_policy(ZMQ_THREAD_SCHED_POLICY_DFLT)
, thread_affinity(ZMQ_THREAD_AFFINITY_DFLT)
{
}
......@@ -68,7 +68,7 @@ namespace zmq
// Sets the thread scheduling parameters. Only implemented for
// pthread. Has no effect on other platforms.
void setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_);
void setSchedulingParameters(int priority_, int schedulingPolicy_, const std::set<int>& affinity_cpus_);
// Sets the thread name, 16 characters max including terminating NUL.
// Only implemented for pthread. Has no effect on other platforms.
......@@ -91,7 +91,7 @@ namespace zmq
// Thread scheduling parameters.
int thread_priority;
int thread_sched_policy;
int thread_affinity;
std::set<int> thread_affinity_cpus;
thread_t (const thread_t&);
const thread_t &operator = (const thread_t&);
......
......@@ -91,9 +91,9 @@
/* DRAFT Context options */
#define ZMQ_MSG_T_SIZE 6
#define ZMQ_THREAD_AFFINITY 7
#define ZMQ_THREAD_AFFINITY_DFLT -1
#define ZMQ_THREAD_NAME_PREFIX 8
#define ZMQ_THREAD_AFFINITY_CPU_ADD 7
#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
#define ZMQ_THREAD_NAME_PREFIX 9
/* DRAFT Socket methods. */
int zmq_join (void *s, const char *group);
......
......@@ -81,10 +81,6 @@ void test_ctx_thread_opts(void* ctx)
assert (rc == -1 && errno == EINVAL);
rc = zmq_ctx_set(ctx, ZMQ_THREAD_PRIORITY, ZMQ_THREAD_PRIORITY_DFLT);
assert (rc == -1 && errno == EINVAL);
#ifdef ZMQ_THREAD_AFFINITY
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, ZMQ_THREAD_AFFINITY_DFLT);
assert (rc == -1 && errno == EINVAL);
#endif
// test scheduling policy:
......@@ -113,16 +109,27 @@ void test_ctx_thread_opts(void* ctx)
}
#ifdef ZMQ_THREAD_AFFINITY
#ifdef ZMQ_THREAD_AFFINITY_CPU_ADD
// test affinity:
int cpu_affinity_test = (1 << 0);
// this should result in background threads being placed only on the
// first CPU available on this system; try experimenting with other values
// (e.g., 1<<5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result
// (e.g., 5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result
int cpus_add[] = { 0, 1 };
for (unsigned int idx=0; idx < sizeof(cpus_add)/sizeof(cpus_add[0]); idx++)
{
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY_CPU_ADD, cpus_add[idx]);
assert (rc == 0);
}
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, cpu_affinity_test);
// you can also remove CPUs from list of affinities:
int cpus_remove[] = { 1 };
for (unsigned int idx=0; idx < sizeof(cpus_remove)/sizeof(cpus_remove[0]); idx++)
{
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY_CPU_REMOVE, cpus_remove[idx]);
assert (rc == 0);
}
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment