Commit a6e05ad5 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #891 from lalebarde/master

Reverse zmq_proxy_chain and zmq_proxy_hook
parents b54a168d 3fb800c1
......@@ -9,7 +9,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
zmq_errno.3 zmq_strerror.3 zmq_version.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_init.3 zmq_term.3 \
zmq_proxy.3 zmq_proxy_steerable.3 zmq_proxy_chain.3 zmq_proxy_hook.3 \
zmq_proxy.3 zmq_proxy_steerable.3 \
zmq_z85_encode.3 zmq_z85_decode.3 zmq_curve_keypair.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 \
......
zmq_proxy_chain(3)
==================
NAME
----
zmq_proxy_chain - start built-in 0MQ a proxy chain in the same thread
control flow
SYNOPSIS
--------
*int zmq_proxy_chain (const void '**frontends', const void '**backends',
const void '*capture', const void **hooks_, const void '*control');*
DESCRIPTION
-----------
The _zmq_proxy_chain()_ function starts the built-in 0MQ proxy in the
current application thread, as _zmq_proxy()_, _zmq_proxy_steerable()_, or
_zmq_proxy_hook()_ do. Please, refer to these functions for their general
description and usage. We describe here only the additional proxy chaining
capability.
Note that compared to the other proxy functions, the arguments _frontends_,
_backends_ and _hooks_ receive arrays instead of single values. Say one need
to implement the following architecture:
*Process client proxy1 proxy2 worker*
| |-----------| |----------| |
*socket* cl f1 b1 f2 b2 wk
*endpoint* |c----e1-----b| |c----e2-----b| |c----e3----b|
Note: "c" is for connect, "b" for bind.
With the other proxy functions, one needs typically one thread for each proxy:
----
thread 1: zmq_proxy(f1, b1);
thread 2: zmq_proxy(f2, b2);
----
With _zmq_proxy_chain_, it can be performed with only one thread:
----
void** f = {f1, f2, NULL);
void** b = {b1, b2, NULL);
single thread: zmq_proxy_chain(f, b, NULL, NULL, NULL);
----
Note: the three NULL arguments are for capture, hooks, and control, since
_zmq_proxy_chain_ is built on top of _zmq_proxy_hook_, itself built on top
of _zmq_proxy_steerable_, itself built on top of _zmq_proxy_. Of course, hook and
steering features can be used along with chaining.
We have limited the number of sockets that can be chained in a single command to 10, what
should be largely sufficient. The reason is to avoid dynamic memory allocation.
Arguments frontends and backends shall be arrays of sockets of type void*, terminated
by NULL. Both arrays shall terminate by NULL at the same indice, otherwise, an error is
returned. Argument hooks shall be NULL or of the same length than the socket arrays. No
NULL is required at the end of hooks. Any number of elements may be NULL where no hook
is implemented in some proxies.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_steerable.
Refer to linkzmq:zmq_proxy_hook[3] for a description of the zmq_hook.
EXAMPLE USAGE
-------------
_zmq_proxy_chain_ aims at building protocol layers by easing the chaining of some
proxies typically by chaining:
DEALER | ROUTER <---> STREAM <---> DEALER
in the same thread. Any kind of protocol feature can be added via hooks.
cf also zmq_proxy, zmq_proxy_steerable, zmq_proxy_hook.
RETURN VALUE
------------
The _zmq_proxy_chain()_ function returns 0 if TERMINATE is sent to its
control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
cf test_proxy_chain.cpp
An example capable of proxying CURVE will be added soon.
SEE ALSO
--------
linkzmq:zmq_proxy[3]
linkzmq:zmq_proxy_steerable[3]
linkzmq:zmq_proxy_hook[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
zmq_proxy_hook(3)
=================
NAME
----
zmq_proxy_hook - start built-in 0MQ proxy with an hook to modify the messages
between the frontend and the backend
SYNOPSIS
--------
*int zmq_proxy_hook (const void '*frontend', const void '*backend',
const void '*capture', const void '*hook', const void '*control');*
DESCRIPTION
-----------
The _zmq_proxy_hook()_ function starts the built-in 0MQ proxy in the
current application thread, as _zmq_proxy()_ or _zmq_proxy_steerable()_ do.
Please, refer to these functions for the general description and usage.
We describe here only the additional hook provided by the structure "hook"
passed as a fith argument.
If the hook structure pointer is not NULL, the proxy supports a hook defined as
a structure 'zmq_proxy_hook_t' containing a data pointer to any data type and
the address of two functions of type 'zmq_hook_f'. The first function,
'front2back_hook' is to manipulate the message received from the frontend, before
it is sent to the backend. The second one, 'back2front_hook' is for the way back.
Both functions receive as an argument in addition to a pointer to the message, the
pointer to the data passed in the 'zmq_proxy_hook_t' structure. This data makes it
possible to manage stateful behaviours in the proxy. They receive also the frame
number n_ which is 1 for the first frame, n for the nth one, 0 for the last one. This
enable to manage specifically the identity frame when ROUTER | STREAM sockets are
concerned. Moreover, to give the hook full capabilities, the three sockets passed
as parameters to the proxy are also provided to the hook functions, enabling to
consume some frames or to add others:
----
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture,
zmq_msg_t* msg_, size_t n_, void *data_);
typedef struct zmq_proxy_hook_t {
void *data;
zmq_hook_f front2back_hook;
zmq_hook_f back2front_hook;
} zmq_proxy_hook_t;
----
If the hook pointer is NULL, zmq_proxy_hook behaves exactly as if zmq_proxy
or zmq_proxy_steerable had been called.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_proxy_steerable.
EXAMPLE USAGE
-------------
Filter
~~~~~~
The most simple use is to simply filter the messages for example against vulgarity.
Messages are simply scanned against a dictionnary and target words are replaced.
ROUTER | STREAM / ROUTER | STREAM proxy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The data field enables to multiplex as desired identities in a ROUTER/ROUTER or in a
STREAM/STREAM proxy or what ever. Such architecture enables also custom load balancers.
Sticky ROUTER / ROUTER proxy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The data field enables to manage sticky identity pairing in a ROUTER/ROUTER proxy.
Security mechanism proxying
~~~~~~~~~~~~~~~~~~~~~~~~~~~
We expect to be able to proxy CURVE with the use of this feature.
Tests
~~~~~
In an existing application, just change zmq_proxy or zmq_proxy_steerable for
zmq_proxy_hook to test anythink, even "Man in the middle" attacks ws security
mechanisms with a STREAM/STREAM proxy.
RETURN VALUE
------------
The _zmq_proxy_hook()_ function returns the same values than zmq_proxy
or zmq_proxy_steerable in the same conditions of use.
EXAMPLE
-------
This simple example aims at uppercasing the traffic between the frontend and the
backend, and lowercasing it on the way back.
.Setup the hook
----
struct stats_t {
int qt_upper_case;
int qt_lower_case;
} stats = {NULL, 0, 0};
int
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
{
size_t size = zmq_msg_size(msg_);
if (!size || n_ == 1) return 0; // skip identity and 0 frames
char* message = (char*) zmq_msg_data(msg_);
for (size_t i = 0; i < size; i++)
if ('a' <= message[i] && message[i] <= 'z')
message[i] += 'A' - 'a';
struct stats_t* stats = (struct stats_t*) stats_;
stats->qt_upper_case++;
return 0;
}
int
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
{
size_t size = zmq_msg_size(msg_);
if (!size || n_ == 1) return 0; // skip identity and 0 frames
char* message = (char*) zmq_msg_data(msg_);
for (size_t i = 0; i < size; i++)
if ('A' <= message[i] && message[i] <= 'Z')
message[i] += 'a' - 'A';
struct stats_t* stats = (struct stats_t*) stats_;
stats->qt_lower_case++;
return 0;
}
zmq_proxy_hook_t hook = {
&stats, // data used by the hook functions, passed as void* data_
upper_case, // hook for messages going from frontend to backend
lower_case // hook for messages going from backend to frontend
};
----
.in main:
----
int
main (void)
{
setup_test_environment ();
void *context = zmq_ctx_new ();
assert (context);
// Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
// Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Start the queue proxy, which runs until ETERM or "TERMINATE"
// received on the control socket
zmq_proxy_hook (frontend, backend, NULL, &hook, control);
printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
// close sockets and context
rc = zmq_close (control);
assert (rc == 0);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
----
.somewhere, the proxy is stopped with:
----
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
assert (rc == 9);
----
.cf test_proxy.cpp for a full implementation of this test, with clients and workers.
SEE ALSO
--------
linkzmq:zmq_proxy[3]
linkzmq:zmq_proxy_steerable[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
......@@ -398,18 +398,8 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/* Built-in message proxy (3-way) */
#define ZMQ_PROXY_CHAIN_MAX_LENGTH 10
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control);
ZMQ_EXPORT int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_);
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_);
typedef struct zmq_proxy_hook_t {
void *data;
zmq_hook_f front2back_hook;
zmq_hook_f back2front_hook;
} zmq_proxy_hook_t;
/* Encode a binary key as printable text using ZMQ RFC 32 */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
......
......@@ -53,8 +53,7 @@
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"
int
capture(
int capture(
class zmq::socket_base_t *capture_,
zmq::msg_t& msg_,
int more_ = 0)
......@@ -75,18 +74,15 @@ capture(
return 0;
}
int
forward(
int forward(
class zmq::socket_base_t *from_,
class zmq::socket_base_t *to_,
class zmq::socket_base_t *capture_,
zmq::msg_t& msg_,
zmq::hook_f do_hook_,
void *data_)
zmq::msg_t& msg_)
{
int more;
size_t moresz;
for (size_t n = 1;; n++) {
while (true) {
int rc = from_->recv (&msg_, 0);
if (unlikely (rc < 0))
return -1;
......@@ -101,13 +97,6 @@ forward(
if (unlikely (rc < 0))
return -1;
// Hook
if (do_hook_) {
rc = (*do_hook_)(from_, to_, capture_, &msg_, more ? n : 0, data_); // first message: n == 1, mth message: n == m, last message: n == 0
if (unlikely (rc < 0))
return -1;
}
rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
......@@ -117,13 +106,11 @@ forward(
return 0;
}
int
zmq::proxy (
class socket_base_t **frontend_,
class socket_base_t **backend_,
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_,
zmq::proxy_hook_t **hook_)
class socket_base_t *control_)
{
msg_t msg;
int rc = msg.init ();
......@@ -135,41 +122,12 @@ zmq::proxy (
int more;
size_t moresz;
size_t n = 0; // number of pair of sockets: the array ends with NULL
for (;; n++) { // counts the number of pair of sockets
if (!frontend_[n] && !backend_[n])
break;
if (!frontend_[n] || !backend_[n]) {
errno = EFAULT;
return -1;
}
}
if (!n) {
errno = EFAULT;
return -1;
}
// avoid dynamic allocation as we have no guarranty to reach the deallocator => limit the chain length
zmq_assert(n <= ZMQ_PROXY_CHAIN_MAX_LENGTH);
zmq_pollitem_t items [2 * ZMQ_PROXY_CHAIN_MAX_LENGTH + 1]; // +1 for the control socket
static zmq_pollitem_t null_item = { NULL, 0, ZMQ_POLLIN, 0 };
static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL};
static zmq::proxy_hook_t* no_hooks[ZMQ_PROXY_CHAIN_MAX_LENGTH];
if (!hook_)
hook_ = no_hooks;
else
for (size_t i = 0; i < n; i++)
if (!hook_[i]) // Check if a hook is used
hook_[i] = &dummy_hook;
for (size_t i = 0; i < n; i++) {
memcpy(&items[2 * i], &null_item, sizeof(null_item));
items[2 * i].socket = frontend_[i];
memcpy(&items[2 * i + 1], &null_item, sizeof(null_item));
items[2 * i + 1].socket = backend_[i];
no_hooks[i] = &dummy_hook;
}
memcpy(&items[2 * n], &null_item, sizeof(null_item));
items[2 * n].socket = control_;
int qt_poll_items = (control_ ? 2 * n + 1 : 2 * n);
zmq_pollitem_t items [] = {
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 },
{ control_, 0, ZMQ_POLLIN, 0 }
};
int qt_poll_items = (control_ ? 3 : 2);
// Proxy can be in these three states
enum {
......@@ -185,7 +143,7 @@ zmq::proxy (
return -1;
// Process a control command if any
if (control_ && items [2 * n].revents & ZMQ_POLLIN) {
if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
......@@ -214,24 +172,20 @@ zmq::proxy (
zmq_assert (false);
}
}
// process each pair of sockets
for (size_t i = 0; i < n; i++) {
// Process a request
if (state == active
&& items [2 * i].revents & ZMQ_POLLIN) {
rc = forward(frontend_[i], backend_[i], capture_, msg, hook_[i]->front2back_hook, hook_[i]->data);
&& items [0].revents & ZMQ_POLLIN) {
rc = forward(frontend_, backend_, capture_,msg);
if (unlikely (rc < 0))
return -1;
}
// Process a reply
if (state == active
&& items [2 * i + 1].revents & ZMQ_POLLIN) {
rc = forward(backend_[i], frontend_[i], capture_, msg, hook_[i]->back2front_hook, hook_[i]->data);
&& items [1].revents & ZMQ_POLLIN) {
rc = forward(backend_, frontend_, capture_,msg);
if (unlikely (rc < 0))
return -1;
}
}
}
return 0;
}
......@@ -22,22 +22,11 @@
namespace zmq
{
typedef int (*hook_f)(void *frontend, void *backend, void *capture, void* msg_, size_t n_, void *data_);
struct proxy_hook_t
{
void *data;
hook_f front2back_hook;
hook_f back2front_hook;
};
int proxy (
class socket_base_t **frontend_,
class socket_base_t **backend_,
class socket_base_t *capture_ = NULL,
class socket_base_t *control_ = NULL, // backward compatibility without this argument
proxy_hook_t **hook_ = NULL // backward compatibility without this argument
);
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_ = NULL); // backward compatibility without this argument
}
#endif
......@@ -1018,62 +1018,36 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The proxy functionality
// Compile time check whether proxy_hook_t fits into zmq_proxy_hook_t.
typedef char check_proxy_hook_t_size
[sizeof (zmq::proxy_hook_t) == sizeof (zmq_proxy_hook_t) ? 1 : -1];
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL};
zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL};
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t**) frontends_,
(zmq::socket_base_t**) backends_,
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_);
}
int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
{
zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL};
zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL};
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t**) frontends_,
(zmq::socket_base_t**) backends_,
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_);
}
int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_)
{
zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL};
zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL};
zmq::proxy_hook_t* hooks_[] = {(zmq::proxy_hook_t*) hook_};
return zmq::proxy (
(zmq::socket_base_t**) frontends_,
(zmq::socket_base_t**) backends_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_,
(zmq::proxy_hook_t**) hooks_);
}
int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_)
{
return zmq::proxy (
(zmq::socket_base_t**) frontends_,
(zmq::socket_base_t**) backends_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_,
(zmq::proxy_hook_t**) hooks_);
}
// The deprecated device functionality
int zmq_device (int /* type */, void *frontend_, void *backend_)
{
zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL};
zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL};
return zmq::proxy (
(zmq::socket_base_t**) frontends_,
(zmq::socket_base_t**) backends_);
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_, NULL);
}
......@@ -44,7 +44,6 @@ noinst_PROGRAMS = test_system \
test_inproc_connect \
test_issue_566 \
test_proxy \
test_proxy_chain \
test_abstract_ipc \
test_many_sockets \
test_ipc_wildcard \
......@@ -111,7 +110,6 @@ test_conflate_SOURCES = test_conflate.cpp
test_inproc_connect_SOURCES = test_inproc_connect.cpp
test_issue_566_SOURCES = test_issue_566.cpp
test_proxy_SOURCES = test_proxy.cpp
test_proxy_chain_SOURCES = test_proxy_chain.cpp
test_abstract_ipc_SOURCES = test_abstract_ipc.cpp
test_many_sockets_SOURCES = test_many_sockets.cpp
test_ipc_wildcard_SOURCES = test_ipc_wildcard.cpp
......
......@@ -41,48 +41,6 @@
#define QT_CLIENTS 3
#define is_verbose 0
// Our test Hook that uppercase the message from the frontend to the backend and vice versa
struct stats_t {
void *ctx; // not usefull for the kook itself, but convenient to provide the thread with it without building an additional struct for arguments
int qt_upper_case;
int qt_lower_case;
} stats = {NULL, 0, 0};
int
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
{
size_t size = zmq_msg_size(msg_);
if (!size || n_ == 1) return 0; // skip identity and 0 frames
char* message = (char*) zmq_msg_data(msg_);
for (size_t i = 0; i < size; i++)
if ('a' <= message[i] && message[i] <= 'z')
message[i] += 'A' - 'a';
struct stats_t* stats = (struct stats_t*) stats_;
stats->qt_upper_case++;
return 0;
}
int
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
{
size_t size = zmq_msg_size(msg_);
if (!size || n_ == 1) return 0; // skip identity and 0 frames
char* message = (char*) zmq_msg_data(msg_);
for (size_t i = 0; i < size; i++)
if ('A' <= message[i] && message[i] <= 'Z')
message[i] += 'a' - 'A';
struct stats_t* stats = (struct stats_t*) stats_;
stats->qt_lower_case++;
return 0;
}
zmq_proxy_hook_t hook = {
&stats, // data used by the hook functions if needed, NULL otherwise
upper_case, // hook for messages going from frontend to backend
lower_case // hook for messages going from backend to frontend
};
static void
client_task (void *ctx)
{
......@@ -128,19 +86,13 @@ client_task (void *ctx)
}
if (items [1].revents & ZMQ_POLLIN) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
if (rc > 0) {
if (is_verbose) {
if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0)
content[9] = '\0'; // required to have a clean output since '\0' is not included in the command
printf("client receive - identity = %s command = %s\n", identity, content);
}
if (memcmp (content, "STOP", 4) == 0) {
if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
if (memcmp (content, "TERMINATE", 9) == 0) {
run = false;
break;
}
}
}
}
sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
rc = zmq_send (client, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE);
......@@ -161,11 +113,8 @@ client_task (void *ctx)
static void server_worker (void *ctx);
void
server_task (void *arg)
server_task (void *ctx)
{
zmq_proxy_hook_t* hook = (zmq_proxy_hook_t*) arg;
struct stats_t* stats = (struct stats_t*) hook->data;
void* ctx = stats->ctx;
// Frontend socket talks to clients over TCP
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
assert (frontend);
......@@ -193,13 +142,7 @@ server_task (void *arg)
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
// Connect backend to frontend via a proxy
if (is_verbose)
printf("---------- standard proxy ----------\n");
zmq_proxy_steerable (frontend, backend, NULL, control); // until TERMINATE is sent on control
// Connect backend to frontend via a hooked proxy
if (is_verbose)
printf("---------- hooked proxy ----------\n");
zmq_proxy_hook (frontend, backend, NULL, hook, control); // until TERMINATE is sent on control
zmq_proxy_steerable (frontend, backend, NULL, control);
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
zmq_threadclose (threads[thread_nbr]);
......@@ -239,12 +182,9 @@ server_worker (void *ctx)
while (run) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
if (rc > 0) {
if (is_verbose) {
if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0)
content[9] = '\0'; // required to have a clean output since '\0' is not included in the command
if (is_verbose)
printf("server_worker receives command = %s\n", content);
}
if (memcmp (content, "STOP", 4) == 0)
if (memcmp (content, "TERMINATE", 9) == 0)
run = false;
}
// The DEALER socket gives us the reply envelope and message
......@@ -278,8 +218,7 @@ server_worker (void *ctx)
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main (void)
int main (void)
{
setup_test_environment ();
......@@ -294,19 +233,11 @@ main (void)
void *threads [QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++)
threads[i] = zmq_threadstart (&client_task, ctx);
stats.ctx = ctx;
threads[QT_CLIENTS] = zmq_threadstart (&server_task, &hook);
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
msleep (500); // Run for 500 ms then quit
msleep (500); // Run for 500 ms the standard proxy
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the standard proxy
rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9);
msleep (200); // Run for 200 ms the standard proxy
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
assert (rc == 9);
rc = zmq_send (control, "STOP", 5, 0); // stops clients and workers (\0 is sent to ease the printf of the verbose mode)
assert (rc == 5);
if (is_verbose) printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
rc = zmq_close (control);
assert (rc == 0);
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment