Commit 5db28752 authored by Pieter Hintjens's avatar Pieter Hintjens

Removed 'device' concept and introduced proxies

* zmq_device is now a wrapper that calls zmq_proxy
* zmq_proxy adds capture socket
parent aaac4b84
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_proxy.3 \
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
......
......@@ -169,14 +169,12 @@ Local in-process (inter-thread) communication transport::
linkzmq:zmq_inproc[7]
Devices
Proxies
~~~~~~~
0MQ provides 'devices', which are building blocks that act as intermediate
nodes in complex messaging topologies. Devices can act as brokers that other
nodes connect to, proxies that connect through to other nodes, or any mix of
these two models.
You can start a device in an application thread, see linkzmq:zmq_device[3].
0MQ provides 'proxies' to create fanout and fan-in topologies. A proxy connects
a 'frontend' socket to a 'backend' socket and switches all messages between the
two sockets, opaquely. A proxy may optionally capture all traffic to a third
socket. To start a proxy in an application thread, use linkzmq:zmq_proxy[3].
ERROR HANDLING
......
zmq_device(3)
=============
NAME
----
zmq_device - start built-in 0MQ device
SYNOPSIS
--------
*int zmq_device (int 'device', const void '*frontend', const void '*backend');*
DESCRIPTION
-----------
The _zmq_device()_ function starts a built-in 0MQ device. The 'device' argument
is one of:
'ZMQ_QUEUE'::
starts a queue device
'ZMQ_FORWARDER'::
starts a forwarder device
'ZMQ_STREAMER'::
starts a streamer device
The device connects a frontend socket to a backend socket. Conceptually, data
flows from frontend to backend. Depending on the socket types, replies may flow
in the opposite direction.
Before calling _zmq_device()_ you must set any socket options, and connect or
bind both frontend and backend sockets. The two conventional device models are:
*proxy*::
bind frontend socket to an endpoint, and connect backend socket to
downstream components. A proxy device model does not require changes to
the downstream topology but that topology is static (any changes require
reconfiguring the device).
*broker*::
bind frontend socket to one endpoint and bind backend socket to a second
endpoint. Downstream components must now connect into the device. A broker
device model allows a dynamic downstream topology (components can come and
go at any time).
_zmq_device()_ runs in the current thread and returns only if/when the current
context is closed.
QUEUE DEVICE
------------
'ZMQ_QUEUE' creates a shared queue that collects requests from a set of clients,
and distributes these fairly among a set of services. Requests are fair-queued
from frontend connections and load-balanced between backend connections.
Replies automatically return to the client that made the original request.
This device is part of the 'request-reply' pattern. The frontend speaks to
clients and the backend speaks to services. You should use 'ZMQ_QUEUE' with a
'ZMQ_ROUTER' socket for the frontend and a 'ZMQ_DEALER' socket for the backend.
Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
FORWARDER DEVICE
----------------
'ZMQ_FORWARDER' collects messages from a set of publishers and forwards these to
a set of subscribers. You will generally use this to bridge networks, e.g. read
on TCP unicast and forward on multicast.
This device is part of the 'publish-subscribe' pattern. The frontend speaks to
publishers and the backend speaks to subscribers. You should use
'ZMQ_FORWARDER' with a 'ZMQ_SUB' socket for the frontend and a 'ZMQ_PUB' socket
for the backend. Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
STREAMER DEVICE
---------------
'ZMQ_STREAMER' collects tasks from a set of pushers and forwards these to a set
of pullers. You will generally use this to bridge networks. Messages are
fair-queued from pushers and load-balanced to pullers.
This device is part of the 'pipeline' pattern. The frontend speaks to pushers
and the backend speaks to pullers. You should use 'ZMQ_STREAMER' with a
'ZMQ_PULL' socket for the frontend and a 'ZMQ_PUSH' socket for the backend.
Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
RETURN VALUE
------------
The _zmq_device()_ function always returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
.Creating a queue broker
----
// Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
// Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
// Start a queue device
zmq_device (ZMQ_QUEUE, frontend, backend);
----
SEE ALSO
--------
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
zmq_proxy(3)
============
NAME
----
zmq_proxy - start built-in 0MQ proxy
SYNOPSIS
--------
*int zmq_proxy (const void '*frontend', const void '*backend', const void '*capture');*
DESCRIPTION
-----------
The _zmq_proxy()_ function starts the built-in 0MQ proxy in the current
application thread.
The proxy connects a frontend socket to a backend socket. Conceptually, data
flows from frontend to backend. Depending on the socket types, replies may flow
in the opposite direction. The direction is conceptual only; the proxy is fully
symmetric and there is no technical difference between frontend and backend.
Before calling _zmq_proxy()_ you must set any socket options, and connect or
bind both frontend and backend sockets. The two conventional proxy models are:
_zmq_proxy()_ runs in the current thread and returns only if/when the current
context is closed.
If the capture socket is not NULL, the proxy shall send all messages, received
on both frontend and backend, to the capture socket. The capture socket should
be a 'ZMQ_PUB', 'ZMQ_DEALER', 'ZMQ_PUSH', or 'ZMQ_PAIR' socket.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
EXAMPLE USAGE
-------------
Shared Queue
~~~~~~~~~~~~
When the frontend is a ZMQ_ROUTER socket, and the backend is a ZMQ_DEALER
socket, the proxy shall act as a shared queue that collects requests from a
set of clients, and distributes these fairly among a set of services.
Requests shall be fair-queued from frontend connections and distributed evenly
across backend connections. Replies shall automatically return to the client
that made the original request.
Forwarder
~~~~~~~~~
When the frontend is a ZMQ_XSUB socket, and the backend is a ZMQ_XPUB socket,
the proxy shall act as a message forwarder that collects messages from a set
of publishers and forwards these to a set of subscribers. This may be used to
bridge networks transports, e.g. read on tcp:// and forward on pgm://.
Streamer
~~~~~~~~
When the frontend is a ZMQ_PULL socket, and the backend is a ZMQ_PUSH socket,
the proxy shall collect tasks from a set of clients and forwards these to a set
of workers using the pipeline pattern.
RETURN VALUE
------------
The _zmq_proxy()_ function always returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
.Creating a shared queue proxy
----
// Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
// Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
// Start the queue proxy, which runs until ETERM
zmq_proxy (frontend, backend, NULL);
----
SEE ALSO
--------
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
......@@ -371,15 +371,16 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Devices - Experimental. */
/******************************************************************************/
// Built-in message proxy (3-way)
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
// Deprecated aliases
#define ZMQ_STREAMER 1
#define ZMQ_FORWARDER 2
#define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void *insocket, void* outsocket);
// Deprecated method
ZMQ_EXPORT int zmq_device (int type, void *frontend, void *backend);
#undef ZMQ_EXPORT
......
......@@ -16,7 +16,6 @@ libzmq_la_SOURCES = \
config.hpp \
ctx.hpp \
decoder.hpp \
device.hpp \
devpoll.hpp \
dist.hpp \
encoder.hpp \
......@@ -51,6 +50,7 @@ libzmq_la_SOURCES = \
poller.hpp \
poller_base.hpp \
pair.hpp \
proxy.hpp \
pub.hpp \
pull.hpp \
push.hpp \
......@@ -83,7 +83,6 @@ libzmq_la_SOURCES = \
clock.cpp \
ctx.cpp \
decoder.cpp \
device.cpp \
devpoll.cpp \
dist.cpp \
encoder.cpp \
......@@ -113,6 +112,7 @@ libzmq_la_SOURCES = \
poller_base.cpp \
pull.cpp \
push.cpp \
proxy.cpp \
reaper.cpp \
pub.cpp \
random.cpp \
......
......@@ -21,13 +21,15 @@
#include <stddef.h>
#include "../include/zmq.h"
#include "platform.hpp"
#include "device.hpp"
#include "proxy.hpp"
#include "socket_base.hpp"
#include "likely.hpp"
#include "err.hpp"
int zmq::device (class socket_base_t *insocket_,
class socket_base_t *outsocket_)
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
{
msg_t msg;
int rc = msg.init ();
......@@ -37,14 +39,11 @@ int zmq::device (class socket_base_t *insocket_,
// The algorithm below assumes ratio of requests and replies processed
// under full load to be 1:1.
// TODO: The current implementation drops messages when
// any of the pipes becomes full.
int more;
size_t moresz;
zmq_pollitem_t items [] = {
{ insocket_, 0, ZMQ_POLLIN, 0 },
{ outsocket_, 0, ZMQ_POLLIN, 0 }
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 }
};
while (true) {
// Wait while there are either requests or replies to process.
......@@ -52,38 +51,64 @@ int zmq::device (class socket_base_t *insocket_,
if (unlikely (rc < 0))
return -1;
// Process a request.
// Process a request
if (items [0].revents & ZMQ_POLLIN) {
while (true) {
rc = insocket_->recv (&msg, 0);
rc = frontend_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
moresz = sizeof more;
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
rc = frontend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0))
return -1;
rc = outsocket_->send (&msg, more? ZMQ_SNDMORE: 0);
// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
rc = ctrl.init ();
if (unlikely (rc < 0))
return -1;
rc = ctrl.copy (msg);
if (unlikely (rc < 0))
return -1;
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
}
rc = backend_->send (&msg, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
if (more == 0)
break;
}
}
// Process a reply.
// Process a reply
if (items [1].revents & ZMQ_POLLIN) {
while (true) {
rc = outsocket_->recv (&msg, 0);
rc = backend_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
moresz = sizeof more;
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0))
return -1;
rc = insocket_->send (&msg, more? ZMQ_SNDMORE: 0);
// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
rc = ctrl.init ();
if (unlikely (rc < 0))
return -1;
rc = ctrl.copy (msg);
if (unlikely (rc < 0))
return -1;
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
}
rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
if (more == 0)
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
......@@ -18,15 +18,15 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DEVICE_HPP_INCLUDED__
#define __ZMQ_DEVICE_HPP_INCLUDED__
#ifndef __ZMQ_PROXY_HPP_INCLUDED__
#define __ZMQ_PROXY_HPP_INCLUDED__
namespace zmq
{
int device (class socket_base_t *insocket_,
class socket_base_t *outsocket_);
int proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *control_);
}
#endif
......@@ -70,7 +70,7 @@ struct iovec {
#include <stdlib.h>
#include <new>
#include "device.hpp"
#include "proxy.hpp"
#include "socket_base.hpp"
#include "stdint.hpp"
#include "config.hpp"
......@@ -962,21 +962,27 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#undef ZMQ_POLL_BASED_ON_POLL
#endif
int zmq_device (int device_, void *insocket_, void *outsocket_)
// The proxy functionality
int zmq_proxy (void *frontend_, void *backend_, void *control_)
{
if (!insocket_ || !outsocket_) {
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) control_);
}
if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
device_ != ZMQ_STREAMER) {
errno = EINVAL;
return -1;
}
// The deprecated device functionality
return zmq::device ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
int zmq_device (int type, void *frontend_, void *backend_)
{
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_, NULL);
}
////////////////////////////////////////////////////////////////////////////////
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment