Commit cc631c4c authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-18: Implement I/O multiplexing (first approximation)

parent f2ff2c6e
......@@ -353,6 +353,39 @@ ZMQ_EXPORT int zmq_flush (void *s);
// EFSM - function cannot be called at the moment.
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
////////////////////////////////////////////////////////////////////////////////
// I/O multiplexing.
////////////////////////////////////////////////////////////////////////////////
#define ZMQ_POLLIN 1
#define ZMQ_POLLOUT 2
// 'socket' is a 0MQ socket we want to poll on. If set to NULL, native file
// descriptor (socket) 'fd' will be used instead. 'events' defines event we
// are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error
// event does not exist for portability reasons. Errors from native sockets
// are reported as ZMQ_POLLIN. It's client's responsibilty to identify the
// error afterwards. 'revents' field is filled in after function returns. It's
// a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the
// socket.
typedef struct
{
void *socket;
int fd;
short events;
short revents;
} zmq_pollitem_t;
// Polls for the items specified by 'items'. Number of items in the array is
// determined by 'nitems' argument. Returns number of items signaled, -1
// in the case of error.
//
// Errors: EFAULT - there's a 0MQ socket in the pollset belonging to
// a different thread.
// ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag.
// I/O multiplexing is disabled.
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);
////////////////////////////////////////////////////////////////////////////////
// Helper functions.
////////////////////////////////////////////////////////////////////////////////
......
......@@ -200,6 +200,11 @@ namespace zmq
throw error_t ();
}
inline operator void* ()
{
return ptr;
}
inline void setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
......
......@@ -44,8 +44,6 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
// Get the file descriptor associated with the object.
fd_t get_fd ();
private:
......
......@@ -21,6 +21,7 @@
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__
#include "stdint.hpp"
#include "fd.hpp"
namespace zmq
{
......@@ -42,6 +43,11 @@ namespace zmq
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
virtual uint64_t check () = 0;
// Returns file descriptor that allows waiting for signals. Specific
// signalers may not support this functionality. If so, the function
// returns retired_fd.
virtual fd_t get_fd () = 0;
};
}
......
......@@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
bool zmq::p2p_t::xhas_in ()
{
zmq_assert (false);
return false;
}
bool zmq::p2p_t::xhas_out ()
{
zmq_assert (false);
return false;
}
......@@ -42,6 +42,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
bool zmq::pub_t::xhas_in ()
{
return false;
}
bool zmq::pub_t::xhas_out ()
{
// TODO: Reimplement when queue limits are added.
return true;
}
......@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
bool zmq::rep_t::xhas_in ()
{
for (int count = active; count != 0; count--) {
if (in_pipes [current]->check_read ())
return !waiting_for_reply;
current++;
if (current >= active)
current = 0;
}
return false;
}
bool zmq::rep_t::xhas_out ()
{
return waiting_for_reply;
}
......@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
bool zmq::req_t::xhas_in ()
{
if (reply_pipe->check_read ())
return waiting_for_reply;
return false;
}
bool zmq::req_t::xhas_out ()
{
return !waiting_for_reply;
}
......@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -364,6 +364,21 @@ int zmq::socket_base_t::close ()
return 0;
}
zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{
return app_thread;
}
bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
}
bool zmq::socket_base_t::has_out ()
{
return xhas_out ();
}
bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
......
......@@ -54,6 +54,16 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_);
int close ();
// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// is called from.
class app_thread_t *get_thread ();
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
bool has_out ();
// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
......@@ -88,6 +98,8 @@ namespace zmq
virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
virtual bool xhas_in () = 0;
virtual bool xhas_out () = 0;
// Socket options.
options_t options;
......
......@@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
bool zmq::sub_t::xhas_in ()
{
// TODO: This is more complex as we have to ignore all the messages that
// don't fit the filter.
zmq_assert (false);
return false;
}
bool zmq::sub_t::xhas_out ()
{
return false;
}
......@@ -48,6 +48,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -29,6 +29,7 @@ zmq::ypollset_t::~ypollset_t ()
void zmq::ypollset_t::signal (int signal_)
{
printf ("++signal\n");
zmq_assert (signal_ >= 0 && signal_ < wait_signal);
if (bits.btsr (signal_, wait_signal))
sem.post ();
......@@ -58,3 +59,8 @@ uint64_t zmq::ypollset_t::check ()
{
return (uint64_t) bits.xchg (0);
}
zmq::fd_t zmq::ypollset_t::get_fd ()
{
return retired_fd;
}
......@@ -42,6 +42,7 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
fd_t get_fd ();
private:
......
......@@ -25,11 +25,16 @@
#include <new>
#include "socket_base.hpp"
#include "err.hpp"
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
#include "err.hpp"
#if defined ZMQ_HAVE_LINUX
#include <poll.h>
#endif
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
......@@ -246,6 +251,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
int zmq_poll (zmq_pollitem_t *items_, int nitems_)
{
// TODO: Replace the polling mechanism by the virtualised framework
// used in 0MQ I/O threads. That'll make the thing work on all platforms.
#if !defined ZMQ_HAVE_LINUX
errno = ENOTSUP;
return -1;
#else
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
int npollfds = 0;
int nsockets = 0;
zmq::app_thread_t *app_thread = NULL;
for (int i = 0; i != nitems_; i++) {
// 0MQ sockets.
if (items_ [i].socket) {
// Get the app_thread the socket is living in. If there are two
// sockets in the same pollset with different app threads, fail.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
if (app_thread) {
if (app_thread != s->get_thread ()) {
free (pollfds);
errno = EFAULT;
return -1;
}
}
else
app_thread = s->get_thread ();
nsockets++;
continue;
}
// Raw file descriptors.
pollfds [npollfds].fd = items_ [i].fd;
pollfds [npollfds].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
npollfds++;
}
// If there's at least one 0MQ socket in the pollset we have to poll
// for 0MQ commands. If ZMQ_POLL was not set, fail.
if (nsockets) {
pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
if (pollfds [npollfds].fd == zmq::retired_fd) {
free (pollfds);
errno = ENOTSUP;
return -1;
}
pollfds [npollfds].events = POLLIN;
npollfds++;
}
int nevents = 0;
bool initial = true;
while (!nevents) {
// Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets.
int rc = poll (pollfds, npollfds, initial ? 0 : -1);
if (rc == -1 && errno == EINTR)
continue;
errno_assert (rc >= 0);
initial = false;
// Process 0MQ commands if needed.
if (nsockets && pollfds [npollfds -1].revents & POLLIN)
app_thread->process_commands (false, false);
// Check for the events.
int pollfd_pos = 0;
for (int i = 0; i != nitems_; i++) {
// If the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
if (!items_ [i].socket) {
items_ [i].revents =
(pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) |
(pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0);
if (items_ [i].revents)
nevents++;
pollfd_pos++;
continue;
}
// The poll item is a 0MQ socket.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
items_ [i].revents = 0;
if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
}
free (pollfds);
return nevents;
#endif
}
#if defined ZMQ_HAVE_WINDOWS
static uint64_t now ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment