Commit ae93ed31 authored by Martin Sustrik's avatar Martin Sustrik

signaler rewritten in such a way that any number (>64) of threads can be used

parent 1ffc6dd4
......@@ -57,7 +57,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_,
uint32_t thread_slot_) :
object_t (dispatcher_, thread_slot_),
last_processing_time (0),
terminated (false)
......@@ -81,9 +82,9 @@ zmq::signaler_t *zmq::app_thread_t::get_signaler ()
bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
uint64_t signals;
uint32_t signal;
if (block_)
signals = signaler.poll ();
signal = signaler.poll ();
else {
#if defined ZMQ_DELAY_COMMANDS
......@@ -116,20 +117,14 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif
// Check whether there are any commands pending for this thread.
signals = signaler.check ();
signal = signaler.check ();
}
if (signals) {
// Traverse all the possible sources of commands and process
// all the commands from all of them.
for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (uint64_t (1) << i)) {
command_t cmd;
while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
}
// Process all the commands from the signaling source if there is one.
if (signal != signaler_t::no_signal) {
command_t cmd;
while (dispatcher->read (signal, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
return !terminated;
......
......@@ -34,7 +34,7 @@ namespace zmq
{
public:
app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
~app_thread_t ();
......
......@@ -37,6 +37,10 @@ namespace zmq
// footprint of dispatcher.
command_pipe_granularity = 4,
// Number of signals that can be read by the signaler
// using a single system call.
signal_buffer_size = 8,
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
......
......@@ -33,7 +33,7 @@
#include "windows.h"
#endif
zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
sockets (0),
terminated (false)
{
......@@ -49,7 +49,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
#endif
// Create application thread proxies.
for (int i = 0; i != app_threads_; i++) {
for (uint32_t i = 0; i != app_threads_; i++) {
app_thread_info_t info;
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this, i);
......@@ -59,7 +59,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
}
// Create I/O thread objects.
for (int i = 0; i != io_threads_; i++) {
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
i + app_threads_);
zmq_assert (io_thread);
......@@ -79,7 +79,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
zmq_assert (command_pipes);
// Launch I/O threads.
for (int i = 0; i != io_threads_; i++)
for (uint32_t i = 0; i != io_threads_; i++)
io_threads [i]->start ();
}
......@@ -136,9 +136,9 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
int zmq::dispatcher_t::thread_slot_count ()
uint32_t zmq::dispatcher_t::thread_slot_count ()
{
return signalers.size ();
return (uint32_t) signalers.size ();
}
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
......@@ -213,7 +213,7 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
app_threads_sync.unlock ();
}
void zmq::dispatcher_t::write (int source_, int destination_,
void zmq::dispatcher_t::write (uint32_t source_, uint32_t destination_,
const command_t &command_)
{
command_pipe_t &pipe =
......@@ -223,7 +223,7 @@ void zmq::dispatcher_t::write (int source_, int destination_,
signalers [destination_]->signal (source_);
}
bool zmq::dispatcher_t::read (int source_, int destination_,
bool zmq::dispatcher_t::read (uint32_t source_, uint32_t destination_,
command_t *command_)
{
return command_pipes [source_ * signalers.size () +
......
......@@ -51,7 +51,7 @@ namespace zmq
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
dispatcher_t (int app_threads_, int io_threads_);
dispatcher_t (uint32_t app_threads_, uint32_t io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
......@@ -72,14 +72,16 @@ namespace zmq
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
int thread_slot_count ();
uint32_t thread_slot_count ();
// Send command from the source to the destination.
void write (int source_, int destination_, const command_t &command_);
void write (uint32_t source_, uint32_t destination_,
const command_t &command_);
// Receive command from the source. Returns false if there is no
// command available.
bool read (int source_, int destination_, command_t *command_);
bool read (uint32_t source_, uint32_t destination_,
command_t *command_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
......
......@@ -28,7 +28,8 @@
#include "command.hpp"
#include "dispatcher.hpp"
zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_,
uint32_t thread_slot_) :
object_t (dispatcher_, thread_slot_)
{
poller = new (std::nothrow) poller_t;
......@@ -66,22 +67,17 @@ int zmq::io_thread_t::get_load ()
void zmq::io_thread_t::in_event ()
{
// Find out which threads are sending us commands.
uint64_t signals = signaler.check ();
zmq_assert (signals);
// Iterate through all the threads in the process and find out
// which of them sent us commands.
int slot_count = thread_slot_count ();
for (int source_thread_slot = 0;
source_thread_slot != slot_count; source_thread_slot++) {
if (signals & (uint64_t (1) << source_thread_slot)) {
// Read all the commands from particular thread.
command_t cmd;
while (dispatcher->read (source_thread_slot, thread_slot, &cmd))
cmd.destination->process_command (cmd);
}
while (true) {
// Get the next signal.
uint32_t signal = signaler.check ();
if (signal == signaler_t::no_signal)
break;
// Process all the commands from the thread that sent the signal.
command_t cmd;
while (dispatcher->read (signal, thread_slot, &cmd))
cmd.destination->process_command (cmd);
}
}
......
......@@ -38,7 +38,7 @@ namespace zmq
{
public:
io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
......
......@@ -28,7 +28,7 @@
#include "session.hpp"
#include "socket_base.hpp"
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) :
dispatcher (dispatcher_),
thread_slot (thread_slot_)
{
......@@ -44,12 +44,7 @@ zmq::object_t::~object_t ()
{
}
int zmq::object_t::thread_slot_count ()
{
return dispatcher->thread_slot_count ();
}
int zmq::object_t::get_thread_slot ()
uint32_t zmq::object_t::get_thread_slot ()
{
return thread_slot;
}
......@@ -162,7 +157,7 @@ void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
// the current object.
int admin_thread_id = dispatcher->thread_slot_count () - 1;
uint32_t admin_thread_id = dispatcher->thread_slot_count () - 1;
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
......@@ -375,7 +370,7 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
int destination_thread_slot = cmd_.destination->get_thread_slot ();
uint32_t destination_thread_slot = cmd_.destination->get_thread_slot ();
dispatcher->write (thread_slot, destination_thread_slot, cmd_);
}
......@@ -32,11 +32,11 @@ namespace zmq
{
public:
object_t (class dispatcher_t *dispatcher_, int thread_slot_);
object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
object_t (object_t *parent_);
virtual ~object_t ();
int get_thread_slot ();
uint32_t get_thread_slot ();
dispatcher_t *get_dispatcher ();
void process_command (struct command_t &cmd_);
......@@ -52,9 +52,6 @@ namespace zmq
void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_);
// Returns number of thead slots in the dispatcher.
int thread_slot_count ();
// Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_);
......@@ -106,7 +103,7 @@ namespace zmq
class dispatcher_t *dispatcher;
// Slot ID of the thread the object belongs to.
int thread_slot;
uint32_t thread_slot;
private:
......
......@@ -32,9 +32,60 @@
#include <fcntl.h>
#endif
const uint32_t zmq::signaler_t::no_signal = 0xffffffff;
uint32_t zmq::signaler_t::poll ()
{
// Return next signal.
if (current != count) {
uint32_t result = buffer [current];
current++;
return result;
}
// If there is no signal buffered, poll for new signals.
xpoll ();
// Return first signal.
zmq_assert (current != count);
uint32_t result = buffer [current];
current++;
return result;
}
uint32_t zmq::signaler_t::check ()
{
// Return next signal.
if (current != count) {
uint32_t result = buffer [current];
current++;
return result;
}
// If there is no signal buffered, check whether more signals
// can be obtained.
xcheck ();
// Return first signal if any.
if (current != count) {
uint32_t result = buffer [current];
current++;
return result;
}
return no_signal;
}
zmq::fd_t zmq::signaler_t::get_fd ()
{
return r;
}
#if defined ZMQ_HAVE_WINDOWS
zmq::signaler_t::signaler_t ()
zmq::signaler_t::signaler_t () :
current (0),
count (0)
{
// Windows have no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
......@@ -95,18 +146,16 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
void zmq::signaler_t::signal (int signal_)
void zmq::signaler_t::signal (uint32_t signal_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
zmq_assert (signal_ >= 0 && signal_ < 64);
char c = (char) signal_;
int rc = send (w, &c, 1, 0);
int rc = send (w, &signal_, sizeof (signal_), 0);
win_assert (rc != SOCKET_ERROR);
zmq_assert (rc == sizeof (signal_));
}
uint64_t zmq::signaler_t::poll ()
void zmq::signaler_t::xpoll ()
{
// Switch to blocking mode.
unsigned long argp = 0;
......@@ -115,8 +164,8 @@ uint64_t zmq::signaler_t::poll ()
// Get the signals. Given that we are in the blocking mode now,
// there should be at least a single signal returned.
uint64_t signals = check ();
zmq_assert (signals);
xcheck ();
zmq_assert (current != count);
// Switch back to non-blocking mode.
argp = 1;
......@@ -126,25 +175,24 @@ uint64_t zmq::signaler_t::poll ()
return signals;
}
uint64_t zmq::signaler_t::check ()
void zmq::signaler_t::xcheck ()
{
unsigned char buffer [32];
int nbytes = recv (r, (char*) buffer, 32, 0);
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
wsa_assert (nbytes != -1);
int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0);
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos++) {
zmq_assert (buffer [pos] < 64);
signals |= (uint64_t (1) << (buffer [pos]));
// No signals are available.
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
current = 0;
count = 0;
return;
}
return signals;
}
zmq::fd_t zmq::signaler_t::get_fd ()
{
return r;
wsa_assert (nbytes != -1);
// Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0);
current = 0;
count = nbytes / sizeof (uint32_t);
}
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
......@@ -152,7 +200,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#include <sys/types.h>
#include <sys/socket.h>
zmq::signaler_t::signaler_t ()
zmq::signaler_t::signaler_t () :
current (0),
count (0)
{
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
......@@ -174,15 +224,14 @@ zmq::signaler_t::~signaler_t ()
close (r);
}
void zmq::signaler_t::signal (int signal_)
void zmq::signaler_t::signal (uint32_t signal_)
{
zmq_assert (signal_ >= 0 && signal_ < 64);
unsigned char c = (unsigned char) signal_;
ssize_t nbytes = send (w, &c, 1, 0);
errno_assert (nbytes == 1);
ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
errno_assert (nbytes != -1);
zmq_assert (nbytes == sizeof (signal_);
}
uint64_t zmq::signaler_t::poll ()
void zmq::signaler_t::xpoll ()
{
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
......@@ -192,7 +241,8 @@ uint64_t zmq::signaler_t::poll ()
errno_assert (rc != -1);
// Poll for events.
uint64_t signals = check ();
xcheck ();
zmq_assert (current != count);
// Set the reader to non-blocking mode.
flags = fcntl (r, F_GETFL, 0);
......@@ -200,29 +250,23 @@ uint64_t zmq::signaler_t::poll ()
flags = 0;
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
return signals;
}
uint64_t zmq::signaler_t::check ()
void zmq::signaler_t::xcheck ()
{
unsigned char buffer [64];
ssize_t nbytes = recv (r, buffer, 64, 0);
if (nbytes == -1 && errno == EAGAIN)
return 0;
ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
if (nbytes == -1 && errno == EAGAIN) {
current = 0;
count = 0;
return;
}
zmq_assert (nbytes != -1);
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) {
zmq_assert (buffer [pos] < 64);
signals |= (uint64_t (1) << (buffer [pos]));
}
return signals;
}
// Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0);
zmq::fd_t zmq::signaler_t::get_fd ()
{
return r;
current = 0;
count = nbytes / sizeof (uint32_t);
}
#else
......@@ -230,7 +274,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#include <sys/types.h>
#include <sys/socket.h>
zmq::signaler_t::signaler_t ()
zmq::signaler_t::signaler_t () :
current (0),
count (0)
{
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
......@@ -245,50 +291,42 @@ zmq::signaler_t::~signaler_t ()
close (r);
}
void zmq::signaler_t::signal (int signal_)
void zmq::signaler_t::signal (uint32_t signal_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
zmq_assert (signal_ >= 0 && signal_ < 64);
unsigned char c = (unsigned char) signal_;
ssize_t nbytes = send (w, &c, 1, 0);
errno_assert (nbytes == 1);
ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
errno_assert (nbytes != -1);
zmq_assert (nbytes == sizeof (signal_));
}
uint64_t zmq::signaler_t::poll ()
void zmq::signaler_t::xpoll ()
{
unsigned char buffer [64];
ssize_t nbytes = recv (r, buffer, 64, 0);
zmq_assert (nbytes != -1);
ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
errno_assert (nbytes != -1);
// Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0);
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) {
zmq_assert (buffer [pos] < 64);
signals |= (uint64_t (1) << (buffer [pos]));
}
return signals;
current = 0;
count = nbytes / sizeof (uint32_t);
}
uint64_t zmq::signaler_t::check ()
void zmq::signaler_t::xcheck ()
{
unsigned char buffer [64];
ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
if (nbytes == -1 && errno == EAGAIN)
return 0;
zmq_assert (nbytes != -1);
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) {
zmq_assert (buffer [pos] < 64);
signals |= (uint64_t (1) << (buffer [pos]));
if (nbytes == -1 && errno == EAGAIN) {
current = 0;
count = 0;
return;
}
return signals;
}
errno_assert (nbytes != -1);
zmq::fd_t zmq::signaler_t::get_fd ()
{
return r;
// Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0);
current = 0;
count = nbytes / sizeof (uint32_t);
}
#endif
......
......@@ -20,9 +20,12 @@
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
#define __ZMQ_SIGNALER_HPP_INCLUDED__
#include <stddef.h>
#include "platform.hpp"
#include "fd.hpp"
#include "stdint.hpp"
#include "config.hpp"
namespace zmq
{
......@@ -39,14 +42,18 @@ namespace zmq
signaler_t ();
~signaler_t ();
// i_signaler interface implementation.
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
static const uint32_t no_signal;
void signal (uint32_t signal_);
uint32_t poll ();
uint32_t check ();
fd_t get_fd ();
private:
void xpoll ();
void xcheck ();
#if defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only.
......@@ -64,6 +71,15 @@ namespace zmq
fd_t w;
fd_t r;
// Signal buffer.
uint32_t buffer [signal_buffer_size];
// Position of the next signal in the buffer to return to the user.
size_t current;
// Number of signals in the signal buffer.
size_t count;
// Disable copying of fd_signeler object.
signaler_t (const signaler_t&);
void operator = (const signaler_t&);
......
......@@ -262,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
app_threads_, io_threads_);
(uint32_t) app_threads_, (uint32_t) io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment