Commit f99b8fc9 authored by Martin Sustrik's avatar Martin Sustrik

receiving side of signaler virtualised

parent 50a8b9ea
...@@ -84,7 +84,7 @@ bool zmq::app_thread_t::make_current () ...@@ -84,7 +84,7 @@ bool zmq::app_thread_t::make_current ()
void zmq::app_thread_t::process_commands (bool block_, bool throttle_) void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{ {
ypollset_t::signals_t signals; uint64_t signals;
if (block_) if (block_)
signals = pollset.poll (); signals = pollset.poll ();
else { else {
...@@ -127,7 +127,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) ...@@ -127,7 +127,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
// Traverse all the possible sources of commands and process // Traverse all the possible sources of commands and process
// all the commands from all of them. // all the commands from all of them.
for (int i = 0; i != thread_slot_count (); i++) { for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) { if (signals & (uint64_t (1) << i)) {
command_t cmd; command_t cmd;
while (dispatcher->read (i, get_thread_slot (), &cmd)) while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
......
...@@ -59,16 +59,16 @@ zmq::fd_signaler_t::~fd_signaler_t () ...@@ -59,16 +59,16 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_) void zmq::fd_signaler_t::signal (int signal_)
{ {
zmq_assert (signal_ >= 0 && signal_ < 64); zmq_assert (signal_ >= 0 && signal_ < 64);
signals_t inc = 1; uint64_t inc = 1;
inc <<= signal_; inc <<= signal_;
ssize_t sz = write (fd, &inc, sizeof (signals_t)); ssize_t sz = write (fd, &inc, sizeof (uint64_t));
errno_assert (sz == sizeof (signals_t)); errno_assert (sz == sizeof (uint64_t));
} }
zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::check ()
{ {
signals_t val; uint64_t val;
ssize_t sz = read (fd, &val, sizeof (signals_t)); ssize_t sz = read (fd, &val, sizeof (uint64_t));
if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR)) errno == EINTR))
return 0; return 0;
...@@ -148,16 +148,30 @@ void zmq::fd_signaler_t::signal (int signal_) ...@@ -148,16 +148,30 @@ void zmq::fd_signaler_t::signal (int signal_)
win_assert (rc != SOCKET_ERROR); win_assert (rc != SOCKET_ERROR);
} }
zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::poll ()
{ {
char buffer [32]; // If there are signals available, return straight away.
uint64_t signals = check ();
if (signals)
return signals;
// If there are no signals, wait until at least one signal arrives.
unsigned char sig;
int nbytes = recv (r, &sig, 1, MSG_WAITALL);
win_assert (nbytes != -1);
return uint64_t (1) << sig;
}
uint64_t zmq::fd_signaler_t::check ()
{
unsigned char buffer [32];
int nbytes = recv (r, buffer, 32, 0); int nbytes = recv (r, buffer, 32, 0);
win_assert (nbytes != -1); win_assert (nbytes != -1);
signals_t signals = 0; uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos++) { for (int pos = 0; pos != nbytes; pos++) {
zmq_assert (buffer [pos] < 64); zmq_assert (buffer [pos] < 64);
signals |= (signals_t (1) << (buffer [pos])); signals |= (uint64_t (1) << (buffer [pos]));
} }
return signals; return signals;
} }
...@@ -202,15 +216,30 @@ void zmq::fd_signaler_t::signal (int signal_) ...@@ -202,15 +216,30 @@ void zmq::fd_signaler_t::signal (int signal_)
errno_assert (nbytes == 1); errno_assert (nbytes == 1);
} }
zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::poll ()
{
// If there are signals available, return straight away.
uint64_t signals = check ();
if (signals)
return signals;
// If there are no signals, wait until at least one signal arrives.
unsigned char sig;
ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL);
errno_assert (nbytes != -1);
return uint64_t (1) << sig;
}
uint64_t zmq::fd_signaler_t::check ()
{ {
unsigned char buffer [32]; unsigned char buffer [32];
ssize_t nbytes = recv (r, buffer, 32, 0); ssize_t nbytes = recv (r, buffer, 32, 0);
errno_assert (nbytes != -1); errno_assert (nbytes != -1);
signals_t signals = 0;
uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) { for (int pos = 0; pos != nbytes; pos ++) {
zmq_assert (buffer [pos] < 64); zmq_assert (buffer [pos] < 64);
signals |= (1 << (buffer [pos])); signals |= (uint64_t (1) << (buffer [pos]));
} }
return signals; return signals;
} }
......
...@@ -37,22 +37,16 @@ namespace zmq ...@@ -37,22 +37,16 @@ namespace zmq
{ {
public: public:
typedef uint64_t signals_t;
// Initialise the object. // Initialise the object.
fd_signaler_t (); fd_signaler_t ();
// Destroy the object. // Destroy the object.
~fd_signaler_t (); ~fd_signaler_t ();
// Send specific signal. // i_signaler interface implementation.
void signal (int signal_); void signal (int signal_);
uint64_t poll ();
// Retrieves signals. Returns a set of signals in form of a bitmap. uint64_t check ();
// Signal with index 0 corresponds to value 1, index 1 to value 2,
// index 2 to value 4 etc. If there is no signal available,
// it returns zero immediately.
signals_t check ();
// Get the file descriptor associated with the object. // Get the file descriptor associated with the object.
fd_t get_fd (); fd_t get_fd ();
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__ #ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__ #define __ZMQ_I_SIGNALER_HPP_INCLUDED__
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
// Virtual interface used to send signals. Individual implementations // Virtual interface used to send signals. Individual implementations
...@@ -31,6 +33,15 @@ namespace zmq ...@@ -31,6 +33,15 @@ namespace zmq
// Send a signal with a specific ID. // Send a signal with a specific ID.
virtual void signal (int signal_) = 0; virtual void signal (int signal_) = 0;
// Wait for signal. Returns a set of signals in form of a bitmap.
// Signal with index 0 corresponds to value 1, index 1 to value 2,
// index 2 to value 3 etc.
uint64_t poll ();
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
uint64_t check ();
}; };
} }
......
...@@ -104,7 +104,7 @@ int zmq::io_thread_t::get_load () ...@@ -104,7 +104,7 @@ int zmq::io_thread_t::get_load ()
void zmq::io_thread_t::in_event () void zmq::io_thread_t::in_event ()
{ {
// Find out which threads are sending us commands. // Find out which threads are sending us commands.
fd_signaler_t::signals_t signals = signaler.check (); uint64_t signals = signaler.check ();
zmq_assert (signals); zmq_assert (signals);
// Iterate through all the threads in the process and find out // Iterate through all the threads in the process and find out
...@@ -112,7 +112,7 @@ void zmq::io_thread_t::in_event () ...@@ -112,7 +112,7 @@ void zmq::io_thread_t::in_event ()
int slot_count = thread_slot_count (); int slot_count = thread_slot_count ();
for (int source_thread_slot = 0; for (int source_thread_slot = 0;
source_thread_slot != slot_count; source_thread_slot++) { source_thread_slot != slot_count; source_thread_slot++) {
if (signals & (fd_signaler_t::signals_t (1) << source_thread_slot)) { if (signals & (uint64_t (1) << source_thread_slot)) {
// Read all the commands from particular thread. // Read all the commands from particular thread.
command_t cmd; command_t cmd;
......
...@@ -30,7 +30,7 @@ void zmq::ypollset_t::signal (int signal_) ...@@ -30,7 +30,7 @@ void zmq::ypollset_t::signal (int signal_)
sem.post (); sem.post ();
} }
zmq::ypollset_t::signals_t zmq::ypollset_t::poll () uint64_t zmq::ypollset_t::poll ()
{ {
signals_t result = 0; signals_t result = 0;
while (!result) { while (!result) {
...@@ -47,10 +47,10 @@ zmq::ypollset_t::signals_t zmq::ypollset_t::poll () ...@@ -47,10 +47,10 @@ zmq::ypollset_t::signals_t zmq::ypollset_t::poll ()
// operation (set and reset). In such case looping can occur // operation (set and reset). In such case looping can occur
// sporadically. // sporadically.
} }
return result; return (uint64_t) result;
} }
zmq::ypollset_t::signals_t zmq::ypollset_t::check () uint64_t zmq::ypollset_t::check ()
{ {
return bits.xchg (0); return (uint64_t) bits.xchg (0);
} }
...@@ -35,25 +35,19 @@ namespace zmq ...@@ -35,25 +35,19 @@ namespace zmq
{ {
public: public:
typedef atomic_bitmap_t::bitmap_t signals_t;
// Create the pollset. // Create the pollset.
ypollset_t (); ypollset_t ();
// Send a signal to the pollset (i_singnaler implementation). // i_signaler interface implementation.
void signal (int signal_); void signal (int signal_);
uint64_t poll ();
// Wait for signal. Returns a set of signals in form of a bitmap. uint64_t check ();
// Signal with index 0 corresponds to value 1, index 1 to value 2,
// index 2 to value 3 etc.
signals_t poll ();
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
signals_t check ();
private: private:
// Internal representation of signal bitmap.
typedef atomic_bitmap_t::bitmap_t signals_t;
// Wait signal is carried in the most significant bit of integer. // Wait signal is carried in the most significant bit of integer.
enum {wait_signal = sizeof (signals_t) * 8 - 1}; enum {wait_signal = sizeof (signals_t) * 8 - 1};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment