Commit c193fd14 authored by Martin Sustrik's avatar Martin Sustrik

lock-free polling removed; ZMQ_POLL flag removed

parent 7cb076e5
......@@ -25,12 +25,8 @@ The 'io_threads' argument specifies the size of the 0MQ thread pool to handle
I/O operations. If your application is using 'inproc' messaging exclusively you
may set this to zero, otherwise set it to at least one.
The 'flags' argument is a combination of the flags defined below:
*ZMQ_POLL*::
Specifies that sockets within this 'context' should support multiplexing using
_zmq_poll()_. Enabling this functionality may add a small amount of latency to
message transfers compared to leaving it disabled.
There are no flags defined at the moment and the 'flags' argument should be set
to zero.
RETURN VALUE
......
......@@ -144,6 +144,7 @@ ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg);
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
/******************************************************************************/
/* This flag is obsolete and has no effect. To be removed in next version. */
#define ZMQ_POLL 1
ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags);
......
......@@ -50,7 +50,6 @@ endif
nodist_libzmq_la_SOURCES = $(pgm_sources)
libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
......@@ -74,7 +73,6 @@ libzmq_la_SOURCES = app_thread.hpp \
i_endpoint.hpp \
i_engine.hpp \
i_poll_events.hpp \
i_signaler.hpp \
kqueue.hpp \
lb.hpp \
likely.hpp \
......@@ -98,7 +96,6 @@ libzmq_la_SOURCES = app_thread.hpp \
req.hpp \
select.hpp \
session.hpp \
simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \
streamer.hpp \
......@@ -116,7 +113,6 @@ libzmq_la_SOURCES = app_thread.hpp \
yarray.hpp \
yarray_item.hpp \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
zmq_decoder.hpp \
......@@ -166,7 +162,6 @@ libzmq_la_SOURCES = app_thread.hpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
zmq_decoder.cpp \
......
......@@ -36,7 +36,6 @@
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "fd_signaler.hpp"
#include "ypollset.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
......@@ -59,27 +58,16 @@
#define ZMQ_DELAY_COMMANDS
#endif
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) :
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (dispatcher_, thread_slot_),
last_processing_time (0),
terminated (false)
{
if (flags_ & ZMQ_POLL) {
signaler = new (std::nothrow) fd_signaler_t;
zmq_assert (signaler);
}
else {
signaler = new (std::nothrow) ypollset_t;
zmq_assert (signaler);
}
}
zmq::app_thread_t::~app_thread_t ()
{
zmq_assert (sockets.empty ());
zmq_assert (signaler);
delete signaler;
}
void zmq::app_thread_t::stop ()
......@@ -87,16 +75,16 @@ void zmq::app_thread_t::stop ()
send_stop ();
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
zmq::fd_signaler_t *zmq::app_thread_t::get_signaler ()
{
return signaler;
return &signaler;
}
bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
uint64_t signals;
if (block_)
signals = signaler->poll ();
signals = signaler.poll ();
else {
#if defined ZMQ_DELAY_COMMANDS
......@@ -129,7 +117,7 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif
// Check whether there are any commands pending for this thread.
signals = signaler->check ();
signals = signaler.check ();
}
if (signals) {
......
......@@ -25,6 +25,7 @@
#include "stdint.hpp"
#include "object.hpp"
#include "yarray.hpp"
#include "fd_signaler.hpp"
namespace zmq
{
......@@ -33,8 +34,7 @@ namespace zmq
{
public:
app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_,
int flags_);
app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
~app_thread_t ();
......@@ -43,7 +43,7 @@ namespace zmq
void stop ();
// Returns signaler associated with this application thread.
struct i_signaler *get_signaler ();
fd_signaler_t *get_signaler ();
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
......@@ -71,7 +71,7 @@ namespace zmq
sockets_t sockets;
// App thread's signaler object.
struct i_signaler *signaler;
fd_signaler_t signaler;
// Timestamp of when commands were processed the last time.
uint64_t last_processing_time;
......
This diff is collapsed.
......@@ -33,8 +33,7 @@
#include "windows.h"
#endif
zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
int flags_) :
zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
sockets (0),
terminated (false)
{
......@@ -53,7 +52,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
for (int i = 0; i != app_threads_; i++) {
app_thread_info_t info;
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_);
info.app_thread = new (std::nothrow) app_thread_t (this, i);
zmq_assert (info.app_thread);
app_threads.push_back (info);
signalers.push_back (info.app_thread->get_signaler ());
......@@ -62,7 +61,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create I/O thread objects.
for (int i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
i + app_threads_, flags_);
i + app_threads_);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers.push_back (io_thread->get_signaler ());
......
......@@ -25,7 +25,7 @@
#include <map>
#include <string>
#include "i_signaler.hpp"
#include "fd_signaler.hpp"
#include "ypipe.hpp"
#include "command.hpp"
#include "config.hpp"
......@@ -51,7 +51,7 @@ namespace zmq
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
dispatcher_t (int app_threads_, int io_threads_, int flags_);
dispatcher_t (int app_threads_, int io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
......@@ -125,7 +125,7 @@ namespace zmq
io_threads_t io_threads;
// Signalers for both application and I/O threads.
std::vector <i_signaler*> signalers;
std::vector <fd_signaler_t*> signalers;
// Pipe to hold the commands.
typedef ypipe_t <command_t, true,
......
......@@ -21,7 +21,6 @@
#define __ZMQ_FD_SIGNALER_HPP_INCLUDED__
#include "platform.hpp"
#include "i_signaler.hpp"
#include "fd.hpp"
#include "stdint.hpp"
......@@ -33,7 +32,7 @@ namespace zmq
// descriptor and so it can be polled on. Same signal cannot be sent twice
// unless signals are retrieved by the reader side in the meantime.
class fd_signaler_t : public i_signaler
class fd_signaler_t
{
public:
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__
#include "stdint.hpp"
#include "fd.hpp"
namespace zmq
{
// Virtual interface used to send signals. Individual implementations
// may restrict the number of possible signal types to send.
struct i_signaler
{
virtual ~i_signaler () {};
// Send a signal with a specific ID.
virtual void signal (int signal_) = 0;
// Wait for signal. Returns a set of signals in form of a bitmap.
// Signal with index 0 corresponds to value 1, index 1 to value 2,
// index 2 to value 3 etc.
virtual uint64_t poll () = 0;
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
virtual uint64_t check () = 0;
// Returns file descriptor that allows waiting for signals. Specific
// signalers may not support this functionality. If so, the function
// returns retired_fd.
virtual fd_t get_fd () = 0;
};
}
#endif
......@@ -27,10 +27,8 @@
#include "err.hpp"
#include "command.hpp"
#include "dispatcher.hpp"
#include "simple_semaphore.hpp"
zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) :
zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (dispatcher_, thread_slot_)
{
poller = new (std::nothrow) poller_t;
......@@ -56,7 +54,7 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
zmq::i_signaler *zmq::io_thread_t::get_signaler ()
zmq::fd_signaler_t *zmq::io_thread_t::get_signaler ()
{
return &signaler;
}
......
......@@ -38,8 +38,7 @@ namespace zmq
{
public:
io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_,
int flags_);
io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
......@@ -52,7 +51,7 @@ namespace zmq
void stop ();
// Returns signaler associated with this I/O thread.
i_signaler *get_signaler ();
fd_signaler_t *get_signaler ();
// i_poll_events implementation.
void in_event ();
......
......@@ -24,7 +24,6 @@
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
#include "owned.hpp"
#include "session.hpp"
#include "socket_base.hpp"
......
......@@ -51,8 +51,9 @@ int zmq::queue (class socket_base_t *insocket_,
errno_assert (rc > 0);
// The algorithm below asumes ratio of request and replies processed
// under full load to be 1:1. The alternative would be to process
// replies first, handle request only when there are no more replies.
// under full load to be 1:1. While processing requests replies first
// is tempting it is suspectible to DoS attacks (overloading the system
// with unsolicited replies).
// Receive a new request.
if (items [0].revents & ZMQ_POLLIN) {
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__
#define __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__
#include "platform.hpp"
#include "err.hpp"
#if 0 //defined ZMQ_HAVE_LINUX
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/futex.h>
#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
#include <pthread.h>
#elif defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <semaphore.h>
#endif
namespace zmq
{
// Simple semaphore. Only single thread may be waiting at any given time.
// Also, the semaphore may not be posted before the previous post
// was matched by corresponding wait and the waiting thread was
// released.
#if 0 //defined ZMQ_HAVE_LINUX
// In theory, using private futexes should be more efficient on Linux
// platform than using mutexes. However, in uncontended cases of TCP
// transport on loopback interface we haven't seen any latency improvement.
// The code is commented out waiting for more thorough testing.
class simple_semaphore_t
{
public:
// Initialise the semaphore.
inline simple_semaphore_t () :
dummy (0)
{
}
// Destroy the semaphore.
inline ~simple_semaphore_t ()
{
}
// Wait for the semaphore.
inline void wait ()
{
int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE,
(int) 0, NULL, NULL, (int) 0);
zmq_assert (rc == 0);
}
// Post the semaphore.
inline void post ()
{
while (true) {
int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE,
(int) 1, NULL, NULL, (int) 0);
zmq_assert (rc != -1 && rc <= 1);
if (rc == 1)
break;
}
}
private:
int dummy;
// Disable copying of the object.
simple_semaphore_t (const simple_semaphore_t&);
void operator = (const simple_semaphore_t&);
};
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
// On platforms that allow for double locking of a mutex from the same
// thread, simple semaphore is implemented using mutex, as it is more
// efficient than full-blown semaphore.
class simple_semaphore_t
{
public:
// Initialise the semaphore.
inline simple_semaphore_t ()
{
int rc = pthread_mutex_init (&mutex, NULL);
posix_assert (rc);
rc = pthread_mutex_lock (&mutex);
posix_assert (rc);
}
// Destroy the semaphore.
inline ~simple_semaphore_t ()
{
int rc = pthread_mutex_unlock (&mutex);
posix_assert (rc);
rc = pthread_mutex_destroy (&mutex);
posix_assert (rc);
}
// Wait for the semaphore.
inline void wait ()
{
int rc = pthread_mutex_lock (&mutex);
posix_assert (rc);
}
// Post the semaphore.
inline void post ()
{
int rc = pthread_mutex_unlock (&mutex);
posix_assert (rc);
}
private:
pthread_mutex_t mutex;
// Disable copying of the object.
simple_semaphore_t (const simple_semaphore_t&);
void operator = (const simple_semaphore_t&);
};
#elif defined ZMQ_HAVE_WINDOWS
// On Windows platform simple semaphore is implemeted using event object.
class simple_semaphore_t
{
public:
// Initialise the semaphore.
inline simple_semaphore_t ()
{
ev = CreateEvent (NULL, FALSE, FALSE, NULL);
win_assert (ev != NULL);
}
// Destroy the semaphore.
inline ~simple_semaphore_t ()
{
int rc = CloseHandle (ev);
win_assert (rc != 0);
}
// Wait for the semaphore.
inline void wait ()
{
DWORD rc = WaitForSingleObject (ev, INFINITE);
win_assert (rc != WAIT_FAILED);
}
// Post the semaphore.
inline void post ()
{
int rc = SetEvent (ev);
win_assert (rc != 0);
}
private:
HANDLE ev;
// Disable copying of the object.
simple_semaphore_t (const simple_semaphore_t&);
void operator = (const simple_semaphore_t&);
};
#else
// Default implementation maps simple semaphore to standard semaphore.
class simple_semaphore_t
{
public:
// Initialise the semaphore.
inline simple_semaphore_t ()
{
int rc = sem_init (&sem, 0, 0);
errno_assert (rc != -1);
}
// Destroy the semaphore.
inline ~simple_semaphore_t ()
{
int rc = sem_destroy (&sem);
errno_assert (rc != -1);
}
// Wait for the semaphore.
inline void wait ()
{
int rc = sem_wait (&sem);
errno_assert (rc != -1);
}
// Post the semaphore.
inline void post ()
{
int rc = sem_post (&sem);
errno_assert (rc != -1);
}
private:
// Underlying system semaphore object.
sem_t sem;
// Disable copying of the object.
simple_semaphore_t (const simple_semaphore_t&);
void operator = (const simple_semaphore_t&);
};
#endif
}
#endif
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ypollset.hpp"
zmq::ypollset_t::ypollset_t ()
{
}
zmq::ypollset_t::~ypollset_t ()
{
}
void zmq::ypollset_t::signal (int signal_)
{
zmq_assert (signal_ >= 0 && signal_ < wait_signal);
if (bits.btsr (signal_, wait_signal))
sem.post ();
}
uint64_t zmq::ypollset_t::poll ()
{
signals_t result = 0;
while (!result) {
result = bits.izte (signals_t (1) << wait_signal, 0);
if (!result) {
sem.wait ();
result = bits.xchg (0);
}
// If btsr was really atomic, result would never be 0 at this
// point, i.e. no looping would be possible. However, to
// support even CPU architectures without CAS instruction
// we allow btsr to be composed of two independent atomic
// operation (set and reset). In such case looping can occur
// sporadically.
}
return (uint64_t) result;
}
uint64_t zmq::ypollset_t::check ()
{
return (uint64_t) bits.xchg (0);
}
zmq::fd_t zmq::ypollset_t::get_fd ()
{
return retired_fd;
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YPOLLSET_HPP_INCLUDED__
#define __ZMQ_YPOLLSET_HPP_INCLUDED__
#include "i_signaler.hpp"
#include "simple_semaphore.hpp"
#include "atomic_bitmap.hpp"
namespace zmq
{
// ypollset allows for rapid polling for up to constant number of
// different signals each produced by a different thread. The number of
// possible signals is platform-dependent.
class ypollset_t : public i_signaler
{
public:
ypollset_t ();
~ypollset_t ();
// i_signaler interface implementation.
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
fd_t get_fd ();
private:
// Internal representation of signal bitmap.
typedef atomic_bitmap_t::bitmap_t signals_t;
// Wait signal is carried in the most significant bit of integer.
enum {wait_signal = sizeof (signals_t) * 8 - 1};
// The bits of the pollset.
atomic_bitmap_t bits;
// Used by thread waiting for signals to sleep if there are no
// signals available.
simple_semaphore_t sem;
// Disable copying of ypollset object.
ypollset_t (const ypollset_t&);
void operator = (const ypollset_t&);
};
}
#endif
......@@ -228,14 +228,7 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
void *zmq_init (int app_threads_, int io_threads_, int flags_)
{
// There should be at least a single application thread managed
// by the dispatcher. There's no need for I/O threads if 0MQ is used
// only for inproc messaging
if (app_threads_ < 1 || io_threads_ < 0 ||
app_threads_ > 63 || io_threads_ > 63) {
errno = EINVAL;
return NULL;
}
// There are no context flags defined at the moment, so flags_ is ignored.
#if defined ZMQ_HAVE_OPENPGM
// Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
......@@ -269,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
app_threads_, io_threads_, flags_);
app_threads_, io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment