zmq_proxy_steerable(3)
======================

NAME
----
zmq_proxy_steerable - built-in 0MQ proxy with control flow


SYNOPSIS
--------
*int zmq_proxy_steerable (const void '*frontend', const void '*backend', 
     const void '*capture', const void '*control');*


DESCRIPTION
-----------
The _zmq_proxy_steerable()_ function starts the built-in 0MQ proxy in the
current application thread, as _zmq_proxy()_ does. Please refer to
linkzmq:zmq_proxy[3] for the general description and usage. This page describes
only the additional control flow provided by the socket passed as the fourth
argument, "control".

If the control socket is not NULL, the proxy supports control flow. If
'PAUSE' is received on this socket, the proxy suspends its activities. If
'RESUME' is received, it resumes them. If 'TERMINATE' is received, it terminates
smoothly. If 'STATISTICS' is received, the proxy replies on the control socket
with a multipart message of 8 frames, each holding an unsigned 64-bit integer.
The frames provide, in the following order:

 - number of messages received by the frontend socket
 - number of bytes received by the frontend socket
 - number of messages sent out the frontend socket
 - number of bytes sent out the frontend socket
 - number of messages received by the backend socket
 - number of bytes received by the backend socket
 - number of messages sent out the backend socket
 - number of bytes sent out the backend socket
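
The frame order listed above can be captured in a small lookup table. The
following is a minimal sketch, not part of the 0MQ API: the names
'proxy_stats_t', 'proxy_stats_set' and 'proxy_stats_print' are hypothetical, and
the code assumes that each frame carries the integer in the byte order of the
receiving host.

```c
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

//  Positions of the eight STATISTICS frames, in the documented order
//  (hypothetical names, not defined by zmq.h)
enum {
    FRONTEND_MSG_IN,
    FRONTEND_BYTES_IN,
    FRONTEND_MSG_OUT,
    FRONTEND_BYTES_OUT,
    BACKEND_MSG_IN,
    BACKEND_BYTES_IN,
    BACKEND_MSG_OUT,
    BACKEND_BYTES_OUT,
    PROXY_STATS_COUNT
};

typedef struct {
    uint64_t counter [PROXY_STATS_COUNT];
} proxy_stats_t;

static const char *proxy_stats_name [PROXY_STATS_COUNT] = {
    "frontend messages received", "frontend bytes received",
    "frontend messages sent",     "frontend bytes sent",
    "backend messages received",  "backend bytes received",
    "backend messages sent",      "backend bytes sent"
};

//  Store the payload of frame number 'index' (0 to 7).
//  Returns 0 on success, -1 if the index or the frame size is unexpected.
static int
proxy_stats_set (proxy_stats_t *stats, int index, const void *data, size_t size)
{
    if (index < 0 || index >= PROXY_STATS_COUNT || size != sizeof (uint64_t))
        return -1;
    //  memcpy avoids assuming that the frame buffer is suitably aligned
    memcpy (&stats->counter [index], data, sizeof (uint64_t));
    return 0;
}

//  Print every counter with its label
static void
proxy_stats_print (const proxy_stats_t *stats)
{
    int index;
    for (index = 0; index < PROXY_STATS_COUNT; index++)
        printf ("%s: %" PRIu64 "\n", proxy_stats_name [index],
                stats->counter [index]);
}
```

In a receive loop, each frame's _zmq_msg_data()_ and _zmq_msg_size()_ results
would be passed to 'proxy_stats_set' together with a running frame index.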

At start, the proxy runs normally as if zmq_proxy was used.
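
The effect of the control commands can be modelled as a small state machine.
The sketch below only illustrates the behaviour described on this page and is
not the libzmq implementation: 'proxy_state_t' and 'proxy_next_state' are
hypothetical names, and ignoring unknown commands is an assumption, since this
page does not say how the proxy treats them.

```c
#include <stddef.h>
#include <string.h>

//  Hypothetical model of the proxy states described on this page
typedef enum {
    PROXY_ACTIVE,       //  Initial state: messages flow as with zmq_proxy()
    PROXY_PAUSED,       //  After PAUSE: activities suspended
    PROXY_TERMINATED    //  After TERMINATE: the function has returned
} proxy_state_t;

//  Compare a control frame (not null-terminated) with a command name
static int
frame_is (const void *data, size_t size, const char *command)
{
    return size == strlen (command) && memcmp (data, command, size) == 0;
}

//  Return the state that follows 'state' once the given frame is received
static proxy_state_t
proxy_next_state (proxy_state_t state, const void *data, size_t size)
{
    if (state == PROXY_TERMINATED)
        return state;               //  Nothing is processed after TERMINATE
    if (frame_is (data, size, "PAUSE"))
        return PROXY_PAUSED;
    if (frame_is (data, size, "RESUME"))
        return PROXY_ACTIVE;
    if (frame_is (data, size, "TERMINATE"))
        return PROXY_TERMINATED;
    //  STATISTICS triggers a reply but leaves the state unchanged;
    //  anything else is ignored in this model (an assumption)
    return state;
}
```

Commands are matched by exact length and content, so a frame containing
"PAUSED" or "pause" leaves the modelled state unchanged.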

If the control socket is NULL, the function behaves exactly as if linkzmq:zmq_proxy[3]
had been called.


Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.

EXAMPLE USAGE
-------------
See linkzmq:zmq_proxy[3].

RETURN VALUE
------------
The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is sent to its
control socket. Otherwise, it returns `-1` and sets 'errno' to *ETERM* or
*EINTR* (the 0MQ 'context' associated with either of the specified sockets was
terminated).


EXAMPLE
-------
.Creating a shared queue proxy
----
//  Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (frontend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (backend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);

//  Bind frontend and backend to TCP ports; connect control to the controller.
//  zmq_connect() needs a concrete peer address rather than a wildcard, so
//  localhost is used here (an assumption: the controller runs on this host)
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://localhost:5557") == 0);

//  Subscribe to all messages on the control socket, since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0) == 0);

//  Start the queue proxy, which runs until ETERM or "TERMINATE" is
//  received on the control socket
zmq_proxy_steerable (frontend, backend, NULL, control);
----
.Set up a controller in another node or process
----
void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);

//  Pause the proxy; zmq_send() returns the number of bytes sent
assert (zmq_send (control, "PAUSE", 5, 0) == 5);

//  Resume the proxy
assert (zmq_send (control, "RESUME", 6, 0) == 6);

//  Terminate the proxy
assert (zmq_send (control, "TERMINATE", 9, 0) == 9);

//  Check statistics. The reply travels back on the control connection, so
//  this part needs a socket pair that can carry traffic in both directions;
//  a PUB socket cannot receive
assert (zmq_send (control, "STATISTICS", 10, 0) == 10);
zmq_msg_t stats_msg;

while (1) {
    assert (zmq_msg_init (&stats_msg) == 0);
    int rc = zmq_msg_recv (&stats_msg, control, 0);
    assert (rc == sizeof (uint64_t));
    printf ("Stat: %llu\n",
            (unsigned long long) *(uint64_t *) zmq_msg_data (&stats_msg));
    if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
        break;
    assert (zmq_msg_close (&stats_msg) == 0);
}
assert (zmq_msg_close (&stats_msg) == 0);
----


SEE ALSO
--------
linkzmq:zmq_proxy[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]


AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.