Commit eb3e63e2 authored by Francesco Montorsi's avatar Francesco Montorsi Committed by Luca Boccassi

Hwm tests and docs (#3242)

Add new HWM tests and more detailed documentation
parent 07502111
...@@ -148,6 +148,7 @@ test_security_zap ...@@ -148,6 +148,7 @@ test_security_zap
test_socket_null test_socket_null
test_xpub_verbose test_xpub_verbose
test_mock_pub_sub test_mock_pub_sub
test_proxy_hwm
unittest_ip_resolver unittest_ip_resolver
unittest_mtrie unittest_mtrie
unittest_poller unittest_poller
......
...@@ -422,6 +422,7 @@ test_apps = \ ...@@ -422,6 +422,7 @@ test_apps = \
tests/test_inproc_connect \ tests/test_inproc_connect \
tests/test_issue_566 \ tests/test_issue_566 \
tests/test_proxy \ tests/test_proxy \
tests/test_proxy_hwm \
tests/test_proxy_single_socket \ tests/test_proxy_single_socket \
tests/test_proxy_terminate \ tests/test_proxy_terminate \
tests/test_getsockopt_memset \ tests/test_getsockopt_memset \
...@@ -628,6 +629,10 @@ tests_test_issue_566_LDADD = src/libzmq.la ...@@ -628,6 +629,10 @@ tests_test_issue_566_LDADD = src/libzmq.la
tests_test_proxy_SOURCES = tests/test_proxy.cpp tests_test_proxy_SOURCES = tests/test_proxy.cpp
tests_test_proxy_LDADD = src/libzmq.la tests_test_proxy_LDADD = src/libzmq.la
tests_test_proxy_hwm_SOURCES = tests/test_proxy_hwm.cpp
tests_test_proxy_hwm_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_proxy_hwm_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp tests_test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD = src/libzmq.la tests_test_proxy_single_socket_LDADD = src/libzmq.la
......
...@@ -640,6 +640,10 @@ blocking or dropping sent messages. Refer to the individual socket descriptions ...@@ -640,6 +640,10 @@ blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type. type.
NOTE: 0MQ does not guarantee that the socket will be able to queue as many as ZMQ_RCVHWM
messages, and the actual limit may be lower or higher, depending on socket transport.
A notable example is for sockets using TCP transport; see linkzmq:zmq_tcp[7].
[horizontal] [horizontal]
Option value type:: int Option value type:: int
Option value unit:: messages Option value unit:: messages
...@@ -858,7 +862,9 @@ type. ...@@ -858,7 +862,9 @@ type.
NOTE: 0MQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM NOTE: 0MQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM
messages, and the actual limit may be as much as 90% lower depending on the messages, and the actual limit may be as much as 90% lower depending on the
flow of messages on the socket. flow of messages on the socket. The socket may even be able to accept more messages
than the ZMQ_SNDHWM threshold; a notable example is for sockets using TCP transport;
see linkzmq:zmq_tcp[7].
[horizontal] [horizontal]
Option value type:: int Option value type:: int
......
...@@ -69,6 +69,30 @@ A 'peer address' may be specified by either of the following: ...@@ -69,6 +69,30 @@ A 'peer address' may be specified by either of the following:
Note: A description of the ZeroMQ Message Transport Protocol (ZMTP) which is Note: A description of the ZeroMQ Message Transport Protocol (ZMTP) which is
used by the TCP transport can be found at <http://rfc.zeromq.org/spec:15> used by the TCP transport can be found at <http://rfc.zeromq.org/spec:15>
HWM
---
For the TCP transport, the high water mark (HWM) mechanism works in conjunction
with the TCP socket buffers handled at OS level.
Depending on the OS and several other factors the size of such TCP buffers will
be different. Moreover TCP buffers provided by the OS will accomodate a varying
number of messages depending on the size of messages (unlike ZMQ HWM settings
the TCP socket buffers are measured in bytes and not messages).
This may result in apparently inexplicable behaviors: e.g., you may expect that
setting ZMQ_SNDHWM to 100 on a socket using TCP transport will have the effect
of blocking the transmission of the 101-th message if the receiver is slow.
This is very unlikely when using TCP transport since OS TCP buffers will typically
provide enough buffering to allow you sending much more than 100 messages.
Of course if the receiver is slow, transmitting on a TCP ZMQ socket will eventually trigger
the "mute state" of the socket; simply don't rely on the exact HWM value.
Obviously the same considerations apply for the receive HWM (see ZMQ_RCVHWM).
EXAMPLES EXAMPLES
-------- --------
.Assigning a local address to a socket .Assigning a local address to a socket
......
...@@ -86,6 +86,7 @@ if(NOT WIN32) ...@@ -86,6 +86,7 @@ if(NOT WIN32)
test_rebind_ipc test_rebind_ipc
test_reqrep_ipc test_reqrep_ipc
test_proxy test_proxy
test_proxy_hwm
test_proxy_single_socket test_proxy_single_socket
test_proxy_terminate test_proxy_terminate
test_getsockopt_memset test_getsockopt_memset
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include "testutil.hpp" #include "testutil.hpp"
#include "testutil_unity.hpp" #include "testutil_unity.hpp"
#define SOCKET_STRING_LEN (MAX_SOCKET_STRING * 4)
void setUp () void setUp ()
{ {
setup_test_context (); setup_test_context ();
...@@ -40,17 +42,20 @@ void tearDown () ...@@ -40,17 +42,20 @@ void tearDown ()
teardown_test_context (); teardown_test_context ();
} }
// const int MAX_SENDS = 10000; int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint)
int test_defaults (int send_hwm_, int msg_cnt_)
{ {
// Set up bind socket size_t len = SOCKET_STRING_LEN;
char pub_endpoint[SOCKET_STRING_LEN];
// Set up and bind PUB socket
void *pub_socket = test_context_socket (ZMQ_PUB); void *pub_socket = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a")); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, endpoint));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, pub_endpoint, &len));
// Set up connect socket // Set up and connect SUB socket
void *sub_socket = test_context_socket (ZMQ_SUB); void *sub_socket = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a")); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint));
//set a hwm on publisher //set a hwm on publisher
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
...@@ -58,17 +63,22 @@ int test_defaults (int send_hwm_, int msg_cnt_) ...@@ -58,17 +63,22 @@ int test_defaults (int send_hwm_, int msg_cnt_)
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
// Send until we block msleep (
SETTLE_TIME); // give some time to background threads to perform PUB-SUB connection
// Send until we reach "mute" state
int send_count = 0; int send_count = 0;
while (send_count < msg_cnt_ while (send_count < msg_cnt_
&& zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) && zmq_send (pub_socket, "test message", 13, ZMQ_DONTWAIT) == 13)
++send_count; ++send_count;
TEST_ASSERT_EQUAL_INT (send_hwm_, send_count);
msleep (SETTLE_TIME); msleep (SETTLE_TIME);
// Now receive all sent messages // Now receive all sent messages
int recv_count = 0; int recv_count = 0;
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) { char dummybuff[64];
while (13 == zmq_recv (sub_socket, &dummybuff, 64, ZMQ_DONTWAIT)) {
++recv_count; ++recv_count;
} }
...@@ -85,23 +95,27 @@ int receive (void *socket_) ...@@ -85,23 +95,27 @@ int receive (void *socket_)
{ {
int recv_count = 0; int recv_count = 0;
// Now receive all sent messages // Now receive all sent messages
while (0 == zmq_recv (socket_, NULL, 0, ZMQ_DONTWAIT)) { while (0 == zmq_recv (socket_, NULL, 0, 0)) {
++recv_count; ++recv_count;
} }
return recv_count; return recv_count;
} }
int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint)
int test_blocking (int send_hwm_, int msg_cnt_)
{ {
size_t len = SOCKET_STRING_LEN;
char pub_endpoint[SOCKET_STRING_LEN];
// Set up bind socket // Set up bind socket
void *pub_socket = test_context_socket (ZMQ_PUB); void *pub_socket = test_context_socket (ZMQ_PUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, "inproc://a")); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub_socket, endpoint));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, pub_endpoint, &len));
// Set up connect socket // Set up connect socket
void *sub_socket = test_context_socket (ZMQ_SUB); void *sub_socket = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, "inproc://a")); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint));
//set a hwm on publisher //set a hwm on publisher
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
...@@ -109,9 +123,14 @@ int test_blocking (int send_hwm_, int msg_cnt_) ...@@ -109,9 +123,14 @@ int test_blocking (int send_hwm_, int msg_cnt_)
int wait = 1; int wait = 1;
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait))); zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait)));
int timeout_ms = 10;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
sub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
msleep (SETTLE_TIME);
// Send until we block // Send until we block
int send_count = 0; int send_count = 0;
int recv_count = 0; int recv_count = 0;
...@@ -120,13 +139,15 @@ int test_blocking (int send_hwm_, int msg_cnt_) ...@@ -120,13 +139,15 @@ int test_blocking (int send_hwm_, int msg_cnt_)
if (rc == 0) { if (rc == 0) {
++send_count; ++send_count;
} else if (-1 == rc) { } else if (-1 == rc) {
// if the PUB socket blocks due to HWM, errno should be EAGAIN:
TEST_ASSERT_EQUAL_INT (EAGAIN, errno); TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
recv_count += receive (sub_socket); recv_count += receive (sub_socket);
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
} }
} }
msleep (2 * SETTLE_TIME); // required for TCP transport
recv_count += receive (sub_socket); recv_count += receive (sub_socket);
TEST_ASSERT_EQUAL_INT (send_count, recv_count);
// Clean up // Clean up
test_context_socket_close (sub_socket); test_context_socket_close (sub_socket);
...@@ -142,7 +163,7 @@ void test_reset_hwm () ...@@ -142,7 +163,7 @@ void test_reset_hwm ()
const int first_count = 9999; const int first_count = 9999;
const int second_count = 1100; const int second_count = 1100;
int hwm = 11024; int hwm = 11024;
char my_endpoint[MAX_SOCKET_STRING]; char my_endpoint[SOCKET_STRING_LEN];
// Set up bind socket // Set up bind socket
void *pub_socket = test_context_socket (ZMQ_PUB); void *pub_socket = test_context_socket (ZMQ_PUB);
...@@ -199,25 +220,51 @@ void test_reset_hwm () ...@@ -199,25 +220,51 @@ void test_reset_hwm ()
test_context_socket_close (pub_socket); test_context_socket_close (pub_socket);
} }
void test_defaults_1000 () void test_tcp ()
{
// send 1000 msg on hwm 1000, receive 1000, on TCP transport
TEST_ASSERT_EQUAL_INT (1000,
test_defaults (1000, 1000, "tcp://127.0.0.1:*"));
// send 100 msg on hwm 100, receive 100
TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, "tcp://127.0.0.1:*"));
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm:
TEST_ASSERT_EQUAL_INT (6000,
test_blocking (2000, 6000, "tcp://127.0.0.1:*"));
}
void test_inproc ()
{ {
// send 1000 msg on hwm 1000, receive 1000 TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, "inproc://a"));
TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000)); TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, "inproc://b"));
TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, "inproc://c"));
} }
void test_blocking_2000 () #ifndef ZMQ_HAVE_WINDOWS
void test_ipc ()
{ {
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, "ipc://*"));
TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000)); TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, "ipc://*"));
TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, "ipc://*"));
} }
#endif
int main () int main ()
{ {
setup_test_environment (); setup_test_environment ();
UNITY_BEGIN (); UNITY_BEGIN ();
RUN_TEST (test_defaults_1000);
RUN_TEST (test_blocking_2000); // repeat the test for both TCP, INPROC and IPC transports:
RUN_TEST (test_tcp);
RUN_TEST (test_inproc);
#ifndef ZMQ_HAVE_WINDOWS
RUN_TEST (test_ipc);
#endif
RUN_TEST (test_reset_hwm); RUN_TEST (test_reset_hwm);
return UNITY_END (); return UNITY_END ();
} }
/*
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
//
// Asynchronous proxy test using ZMQ_XPUB_NODROP and HWM:
//
// Topology:
//
// PUB SUB
// | |
// \-----> XSUB -> XPUB -----/
// ^^^^^^^^^^^^^^
// ZMQ proxy
//
// All connections use "inproc" transport and have artificially-low HWMs set.
// Then the PUB socket starts flooding the Proxy. The SUB is artificially slow
// at receiving messages.
// This scenario simulates what happens when a SUB is slower than
// its PUB: since ZMQ_XPUB_NODROP=1, the XPUB will block and then
// also the PUB socket will block.
// The result is that 2*HWM messages will be sent before the PUB blocks.
//
// In the meanwhile asking statistics to the Proxy must NOT be blocking.
//
#define HWM 10
#define NUM_BYTES_PER_MSG 50000
typedef struct
{
void *context;
const char *frontend_endpoint;
const char *backend_endpoint;
const char *control_endpoint;
bool subscriber_received_all;
} proxy_hwm_cfg_t;
static void lower_tcp_buff (void *sock_)
{
int sndBuff;
size_t sndBuffSz = sizeof sndBuff;
int rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz);
assert (rc == 0);
int newBuff = 1000;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sock_, ZMQ_SNDBUF, &newBuff, sizeof (newBuff)));
rc = zmq_getsockopt (sock_, ZMQ_SNDBUF, &sndBuff, &sndBuffSz);
assert (rc == 0);
}
static void lower_hwm (void *skt)
{
int send_hwm_ = HWM;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (skt, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (skt, ZMQ_RCVHWM, &send_hwm_, sizeof (send_hwm_)));
}
static void publisher_thread_main (void *pvoid)
{
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
void *pubsocket = zmq_socket (cfg->context, ZMQ_PUB);
assert (pubsocket);
lower_hwm (pubsocket);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pubsocket, cfg->frontend_endpoint));
int optval = 1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
msleep (SETTLE_TIME);
uint64_t send_count = 0;
while (true) {
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, NUM_BYTES_PER_MSG);
assert (rc == 0);
/* Fill in message content with 'AAAAAA' */
memset (zmq_msg_data (&msg), 'A', NUM_BYTES_PER_MSG);
/* Send the message to the socket */
rc = zmq_msg_send (&msg, pubsocket, ZMQ_DONTWAIT);
if (rc != -1) {
send_count++;
} else {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
break;
}
}
// VERIFY EXPECTED RESULTS
TEST_ASSERT (4 * HWM == send_count || 2 * HWM == send_count);
// CLEANUP
zmq_close (pubsocket);
}
static void subscriber_thread_main (void *pvoid)
{
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
assert (subsocket);
lower_hwm (subsocket);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint));
lower_tcp_buff (subsocket);
// receive all sent messages
uint64_t rxsuccess = 0;
bool success = true;
while (success) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, subsocket, 0);
if (rc != -1) {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
rxsuccess++;
// after receiving 1st message, set a finite timeout (default is infinite)
int timeout_ms = 100;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subsocket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
} else {
break;
}
msleep (100);
}
// VERIFY EXPECTED RESULTS
TEST_ASSERT (4 * HWM == rxsuccess || 2 * HWM == rxsuccess);
// INFORM THAT WE COMPLETED:
cfg->subscriber_received_all = true;
// CLEANUP
zmq_close (subsocket);
}
bool recv_stat (void *sock_, bool last_, uint64_t *res)
{
zmq_msg_t stats_msg;
int rc = zmq_msg_init (&stats_msg);
assert (rc == 0);
rc = zmq_msg_recv (&stats_msg, sock_, 0); //ZMQ_DONTWAIT);
if (rc == -1 && errno == EAGAIN) {
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
return false; // cannot retrieve the stat
}
assert (rc == sizeof (uint64_t));
memcpy (res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
int more;
size_t moresz = sizeof more;
rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0);
assert ((last_ && !more) || (!last_ && more));
return true;
}
// Utility function to interrogate the proxy:
typedef struct
{
uint64_t msg_in;
uint64_t bytes_in;
uint64_t msg_out;
uint64_t bytes_out;
} zmq_socket_stats_t;
typedef struct
{
zmq_socket_stats_t frontend;
zmq_socket_stats_t backend;
} zmq_proxy_stats_t;
bool check_proxy_stats (void *control_proxy_)
{
zmq_proxy_stats_t total_stats;
int rc;
rc = zmq_send (control_proxy_, "STATISTICS", 10, ZMQ_DONTWAIT);
assert (rc == 10 || (rc == -1 && errno == EAGAIN));
if (rc == -1 && errno == EAGAIN) {
return false;
}
// first frame of the reply contains FRONTEND stats:
if (!recv_stat (control_proxy_, false, &total_stats.frontend.msg_in)) {
return false;
}
recv_stat (control_proxy_, false, &total_stats.frontend.bytes_in);
recv_stat (control_proxy_, false, &total_stats.frontend.msg_out);
recv_stat (control_proxy_, false, &total_stats.frontend.bytes_out);
// second frame of the reply contains BACKEND stats:
recv_stat (control_proxy_, false, &total_stats.backend.msg_in);
recv_stat (control_proxy_, false, &total_stats.backend.bytes_in);
recv_stat (control_proxy_, false, &total_stats.backend.msg_out);
recv_stat (control_proxy_, true, &total_stats.backend.bytes_out);
return true;
}
static void proxy_stats_asker_thread_main (void *pvoid)
{
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
// CONTROL REQ
void *control_req =
zmq_socket (cfg->context,
ZMQ_REQ); // this one can be used to send command to the proxy
assert (control_req);
// connect CONTROL-REQ: a socket to which send commands
int rc = zmq_connect (control_req, cfg->control_endpoint);
assert (rc == 0);
// IMPORTANT: by setting the tx/rx timeouts, we avoid getting blocked when interrogating a proxy which is
// itself blocked in a zmq_msg_send() on its XPUB socket having ZMQ_XPUB_NODROP=1!
int optval = 10;
rc = zmq_setsockopt (control_req, ZMQ_SNDTIMEO, &optval, sizeof (optval));
assert (rc == 0);
rc = zmq_setsockopt (control_req, ZMQ_RCVTIMEO, &optval, sizeof (optval));
assert (rc == 0);
optval = 10;
rc =
zmq_setsockopt (control_req, ZMQ_REQ_CORRELATE, &optval, sizeof (optval));
assert (rc == 0);
rc =
zmq_setsockopt (control_req, ZMQ_REQ_RELAXED, &optval, sizeof (optval));
assert (rc == 0);
// Start!
while (!cfg->subscriber_received_all) {
#ifdef ZMQ_BUILD_DRAFT_API
check_proxy_stats (control_req);
#endif
usleep (1000); // 1ms -> in best case we will get 1000updates/second
}
// Ask the proxy to exit: the subscriber has received all messages
rc = zmq_send (control_req, "TERMINATE", 9, 0);
assert (rc == 9);
zmq_close (control_req);
}
static void proxy_thread_main (void *pvoid)
{
proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
int rc;
// FRONTEND SUB
void *frontend_xsub = zmq_socket (
cfg->context,
ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
assert (frontend_xsub);
lower_hwm (frontend_xsub);
// bind FRONTEND
rc = zmq_bind (frontend_xsub, cfg->frontend_endpoint);
assert (rc == 0);
// BACKEND PUB
void *backend_xpub = zmq_socket (
cfg->context,
ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
assert (backend_xpub);
int optval = 1;
rc =
zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
assert (rc == 0);
lower_hwm (backend_xpub);
// bind BACKEND
rc = zmq_bind (backend_xpub, cfg->backend_endpoint);
assert (rc == 0);
// CONTROL REP
void *control_rep = zmq_socket (
cfg->context,
ZMQ_REP); // this one is used by the proxy to receive&reply to commands
assert (control_rep);
// bind CONTROL
rc = zmq_bind (control_rep, cfg->control_endpoint);
assert (rc == 0);
// start proxying!
zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
zmq_close (frontend_xsub);
zmq_close (backend_xpub);
zmq_close (control_rep);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int main (void)
{
setup_test_environment ();
void *context = zmq_ctx_new ();
assert (context);
// START ALL SECONDARY THREADS
proxy_hwm_cfg_t cfg;
cfg.context = context;
cfg.frontend_endpoint = "inproc://frontend";
cfg.backend_endpoint = "inproc://backend";
cfg.control_endpoint = "inproc://ctrl";
cfg.subscriber_received_all = false;
void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg);
assert (proxy != 0);
void *publisher = zmq_threadstart (&publisher_thread_main, (void *) &cfg);
assert (publisher != 0);
void *subscriber = zmq_threadstart (&subscriber_thread_main, (void *) &cfg);
assert (subscriber != 0);
void *asker =
zmq_threadstart (&proxy_stats_asker_thread_main, (void *) &cfg);
assert (asker != 0);
// CLEANUP
zmq_threadclose (publisher);
zmq_threadclose (subscriber);
zmq_threadclose (asker);
zmq_threadclose (proxy);
int rc = zmq_ctx_term (context);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment