Commit d57ee098 authored by malosek's avatar malosek

Merge branch 'master' of git@github.com:sustrik/zeromq2

parents ff65e26c 4efe2366
...@@ -51,6 +51,18 @@ extern "C" { ...@@ -51,6 +51,18 @@ extern "C" {
#ifndef EPROTONOSUPPORT #ifndef EPROTONOSUPPORT
#define EPROTONOSUPPORT (ZMQ_HAUSNUMERO + 2) #define EPROTONOSUPPORT (ZMQ_HAUSNUMERO + 2)
#endif #endif
#ifndef ENOBUFS
#define ENOBUFS (ZMQ_HAUSNUMERO + 3)
#endif
#ifndef ENETDOWN
#define ENETDOWN (ZMQ_HAUSNUMERO + 4)
#endif
#ifndef EADDRINUSE
#define EADDRINUSE (ZMQ_HAUSNUMERO + 5)
#endif
#ifndef EADDRNOTAVAIL
#define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6)
#endif
// Native 0MQ error codes. // Native 0MQ error codes.
#define EMTHREAD (ZMQ_HAUSNUMERO + 50) #define EMTHREAD (ZMQ_HAUSNUMERO + 50)
...@@ -344,8 +356,8 @@ ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags); ...@@ -344,8 +356,8 @@ ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags);
// EFSM - function cannot be called at the moment. // EFSM - function cannot be called at the moment.
ZMQ_EXPORT int zmq_flush (void *s); ZMQ_EXPORT int zmq_flush (void *s);
// Send a message from the socket 's'. 'flags' argument can be combination // Receive a message from the socket 's'. 'flags' argument can be combination
// of the flags described above. // of the flags described above with the exception of ZMQ_NOFLUSH.
// //
// Errors: EAGAIN - message cannot be received at the moment (applies only to // Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive). // non-blocking receive).
...@@ -353,6 +365,39 @@ ZMQ_EXPORT int zmq_flush (void *s); ...@@ -353,6 +365,39 @@ ZMQ_EXPORT int zmq_flush (void *s);
// EFSM - function cannot be called at the moment. // EFSM - function cannot be called at the moment.
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
////////////////////////////////////////////////////////////////////////////////
// I/O multiplexing.
////////////////////////////////////////////////////////////////////////////////
#define ZMQ_POLLIN 1
#define ZMQ_POLLOUT 2
// 'socket' is a 0MQ socket we want to poll on. If set to NULL, native file
// descriptor (socket) 'fd' will be used instead. 'events' defines event we
// are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error
// event does not exist for portability reasons. Errors from native sockets
// are reported as ZMQ_POLLIN. It's client's responsibilty to identify the
// error afterwards. 'revents' field is filled in after function returns. It's
// a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the
// socket.
typedef struct
{
void *socket;
int fd;
short events;
short revents;
} zmq_pollitem_t;
// Polls for the items specified by 'items'. Number of items in the array is
// determined by 'nitems' argument. Returns number of items signaled, -1
// in the case of error.
//
// Errors: EFAULT - there's a 0MQ socket in the pollset belonging to
// a different thread.
// ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag.
// I/O multiplexing is disabled.
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Helper functions. // Helper functions.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
......
...@@ -200,6 +200,11 @@ namespace zmq ...@@ -200,6 +200,11 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inline operator void* ()
{
return ptr;
}
inline void setsockopt (int option_, const void *optval_, inline void setsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \
ip.hpp \ ip.hpp \
i_endpoint.hpp \ i_endpoint.hpp \
i_engine.hpp \ i_engine.hpp \
i_poller.hpp \
i_poll_events.hpp \ i_poll_events.hpp \
i_signaler.hpp \ i_signaler.hpp \
kqueue.hpp \ kqueue.hpp \
...@@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \ pipe.hpp \
platform.hpp \ platform.hpp \
poll.hpp \ poll.hpp \
poller.hpp \
p2p.hpp \ p2p.hpp \
pub.hpp \ pub.hpp \
rep.hpp \ rep.hpp \
......
...@@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) ...@@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_)
zmq_assert (rc == sizeof pfd); zmq_assert (rc == sizeof pfd);
} }
zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
i_poll_events *reactor_)
{ {
assert (!fd_table [fd_].valid); assert (!fd_table [fd_].valid);
...@@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) ...@@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_)
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); load.add (1);
handle_t handle; return fd_;
handle.fd = fd_;
return handle;
} }
void zmq::devpoll_t::rm_fd (handle_t handle_) void zmq::devpoll_t::rm_fd (handle_t handle_)
{ {
assert (fd_table [handle_.fd].valid); assert (fd_table [handle_].valid);
devpoll_ctl (handle_.fd, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
fd_table [handle_.fd].valid = false; fd_table [handle_].valid = false;
// Decrease the load metric of the thread. // Decrease the load metric of the thread.
load.sub (1); load.sub (1);
...@@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) ...@@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)
void zmq::devpoll_t::set_pollin (handle_t handle_) void zmq::devpoll_t::set_pollin (handle_t handle_)
{ {
fd_t fd = handle_.fd; devpoll_ctl (handle_, POLLREMOVE);
devpoll_ctl (fd, POLLREMOVE); fd_table [handle_].events |= POLLIN;
fd_table [fd].events |= POLLIN; devpoll_ctl (handle_, fd_table [handle_].events);
devpoll_ctl (fd, fd_table [fd].events);
} }
void zmq::devpoll_t::reset_pollin (handle_t handle_) void zmq::devpoll_t::reset_pollin (handle_t handle_)
{ {
fd_t fd = handle_.fd; devpoll_ctl (handle_, POLLREMOVE);
devpoll_ctl (fd, POLLREMOVE); fd_table [handle_].events &= ~((short) POLLIN);
fd_table [fd].events &= ~((short) POLLIN); devpoll_ctl (handle_, fd_table [handle_].events);
devpoll_ctl (fd, fd_table [fd].events);
} }
void zmq::devpoll_t::set_pollout (handle_t handle_) void zmq::devpoll_t::set_pollout (handle_t handle_)
{ {
fd_t fd = handle_.fd; devpoll_ctl (handle_, POLLREMOVE);
devpoll_ctl (fd, POLLREMOVE); fd_table [handle_].events |= POLLOUT;
fd_table [fd].events |= POLLOUT; devpoll_ctl (handle_, fd_table [handle_].events);
devpoll_ctl (fd, fd_table [fd].events);
} }
void zmq::devpoll_t::reset_pollout (handle_t handle_) void zmq::devpoll_t::reset_pollout (handle_t handle_)
{ {
fd_t fd = handle_.fd; devpoll_ctl (handle_, POLLREMOVE);
devpoll_ctl (fd, POLLREMOVE); fd_table [handle_].events &= ~((short) POLLOUT);
fd_table [fd].events &= ~((short) POLLOUT); devpoll_ctl (handle_, fd_table [handle_].events);
devpoll_ctl (fd, fd_table [fd].events);
} }
void zmq::devpoll_t::add_timer (i_poll_events *events_) void zmq::devpoll_t::add_timer (i_poll_events *events_)
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include <vector> #include <vector>
#include "i_poller.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
...@@ -37,22 +36,24 @@ namespace zmq ...@@ -37,22 +36,24 @@ namespace zmq
// Implements socket polling mechanism using the Solaris-specific // Implements socket polling mechanism using the Solaris-specific
// "/dev/poll" interface. // "/dev/poll" interface.
class devpoll_t : public i_poller class devpoll_t
{ {
public: public:
typedef fd_t handle_t;
devpoll_t (); devpoll_t ();
~devpoll_t (); ~devpoll_t ();
// i_poller implementation. // "poller" concept.
handle_t add_fd (fd_t fd_, i_poll_events *events_); handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_); void add_timer (struct i_poll_events *events_);
void cancel_timer (i_poll_events *events_); void cancel_timer (struct i_poll_events *events_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t () ...@@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t ()
delete *it; delete *it;
} }
zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
poll_entry_t *pe = new poll_entry_t; poll_entry_t *pe = new poll_entry_t;
zmq_assert (pe != NULL); zmq_assert (pe != NULL);
...@@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); load.add (1);
handle_t handle; return pe;
handle.ptr = pe;
return handle;
} }
void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::rm_fd (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
pe->fd = retired_fd; pe->fd = retired_fd;
...@@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) ...@@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events |= EPOLLIN; pe->ev.events |= EPOLLIN;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
...@@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) ...@@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events &= ~((short) EPOLLIN); pe->ev.events &= ~((short) EPOLLIN);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
...@@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) ...@@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events |= EPOLLOUT; pe->ev.events |= EPOLLOUT;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
...@@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_) ...@@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
void zmq::epoll_t::reset_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events &= ~((short) EPOLLOUT); pe->ev.events &= ~((short) EPOLLOUT);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
......
...@@ -27,8 +27,6 @@ ...@@ -27,8 +27,6 @@
#include <vector> #include <vector>
#include <sys/epoll.h> #include <sys/epoll.h>
#include "i_poller.hpp"
//#include "i_poll_events.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
...@@ -39,22 +37,24 @@ namespace zmq ...@@ -39,22 +37,24 @@ namespace zmq
// This class implements socket polling mechanism using the Linux-specific // This class implements socket polling mechanism using the Linux-specific
// epoll mechanism. // epoll mechanism.
class epoll_t : public i_poller class epoll_t
{ {
public: public:
typedef void* handle_t;
epoll_t (); epoll_t ();
~epoll_t (); ~epoll_t ();
// i_poller implementation. // "poller" concept.
handle_t add_fd (fd_t fd_, i_poll_events *events_); handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_); void add_timer (struct i_poll_events *events_);
void cancel_timer (i_poll_events *events_); void cancel_timer (struct i_poll_events *events_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../bindings/c/zmq.h"
#include "err.hpp" #include "err.hpp"
#include "platform.hpp" #include "platform.hpp"
...@@ -24,8 +26,6 @@ ...@@ -24,8 +26,6 @@
const char *zmq::wsa_error() const char *zmq::wsa_error()
{ {
int errcode = WSAGetLastError (); int errcode = WSAGetLastError ();
// TODO: This is not a generic way to handle this... // TODO: This is not a generic way to handle this...
if (errcode == WSAEWOULDBLOCK) if (errcode == WSAEWOULDBLOCK)
...@@ -148,4 +148,43 @@ void zmq::win_error (char *buffer_, size_t buffer_size_) ...@@ -148,4 +148,43 @@ void zmq::win_error (char *buffer_, size_t buffer_size_)
zmq_assert (rc); zmq_assert (rc);
} }
void zmq::wsa_error_to_errno ()
{
int errcode = WSAGetLastError ();
switch (errcode) {
case WSAEINPROGRESS:
errno = EAGAIN;
return;
case WSAEBADF:
errno = EBADF;
return;
case WSAEINVAL:
errno = EINVAL;
return;
case WSAEMFILE:
errno = EMFILE;
return;
case WSAEFAULT:
errno = EFAULT;
return;
case WSAEPROTONOSUPPORT:
errno = EPROTONOSUPPORT;
return;
case WSAENOBUFS:
errno = ENOBUFS;
return;
case WSAENETDOWN:
errno = ENETDOWN;
return;
case WSAEADDRINUSE:
errno = EADDRINUSE;
return;
case WSAEADDRNOTAVAIL:
errno = EADDRNOTAVAIL;
return;
default:
wsa_assert (false);
}
}
#endif #endif
...@@ -41,6 +41,7 @@ namespace zmq ...@@ -41,6 +41,7 @@ namespace zmq
const char *wsa_error (); const char *wsa_error ();
void win_error (char *buffer_, size_t buffer_size_); void win_error (char *buffer_, size_t buffer_size_);
void wsa_error_to_errno ();
} }
......
...@@ -44,8 +44,6 @@ namespace zmq ...@@ -44,8 +44,6 @@ namespace zmq
void signal (int signal_); void signal (int signal_);
uint64_t poll (); uint64_t poll ();
uint64_t check (); uint64_t check ();
// Get the file descriptor associated with the object.
fd_t get_fd (); fd_t get_fd ();
private: private:
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__
#define __ZMQ_I_POLLER_HPP_INCLUDED__
#include "fd.hpp"
namespace zmq
{
union handle_t
{
fd_t fd;
void *ptr;
};
// Virtual interface to be used when polling on file descriptors.
struct i_poller
{
virtual ~i_poller () {};
// Add file descriptor to the polling set. Return handle
// representing the descriptor. 'events' interface will be used
// to invoke callback functions when event occurs.
virtual handle_t add_fd (fd_t fd_, struct i_poll_events *events_) = 0;
// Remove file descriptor identified by handle from the polling set.
virtual void rm_fd (handle_t handle_) = 0;
// Start polling for input from socket.
virtual void set_pollin (handle_t handle_) = 0;
// Stop polling for input from socket.
virtual void reset_pollin (handle_t handle_) = 0;
// Start polling for availability of the socket for writing.
virtual void set_pollout (handle_t handle_) = 0;
// Stop polling for availability of the socket for writing.
virtual void reset_pollout (handle_t handle_) = 0;
// Ask to be notified after some time. Actual interval varies between
// 0 and max_timer_period ms. Timer is destroyed once it expires or,
// optionally, when cancel_timer is called.
virtual void add_timer (struct i_poll_events *events_) = 0;
// Cancel the timer set by add_timer method.
virtual void cancel_timer (struct i_poll_events *events_) = 0;
// Returns load experienced by the I/O thread. Currently it's number
// of file descriptors handled by the poller, in the future we may
// use a metric taking actual traffic on the individual sockets into
// account.
virtual int get_load () = 0;
// Start the execution of the underlying I/O thread.
// This method is called from a foreign thread.
virtual void start () = 0;
// Ask underlying I/O thread to stop.
virtual void stop () = 0;
};
}
#endif
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__ #define __ZMQ_I_SIGNALER_HPP_INCLUDED__
#include "stdint.hpp" #include "stdint.hpp"
#include "fd.hpp"
namespace zmq namespace zmq
{ {
...@@ -42,6 +43,11 @@ namespace zmq ...@@ -42,6 +43,11 @@ namespace zmq
// Same as poll, however, if there is no signal available, // Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal. // function returns zero immediately instead of waiting for a signal.
virtual uint64_t check () = 0; virtual uint64_t check () = 0;
// Returns file descriptor that allows waiting for signals. Specific
// signalers may not support this functionality. If so, the function
// returns retired_fd.
virtual fd_t get_fd () = 0;
}; };
} }
......
...@@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) ...@@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
poller = io_thread_->get_poller (); poller = io_thread_->get_poller ();
} }
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_)
{ {
return poller->add_fd (fd_, this); return poller->add_fd (fd_, this);
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include <stddef.h> #include <stddef.h>
#include "i_poller.hpp" #include "poller.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
namespace zmq namespace zmq
...@@ -41,6 +41,8 @@ namespace zmq ...@@ -41,6 +41,8 @@ namespace zmq
protected: protected:
typedef poller_t::handle_t handle_t;
// Derived class can init/swap the underlying I/O thread. // Derived class can init/swap the underlying I/O thread.
// Caution: Remove all the file descriptors from the old I/O thread // Caution: Remove all the file descriptors from the old I/O thread
// before swapping to the new one! // before swapping to the new one!
...@@ -63,7 +65,7 @@ namespace zmq ...@@ -63,7 +65,7 @@ namespace zmq
private: private:
struct i_poller *poller; poller_t *poller;
io_object_t (const io_object_t&); io_object_t (const io_object_t&);
void operator = (const io_object_t&); void operator = (const io_object_t&);
......
...@@ -24,11 +24,6 @@ ...@@ -24,11 +24,6 @@
#include "platform.hpp" #include "platform.hpp"
#include "err.hpp" #include "err.hpp"
#include "command.hpp" #include "command.hpp"
#include "epoll.hpp"
#include "poll.hpp"
#include "select.hpp"
#include "devpoll.hpp"
#include "kqueue.hpp"
#include "dispatcher.hpp" #include "dispatcher.hpp"
#include "simple_semaphore.hpp" #include "simple_semaphore.hpp"
...@@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, ...@@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) : int flags_) :
object_t (dispatcher_, thread_slot_) object_t (dispatcher_, thread_slot_)
{ {
#if defined ZMQ_FORCE_SELECT poller = new poller_t;
poller = new select_t;
#elif defined ZMQ_FORCE_POLL
poller = new poll_t;
#elif defined ZMQ_FORCE_EPOLL
poller = new epoll_t;
#elif defined ZMQ_FORCE_DEVPOLL
poller = new devpoll_t;
#elif defined ZMQ_FORCE_KQUEUE
poller = new kqueue_t;
#elif defined ZMQ_HAVE_LINUX
poller = new epoll_t;
#elif defined ZMQ_HAVE_WINDOWS
poller = new select_t;
#elif defined ZMQ_HAVE_FREEBSD
poller = new kqueue_t;
#elif defined ZMQ_HAVE_OPENBSD
poller = new kqueue_t;
#elif defined ZMQ_HAVE_SOLARIS
poller = new devpoll_t;
#elif defined ZMQ_HAVE_OSX
poller = new kqueue_t;
#elif defined ZMQ_HAVE_QNXNTO
poller = new poll_t;
#elif defined ZMQ_HAVE_AIX
poller = new poll_t;
#elif defined ZMQ_HAVE_HPUX
poller = new devpoll_t;
#elif defined ZMQ_HAVE_OPENVMS
poller = new select_t;
#else
#error Unsupported platform
#endif
zmq_assert (poller); zmq_assert (poller);
signaler_handle = poller->add_fd (signaler.get_fd (), this); signaler_handle = poller->add_fd (signaler.get_fd (), this);
...@@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event () ...@@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event ()
zmq_assert (false); zmq_assert (false);
} }
zmq::i_poller *zmq::io_thread_t::get_poller () zmq::poller_t *zmq::io_thread_t::get_poller ()
{ {
zmq_assert (poller); zmq_assert (poller);
return poller; return poller;
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include <vector> #include <vector>
#include "object.hpp" #include "object.hpp"
#include "i_poller.hpp" #include "poller.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
#include "fd_signaler.hpp" #include "fd_signaler.hpp"
...@@ -59,7 +59,7 @@ namespace zmq ...@@ -59,7 +59,7 @@ namespace zmq
void timer_event (); void timer_event ();
// Used by io_objects to retrieve the assciated poller object. // Used by io_objects to retrieve the assciated poller object.
struct i_poller *get_poller (); poller_t *get_poller ();
// Command handlers. // Command handlers.
void process_stop (); void process_stop ();
...@@ -74,10 +74,10 @@ namespace zmq ...@@ -74,10 +74,10 @@ namespace zmq
fd_signaler_t signaler; fd_signaler_t signaler;
// Handle associated with signaler's file descriptor. // Handle associated with signaler's file descriptor.
handle_t signaler_handle; poller_t::handle_t signaler_handle;
// I/O multiplexing is performed using a poller object. // I/O multiplexing is performed using a poller object.
i_poller *poller; poller_t *poller;
}; };
} }
......
...@@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) ...@@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
errno_assert (rc != -1); errno_assert (rc != -1);
} }
zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
i_poll_events *reactor_)
{ {
poll_entry_t *pe = new poll_entry_t; poll_entry_t *pe = new poll_entry_t;
zmq_assert (pe != NULL); zmq_assert (pe != NULL);
...@@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) ...@@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_)
pe->flag_pollout = 0; pe->flag_pollout = 0;
pe->reactor = reactor_; pe->reactor = reactor_;
handle_t handle; return pe;
handle.ptr = pe;
return handle;
} }
void zmq::kqueue_t::rm_fd (handle_t handle_) void zmq::kqueue_t::rm_fd (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
if (pe->flag_pollin) if (pe->flag_pollin)
kevent_delete (pe->fd, EVFILT_READ); kevent_delete (pe->fd, EVFILT_READ);
if (pe->flag_pollout) if (pe->flag_pollout)
...@@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) ...@@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
void zmq::kqueue_t::set_pollin (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollin = true; pe->flag_pollin = true;
kevent_add (pe->fd, EVFILT_READ, pe); kevent_add (pe->fd, EVFILT_READ, pe);
} }
void zmq::kqueue_t::reset_pollin (handle_t handle_) void zmq::kqueue_t::reset_pollin (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollin = false; pe->flag_pollin = false;
kevent_delete (pe->fd, EVFILT_READ); kevent_delete (pe->fd, EVFILT_READ);
} }
void zmq::kqueue_t::set_pollout (handle_t handle_) void zmq::kqueue_t::set_pollout (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollout = true; pe->flag_pollout = true;
kevent_add (pe->fd, EVFILT_WRITE, pe); kevent_add (pe->fd, EVFILT_WRITE, pe);
} }
void zmq::kqueue_t::reset_pollout (handle_t handle_) void zmq::kqueue_t::reset_pollout (handle_t handle_)
{ {
poll_entry_t *pe = (poll_entry_t*) handle_.ptr; poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollout = false; pe->flag_pollout = false;
kevent_delete (pe->fd, EVFILT_WRITE); kevent_delete (pe->fd, EVFILT_WRITE);
} }
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include <vector> #include <vector>
#include "i_poller.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
...@@ -37,22 +36,24 @@ namespace zmq ...@@ -37,22 +36,24 @@ namespace zmq
// Implements socket polling mechanism using the BSD-specific // Implements socket polling mechanism using the BSD-specific
// kqueue interface. // kqueue interface.
class kqueue_t : public i_poller class kqueue_t
{ {
public: public:
typedef void* handle_t;
kqueue_t (); kqueue_t ();
~kqueue_t (); ~kqueue_t ();
// i_poller implementation. // "poller" concept.
handle_t add_fd (fd_t fd_, i_poll_events *events_); handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_); void add_timer (struct i_poll_events *events_);
void cancel_timer (i_poll_events *events_); void cancel_timer (struct i_poll_events *events_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
bool zmq::p2p_t::xhas_in ()
{
zmq_assert (false);
return false;
}
bool zmq::p2p_t::xhas_out ()
{
zmq_assert (false);
return false;
}
...@@ -42,6 +42,8 @@ namespace zmq ...@@ -42,6 +42,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private: private:
......
...@@ -36,6 +36,17 @@ zmq::reader_t::~reader_t () ...@@ -36,6 +36,17 @@ zmq::reader_t::~reader_t ()
{ {
} }
bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
if (pipe->check_read ())
return true;
// If not, deactivate the pipe.
endpoint->kill (this);
return false;
}
bool zmq::reader_t::read (zmq_msg_t *msg_) bool zmq::reader_t::read (zmq_msg_t *msg_)
{ {
if (!pipe->read (msg_)) { if (!pipe->read (msg_)) {
......
...@@ -42,6 +42,9 @@ namespace zmq ...@@ -42,6 +42,9 @@ namespace zmq
void set_endpoint (i_endpoint *endpoint_); void set_endpoint (i_endpoint *endpoint_);
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
// Reads a message to the underlying pipe. // Reads a message to the underlying pipe.
bool read (zmq_msg_t *msg_); bool read (zmq_msg_t *msg_);
......
...@@ -58,7 +58,7 @@ zmq::poll_t::~poll_t () ...@@ -58,7 +58,7 @@ zmq::poll_t::~poll_t ()
zmq_assert (load.get () == 0); zmq_assert (load.get () == 0);
} }
zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
pollfd pfd = {fd_, 0, 0}; pollfd pfd = {fd_, 0, 0};
pollset.push_back (pfd); pollset.push_back (pfd);
...@@ -70,19 +70,17 @@ zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -70,19 +70,17 @@ zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); load.add (1);
handle_t handle; return fd_;
handle.fd = fd_;
return handle;
} }
void zmq::poll_t::rm_fd (handle_t handle_) void zmq::poll_t::rm_fd (handle_t handle_)
{ {
fd_t index = fd_table [handle_.fd].index; fd_t index = fd_table [handle_].index;
assert (index != retired_fd); assert (index != retired_fd);
// Mark the fd as unused. // Mark the fd as unused.
pollset [index].fd = retired_fd; pollset [index].fd = retired_fd;
fd_table [handle_.fd].index = retired_fd; fd_table [handle_].index = retired_fd;
retired = true; retired = true;
// Decrease the load metric of the thread. // Decrease the load metric of the thread.
...@@ -91,25 +89,25 @@ void zmq::poll_t::rm_fd (handle_t handle_) ...@@ -91,25 +89,25 @@ void zmq::poll_t::rm_fd (handle_t handle_)
void zmq::poll_t::set_pollin (handle_t handle_) void zmq::poll_t::set_pollin (handle_t handle_)
{ {
int index = fd_table [handle_.fd].index; int index = fd_table [handle_].index;
pollset [index].events |= POLLIN; pollset [index].events |= POLLIN;
} }
void zmq::poll_t::reset_pollin (handle_t handle_) void zmq::poll_t::reset_pollin (handle_t handle_)
{ {
int index = fd_table [handle_.fd].index; int index = fd_table [handle_].index;
pollset [index].events &= ~((short) POLLIN); pollset [index].events &= ~((short) POLLIN);
} }
void zmq::poll_t::set_pollout (handle_t handle_) void zmq::poll_t::set_pollout (handle_t handle_)
{ {
int index = fd_table [handle_.fd].index; int index = fd_table [handle_].index;
pollset [index].events |= POLLOUT; pollset [index].events |= POLLOUT;
} }
void zmq::poll_t::reset_pollout (handle_t handle_) void zmq::poll_t::reset_pollout (handle_t handle_)
{ {
int index = fd_table [handle_.fd].index; int index = fd_table [handle_].index;
pollset [index].events &= ~((short) POLLOUT); pollset [index].events &= ~((short) POLLOUT);
} }
......
...@@ -31,7 +31,6 @@ ...@@ -31,7 +31,6 @@
#include <stddef.h> #include <stddef.h>
#include <vector> #include <vector>
#include "i_poller.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
...@@ -42,22 +41,24 @@ namespace zmq ...@@ -42,22 +41,24 @@ namespace zmq
// Implements socket polling mechanism using the POSIX.1-2001 // Implements socket polling mechanism using the POSIX.1-2001
// poll() system call. // poll() system call.
class poll_t : public i_poller class poll_t
{ {
public: public:
typedef fd_t handle_t;
poll_t (); poll_t ();
~poll_t (); ~poll_t ();
// i_poller implementation. // "poller" concept.
handle_t add_fd (fd_t fd_, i_poll_events *events_); handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_); void add_timer (struct i_poll_events *events_);
void cancel_timer (i_poll_events *events_); void cancel_timer (struct i_poll_events *events_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_POLLER_HPP_INCLUDED__
#define __ZMQ_POLLER_HPP_INCLUDED__
#include "epoll.hpp"
#include "poll.hpp"
#include "select.hpp"
#include "devpoll.hpp"
#include "kqueue.hpp"
namespace zmq
{
#if defined ZMQ_FORCE_SELECT
typedef select_t poller_t;
#elif defined ZMQ_FORCE_POLL
typedef poll_t poller_t;
#elif defined ZMQ_FORCE_EPOLL
typedef epoll_t poller_t;
#elif defined ZMQ_FORCE_DEVPOLL
typedef devpoll_t poller_t;
#elif defined ZMQ_FORCE_KQUEUE
typedef kqueue_t poller_t;
#elif defined ZMQ_HAVE_LINUX
typedef epoll_t poller_t;
#elif defined ZMQ_HAVE_WINDOWS
typedef select_t poller_t;
#elif defined ZMQ_HAVE_FREEBSD
typedef kqueue_t poller_t;
#elif defined ZMQ_HAVE_OPENBSD
typedef kqueue_t poller_t;
#elif defined ZMQ_HAVE_SOLARIS
typedef devpoll_t poller_t;
#elif defined ZMQ_HAVE_OSX
typedef kqueue_t poller_t;
#elif defined ZMQ_HAVE_QNXNTO
typedef poll_t poller_t;
#elif defined ZMQ_HAVE_AIX
typedef poll_t poller_t;
#elif defined ZMQ_HAVE_HPUX
typedef devpoll_t poller_t;
#elif defined ZMQ_HAVE_OPENVMS
typedef select_t poller_t;
#else
#error Unsupported platform
#endif
}
#endif
...@@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
bool zmq::pub_t::xhas_in ()
{
return false;
}
bool zmq::pub_t::xhas_out ()
{
// TODO: Reimplement when queue limits are added.
return true;
}
...@@ -43,6 +43,8 @@ namespace zmq ...@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private: private:
......
...@@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
bool zmq::rep_t::xhas_in ()
{
for (int count = active; count != 0; count--) {
if (in_pipes [current]->check_read ())
return !waiting_for_reply;
current++;
if (current >= active)
current = 0;
}
return false;
}
bool zmq::rep_t::xhas_out ()
{
return waiting_for_reply;
}
...@@ -43,6 +43,8 @@ namespace zmq ...@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private: private:
......
...@@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
bool zmq::req_t::xhas_in ()
{
if (reply_pipe->check_read ())
return waiting_for_reply;
return false;
}
bool zmq::req_t::xhas_out ()
{
return !waiting_for_reply;
}
...@@ -43,6 +43,8 @@ namespace zmq ...@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private: private:
......
...@@ -59,7 +59,7 @@ zmq::select_t::~select_t () ...@@ -59,7 +59,7 @@ zmq::select_t::~select_t ()
zmq_assert (load.get () == 0); zmq_assert (load.get () == 0);
} }
zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
// Store the file descriptor. // Store the file descriptor.
fd_entry_t entry = {fd_, events_}; fd_entry_t entry = {fd_, events_};
...@@ -75,38 +75,33 @@ zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -75,38 +75,33 @@ zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread. // Increase the load metric of the thread.
load.add (1); load.add (1);
handle_t handle; return fd_;
handle.fd = fd_;
return handle;
} }
void zmq::select_t::rm_fd (handle_t handle_) void zmq::select_t::rm_fd (handle_t handle_)
{ {
// Get file descriptor.
fd_t fd = handle_.fd;
// Mark the descriptor as retired. // Mark the descriptor as retired.
fd_set_t::iterator it; fd_set_t::iterator it;
for (it = fds.begin (); it != fds.end (); it ++) for (it = fds.begin (); it != fds.end (); it ++)
if (it->fd == fd) if (it->fd == handle_)
break; break;
zmq_assert (it != fds.end ()); zmq_assert (it != fds.end ());
it->fd = retired_fd; it->fd = retired_fd;
retired = true; retired = true;
// Stop polling on the descriptor. // Stop polling on the descriptor.
FD_CLR (fd, &source_set_in); FD_CLR (handle_, &source_set_in);
FD_CLR (fd, &source_set_out); FD_CLR (handle_, &source_set_out);
FD_CLR (fd, &source_set_err); FD_CLR (handle_, &source_set_err);
// Discard all events generated on this file descriptor. // Discard all events generated on this file descriptor.
FD_CLR (fd, &readfds); FD_CLR (handle_, &readfds);
FD_CLR (fd, &writefds); FD_CLR (handle_, &writefds);
FD_CLR (fd, &exceptfds); FD_CLR (handle_, &exceptfds);
// Adjust the maxfd attribute if we have removed the // Adjust the maxfd attribute if we have removed the
// highest-numbered file descriptor. // highest-numbered file descriptor.
if (fd == maxfd) { if (handle_ == maxfd) {
maxfd = retired_fd; maxfd = retired_fd;
for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++) for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++)
if (it->fd > maxfd) if (it->fd > maxfd)
...@@ -119,22 +114,22 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -119,22 +114,22 @@ void zmq::select_t::rm_fd (handle_t handle_)
void zmq::select_t::set_pollin (handle_t handle_) void zmq::select_t::set_pollin (handle_t handle_)
{ {
FD_SET (handle_.fd, &source_set_in); FD_SET (handle_, &source_set_in);
} }
void zmq::select_t::reset_pollin (handle_t handle_) void zmq::select_t::reset_pollin (handle_t handle_)
{ {
FD_CLR (handle_.fd, &source_set_in); FD_CLR (handle_, &source_set_in);
} }
void zmq::select_t::set_pollout (handle_t handle_) void zmq::select_t::set_pollout (handle_t handle_)
{ {
FD_SET (handle_.fd, &source_set_out); FD_SET (handle_, &source_set_out);
} }
void zmq::select_t::reset_pollout (handle_t handle_) void zmq::select_t::reset_pollout (handle_t handle_)
{ {
FD_CLR (handle_.fd, &source_set_out); FD_CLR (handle_, &source_set_out);
} }
void zmq::select_t::add_timer (i_poll_events *events_) void zmq::select_t::add_timer (i_poll_events *events_)
......
...@@ -34,7 +34,6 @@ ...@@ -34,7 +34,6 @@
#include <sys/select.h> #include <sys/select.h>
#endif #endif
#include "i_poller.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
...@@ -45,22 +44,24 @@ namespace zmq ...@@ -45,22 +44,24 @@ namespace zmq
// Implements socket polling mechanism using POSIX.1-2001 select() // Implements socket polling mechanism using POSIX.1-2001 select()
// function. // function.
class select_t : public i_poller class select_t
{ {
public: public:
typedef fd_t handle_t;
select_t (); select_t ();
~select_t (); ~select_t ();
// i_poller implementation. // "poller" concept.
handle_t add_fd (fd_t fd_, i_poll_events *events_); handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_); void add_timer (struct i_poll_events *events_);
void cancel_timer (i_poll_events *events_); void cancel_timer (struct i_poll_events *events_);
int get_load (); int get_load ();
void start (); void start ();
void stop (); void stop ();
......
...@@ -364,6 +364,21 @@ int zmq::socket_base_t::close () ...@@ -364,6 +364,21 @@ int zmq::socket_base_t::close ()
return 0; return 0;
} }
zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{
return app_thread;
}
bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
}
bool zmq::socket_base_t::has_out ()
{
return xhas_out ();
}
bool zmq::socket_base_t::register_session (const char *name_, bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_) session_t *session_)
{ {
......
...@@ -54,6 +54,16 @@ namespace zmq ...@@ -54,6 +54,16 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_); int recv (zmq_msg_t *msg_, int flags_);
int close (); int close ();
// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// is called from.
class app_thread_t *get_thread ();
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
bool has_out ();
// The list of sessions cannot be accessed via inter-thread // The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the // commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application // action till user application yields control of the application
...@@ -88,6 +98,8 @@ namespace zmq ...@@ -88,6 +98,8 @@ namespace zmq
virtual int xsend (zmq_msg_t *msg_, int options_) = 0; virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0; virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
virtual bool xhas_in () = 0;
virtual bool xhas_out () = 0;
// Socket options. // Socket options.
options_t options; options_t options;
......
...@@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) ...@@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
bool zmq::sub_t::xhas_in ()
{
// TODO: This is more complex as we have to ignore all the messages that
// don't fit the filter.
zmq_assert (false);
return false;
}
bool zmq::sub_t::xhas_out ()
{
return false;
}
...@@ -48,6 +48,8 @@ namespace zmq ...@@ -48,6 +48,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush (); int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private: private:
......
...@@ -50,8 +50,10 @@ int zmq::tcp_connecter_t::open () ...@@ -50,8 +50,10 @@ int zmq::tcp_connecter_t::open ()
// Create the socket. // Create the socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
// TODO: Convert error to errno. if (s == INVALID_SOCKET) {
wsa_assert (s != INVALID_SOCKET); wsa_error_to_errno ();
return -1;
}
// Set to non-blocking mode. // Set to non-blocking mode.
unsigned long argp = 1; unsigned long argp = 1;
...@@ -78,9 +80,7 @@ int zmq::tcp_connecter_t::open () ...@@ -78,9 +80,7 @@ int zmq::tcp_connecter_t::open ()
return -1; return -1;
} }
// TODO: Convert error to errno. wsa_error_to_errno ();
wsa_assert (rc == 0);
return -1; return -1;
} }
......
...@@ -48,8 +48,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -48,8 +48,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
// Create a listening socket. // Create a listening socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
// TODO: Convert error code to errno. if (s == INVALID_SOCKET) {
wsa_assert (s != INVALID_SOCKET); wsa_error_to_errno ();
return -1;
}
// Allow reusing of the address. // Allow reusing of the address.
int flag = 1; int flag = 1;
...@@ -65,12 +67,17 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -65,12 +67,17 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
// Bind the socket to the network interface and port. // Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
// TODO: Convert error code to errno. // TODO: Convert error code to errno.
wsa_assert (rc != SOCKET_ERROR); if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
}
// Listen for incomming connections. // Listen for incomming connections.
rc = listen (s, 1); rc = listen (s, 1);
// TODO: Convert error code to errno. if (rc == SOCKET_ERROR) {
wsa_assert (rc != SOCKET_ERROR); wsa_error_to_errno ();
return -1;
}
return 0; return 0;
} }
......
...@@ -106,16 +106,12 @@ namespace zmq ...@@ -106,16 +106,12 @@ namespace zmq
return true; return true;
} }
// Reads an item from the pipe. Returns false if there is no value. // Check whether item is available for reading.
// available. inline bool check_read ()
inline bool read (T *value_)
{ {
// Was the value prefetched already? If so, return it. // Was the value prefetched already? If so, return.
if (&queue.front () != r) { if (&queue.front () != r)
*value_ = queue.front ();
queue.pop ();
return true; return true;
}
// There's no prefetched value, so let us prefetch more values. // There's no prefetched value, so let us prefetch more values.
// (Note that D is a template parameter. Becaue of that one of // (Note that D is a template parameter. Becaue of that one of
...@@ -165,6 +161,18 @@ namespace zmq ...@@ -165,6 +161,18 @@ namespace zmq
return false; return false;
} }
// There was at least one value prefetched.
return true;
}
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read (T *value_)
{
// Try to prefetch a value.
if (!check_read ())
return false;
// There was at least one value prefetched. // There was at least one value prefetched.
// Return it to the caller. // Return it to the caller.
*value_ = queue.front (); *value_ = queue.front ();
......
...@@ -58,3 +58,8 @@ uint64_t zmq::ypollset_t::check () ...@@ -58,3 +58,8 @@ uint64_t zmq::ypollset_t::check ()
{ {
return (uint64_t) bits.xchg (0); return (uint64_t) bits.xchg (0);
} }
zmq::fd_t zmq::ypollset_t::get_fd ()
{
return retired_fd;
}
...@@ -42,6 +42,7 @@ namespace zmq ...@@ -42,6 +42,7 @@ namespace zmq
void signal (int signal_); void signal (int signal_);
uint64_t poll (); uint64_t poll ();
uint64_t check (); uint64_t check ();
fd_t get_fd ();
private: private:
......
...@@ -25,11 +25,16 @@ ...@@ -25,11 +25,16 @@
#include <new> #include <new>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "err.hpp" #include "app_thread.hpp"
#include "dispatcher.hpp" #include "dispatcher.hpp"
#include "msg_content.hpp" #include "msg_content.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "err.hpp"
#if defined ZMQ_HAVE_LINUX
#include <poll.h>
#endif
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h> #include <unistd.h>
...@@ -44,6 +49,14 @@ const char *zmq_strerror (int errnum_) ...@@ -44,6 +49,14 @@ const char *zmq_strerror (int errnum_)
return "Not supported"; return "Not supported";
case EPROTONOSUPPORT: case EPROTONOSUPPORT:
return "Protocol not supported"; return "Protocol not supported";
case ENOBUFS:
return "No buffer space available";
case ENETDOWN:
return "Network is down";
case EADDRINUSE:
return "Address in use";
case EADDRNOTAVAIL:
return "Address not available";
#endif #endif
case EMTHREAD: case EMTHREAD:
return "Number of preallocated application threads exceeded"; return "Number of preallocated application threads exceeded";
...@@ -246,6 +259,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) ...@@ -246,6 +259,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
} }
int zmq_poll (zmq_pollitem_t *items_, int nitems_)
{
// TODO: Replace the polling mechanism by the virtualised framework
// used in 0MQ I/O threads. That'll make the thing work on all platforms.
#if !defined ZMQ_HAVE_LINUX
errno = ENOTSUP;
return -1;
#else
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
int npollfds = 0;
int nsockets = 0;
zmq::app_thread_t *app_thread = NULL;
for (int i = 0; i != nitems_; i++) {
// 0MQ sockets.
if (items_ [i].socket) {
// Get the app_thread the socket is living in. If there are two
// sockets in the same pollset with different app threads, fail.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
if (app_thread) {
if (app_thread != s->get_thread ()) {
free (pollfds);
errno = EFAULT;
return -1;
}
}
else
app_thread = s->get_thread ();
nsockets++;
continue;
}
// Raw file descriptors.
pollfds [npollfds].fd = items_ [i].fd;
pollfds [npollfds].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
npollfds++;
}
// If there's at least one 0MQ socket in the pollset we have to poll
// for 0MQ commands. If ZMQ_POLL was not set, fail.
if (nsockets) {
pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
if (pollfds [npollfds].fd == zmq::retired_fd) {
free (pollfds);
errno = ENOTSUP;
return -1;
}
pollfds [npollfds].events = POLLIN;
npollfds++;
}
int nevents = 0;
bool initial = true;
while (!nevents) {
// Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets.
int rc = poll (pollfds, npollfds, initial ? 0 : -1);
if (rc == -1 && errno == EINTR)
continue;
errno_assert (rc >= 0);
initial = false;
// Process 0MQ commands if needed.
if (nsockets && pollfds [npollfds -1].revents & POLLIN)
app_thread->process_commands (false, false);
// Check for the events.
int pollfd_pos = 0;
for (int i = 0; i != nitems_; i++) {
// If the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
if (!items_ [i].socket) {
items_ [i].revents =
(pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) |
(pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0);
if (items_ [i].revents)
nevents++;
pollfd_pos++;
continue;
}
// The poll item is a 0MQ socket.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
items_ [i].revents = 0;
if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
}
free (pollfds);
return nevents;
#endif
}
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
static uint64_t now () static uint64_t now ()
......
...@@ -137,6 +137,12 @@ void zmq::zmq_engine_t::out_event () ...@@ -137,6 +137,12 @@ void zmq::zmq_engine_t::out_event ()
void zmq::zmq_engine_t::revive () void zmq::zmq_engine_t::revive ()
{ {
set_pollout (handle); set_pollout (handle);
// Speculative write: The assumption is that at the moment new message
// was sent by the user the socket is probably available for writing.
// Thus we try to write the data to socket avoiding polling for POLLOUT.
// Consequently, the latency should be better in request/reply scenarios.
out_event ();
} }
void zmq::zmq_engine_t::error () void zmq::zmq_engine_t::error ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment