Commit 8665f9a0 authored by Mikko Koppanen's avatar Mikko Koppanen

Merge pull request #275 from pieterh/master

Fixed issue LIBZMQ-333 (zmq_devices is missing)
parents 32c85e0e 9ccfbf8d
...@@ -90,7 +90,14 @@ This package contains ZeroMQ related development libraries and header files. ...@@ -90,7 +90,14 @@ This package contains ZeroMQ related development libraries and header files.
%{_libdir}/libzmq.so.1 %{_libdir}/libzmq.so.1
%{_libdir}/libzmq.so.1.0.0 %{_libdir}/libzmq.so.1.0.0
%attr(0755,root,root) %{_bindir}/zmq_forwarder
%attr(0755,root,root) %{_bindir}/zmq_queue
%attr(0755,root,root) %{_bindir}/zmq_streamer
%{_mandir}/man7/zmq.7.gz %{_mandir}/man7/zmq.7.gz
%{_mandir}/man1/zmq_forwarder.1.gz
%{_mandir}/man1/zmq_queue.1.gz
%{_mandir}/man1/zmq_streamer.1.gz
%files devel %files devel
%defattr(-,root,root,-) %defattr(-,root,root,-)
......
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 zmq_init.3 \
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_msg_send.3 zmq_msg_recv.3 \ zmq_msg_send.3 zmq_msg_recv.3 \
......
...@@ -154,6 +154,16 @@ Local in-process (inter-thread) communication transport:: ...@@ -154,6 +154,16 @@ Local in-process (inter-thread) communication transport::
linkzmq:zmq_inproc[7] linkzmq:zmq_inproc[7]
Devices
~~~~~~~
0MQ provides 'devices', which are building blocks that act as intermediate
nodes in complex messaging topologies. Devices can act as brokers that other
nodes connect to, proxies that connect through to other nodes, or any mix of
these two models.
You can start a device in an application thread, see linkzmq:zmq_device[3].
ERROR HANDLING ERROR HANDLING
-------------- --------------
The 0MQ library functions handle errors using the standard conventions found on The 0MQ library functions handle errors using the standard conventions found on
......
zmq_device(3)
=============
NAME
----
zmq_device - start built-in 0MQ device
SYNOPSIS
--------
*int zmq_device (int 'device', const void '*frontend', const void '*backend');*
DESCRIPTION
-----------
The _zmq_device()_ function starts a built-in 0MQ device. The 'device' argument
is one of:
'ZMQ_QUEUE'::
starts a queue device
'ZMQ_FORWARDER'::
starts a forwarder device
'ZMQ_STREAMER'::
starts a streamer device
The device connects a frontend socket to a backend socket. Conceptually, data
flows from frontend to backend. Depending on the socket types, replies may flow
in the opposite direction.
Before calling _zmq_device()_ you must set any socket options, and connect or
bind both frontend and backend sockets. The two conventional device models are:
*proxy*::
bind frontend socket to an endpoint, and connect backend socket to
downstream components. A proxy device model does not require changes to
the downstream topology but that topology is static (any changes require
reconfiguring the device).
*broker*::
bind frontend socket to one endpoint and bind backend socket to a second
endpoint. Downstream components must now connect into the device. A broker
device model allows a dynamic downstream topology (components can come and
go at any time).
_zmq_device()_ runs in the current thread and returns only if/when the current
context is closed.
QUEUE DEVICE
------------
'ZMQ_QUEUE' creates a shared queue that collects requests from a set of clients,
and distributes these fairly among a set of services. Requests are fair-queued
from frontend connections and load-balanced between backend connections.
Replies automatically return to the client that made the original request.
This device is part of the 'request-reply' pattern. The frontend speaks to
clients and the backend speaks to services. You should use 'ZMQ_QUEUE' with a
'ZMQ_ROUTER' socket for the frontend and a 'ZMQ_DEALER' socket for the backend.
Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
FORWARDER DEVICE
----------------
'ZMQ_FORWARDER' collects messages from a set of publishers and forwards these to
a set of subscribers. You will generally use this to bridge networks, e.g. read
on TCP unicast and forward on multicast.
This device is part of the 'publish-subscribe' pattern. The frontend speaks to
publishers and the backend speaks to subscribers. You should use
'ZMQ_FORWARDER' with a 'ZMQ_SUB' socket for the frontend and a 'ZMQ_PUB' socket
for the backend. Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
STREAMER DEVICE
---------------
'ZMQ_STREAMER' collects tasks from a set of pushers and forwards these to a set
of pullers. You will generally use this to bridge networks. Messages are
fair-queued from pushers and load-balanced to pullers.
This device is part of the 'pipeline' pattern. The frontend speaks to pushers
and the backend speaks to pullers. You should use 'ZMQ_STREAMER' with a
'ZMQ_PULL' socket for the frontend and a 'ZMQ_PUSH' socket for the backend.
Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
RETURN VALUE
------------
The _zmq_device()_ function always returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
.Creating a queue broker
----
// Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
// Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
// Start a queue device
zmq_device (ZMQ_QUEUE, frontend, backend);
----
SEE ALSO
--------
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
...@@ -149,9 +149,6 @@ remove the first part of the message and use it to determine the _identity_ of ...@@ -149,9 +149,6 @@ remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded. the message shall be silently discarded.
Previously this socket was called 'ZMQ_XREP' and that name remains available
for backwards compatibility.
When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the
high water mark for all peers, or if there are no peers at all, then any high water mark for all peers, or if there are no peers at all, then any
messages sent to the socket shall be dropped until the exceptional state ends. messages sent to the socket shall be dropped until the exceptional state ends.
......
...@@ -279,6 +279,16 @@ typedef struct ...@@ -279,6 +279,16 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Devices - Experimental. */
/******************************************************************************/
#define ZMQ_STREAMER 1
#define ZMQ_FORWARDER 2
#define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
#undef ZMQ_EXPORT #undef ZMQ_EXPORT
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -16,6 +16,7 @@ libzmq_la_SOURCES = \ ...@@ -16,6 +16,7 @@ libzmq_la_SOURCES = \
config.hpp \ config.hpp \
ctx.hpp \ ctx.hpp \
decoder.hpp \ decoder.hpp \
device.hpp \
devpoll.hpp \ devpoll.hpp \
dist.hpp \ dist.hpp \
encoder.hpp \ encoder.hpp \
...@@ -81,6 +82,7 @@ libzmq_la_SOURCES = \ ...@@ -81,6 +82,7 @@ libzmq_la_SOURCES = \
clock.cpp \ clock.cpp \
ctx.cpp \ ctx.cpp \
decoder.cpp \ decoder.cpp \
device.cpp \
devpoll.cpp \ devpoll.cpp \
dist.cpp \ dist.cpp \
encoder.cpp \ encoder.cpp \
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stddef.h>
#include "../include/zmq.h"
#include "device.hpp"
#include "socket_base.hpp"
#include "likely.hpp"
#include "err.hpp"
int zmq::device (class socket_base_t *insocket_,
class socket_base_t *outsocket_)
{
msg_t msg;
int rc = msg.init ();
if (rc != 0) {
return -1;
}
int64_t more;
size_t moresz;
zmq_pollitem_t items [2];
items [0].socket = insocket_;
items [0].fd = 0;
items [0].events = ZMQ_POLLIN;
items [0].revents = 0;
items [1].socket = outsocket_;
items [1].fd = 0;
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;
while (true) {
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1);
if (unlikely (rc < 0)) {
return -1;
}
// The algorithm below asumes ratio of request and replies processed
// under full load to be 1:1. Although processing requests replies
// first is tempting it is suspectible to DoS attacks (overloading
// the system with unsolicited replies).
// Process a request.
if (items [0].revents & ZMQ_POLLIN) {
while (true) {
rc = insocket_->recv (&msg, 0);
if (unlikely (rc < 0)) {
return -1;
}
moresz = sizeof (more);
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) {
return -1;
}
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
return -1;
}
if (!more)
break;
}
}
// Process a reply.
if (items [1].revents & ZMQ_POLLIN) {
while (true) {
rc = outsocket_->recv (&msg, 0);
if (unlikely (rc < 0)) {
return -1;
}
moresz = sizeof (more);
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) {
return -1;
}
rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
return -1;
}
if (!more)
break;
}
}
}
return 0;
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DEVICE_HPP_INCLUDED__
#define __ZMQ_DEVICE_HPP_INCLUDED__
namespace zmq
{
int device (class socket_base_t *insocket_,
class socket_base_t *outsocket_);
}
#endif
...@@ -68,6 +68,7 @@ struct iovec { ...@@ -68,6 +68,7 @@ struct iovec {
#include <stdlib.h> #include <stdlib.h>
#include <new> #include <new>
#include "device.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "config.hpp" #include "config.hpp"
...@@ -910,3 +911,49 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -910,3 +911,49 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL #undef ZMQ_POLL_BASED_ON_POLL
#endif #endif
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
if (!insocket_ || !outsocket_) {
errno = EFAULT;
return -1;
}
if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
device_ != ZMQ_STREAMER) {
errno = EINVAL;
return -1;
}
return zmq::device ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
}
////////////////////////////////////////////////////////////////////////////////
// 0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////
void zmq_sleep (int seconds_)
{
#if defined ZMQ_HAVE_WINDOWS
Sleep (seconds_ * 1000);
#else
sleep (seconds_);
#endif
}
void *zmq_stopwatch_start ()
{
uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
alloc_assert (watch);
*watch = zmq::clock_t::now_us ();
return (void*) watch;
}
unsigned long zmq_stopwatch_stop (void *watch_)
{
uint64_t end = zmq::clock_t::now_us ();
uint64_t start = *(uint64_t*) watch_;
free (watch_);
return (unsigned long) (end - start);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment