Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
188e99c0
Commit
188e99c0
authored
Jan 24, 2014
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #854 from lalebarde/master
add a proxy hook
parents
a7065519
9ae6a91f
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
347 additions
and
21 deletions
+347
-21
zmq_proxy_hook.txt
doc/zmq_proxy_hook.txt
+201
-0
zmq.h
include/zmq.h
+8
-0
proxy.cpp
src/proxy.cpp
+26
-8
proxy.hpp
src/proxy.hpp
+13
-2
zmq.cpp
src/zmq.cpp
+20
-1
test_proxy.cpp
tests/test_proxy.cpp
+79
-10
No files found.
doc/zmq_proxy_hook.txt
0 → 100644
View file @
188e99c0
zmq_proxy_hook(3)
=================
NAME
----
zmq_proxy_hook - start built-in 0MQ proxy with an hook to modify the messages
between the frontend and the backend
SYNOPSIS
--------
*int zmq_proxy_hook (const void '*frontend', const void '*backend',
const void '*capture', const void '*hook', const void '*control');*
DESCRIPTION
-----------
The _zmq_proxy_hook()_ function starts the built-in 0MQ proxy in the
current application thread, as _zmq_proxy()_ or _zmq_proxy_steerable()_ do.
Please, refer to these functions for the general description and usage.
We describe here only the additional hook provided by the structure "hook"
passed as a fith argument.
If the hook structure pointer is not NULL, the proxy supports a hook defined as
a structure 'zmq_proxy_hook_t' containing a data pointer to any data type and
the address of two functions of type 'zmq_hook_f'. The first function,
'front2back_hook' is to manipulate the message received from the frontend, before
it is sent to the backend. The second one, 'back2front_hook' is for the way back.
Both functions receive as an argument in addition to a pointer to the message, the
pointer to the data passed in the 'zmq_proxy_hook_t' structure. This data makes it
possible to manage statefull behaviours in the proxy. They receive also the frame
number n_ which is 1 for the first frame, n for the nth one, 0 for the last one. This
enable to manage specifically the identity frame when ROUTER | STREAM sockets are
concerned. Moreover, to give the hook full capabilities, the three sockets passed
as parameters to the proxy are also provided to the hook functions, enabling to
consume some frames or to add others:
----
typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture,
zmq_msg_t* msg_, size_t n_, void *data_);
typedef struct zmq_proxy_hook_t {
void *data;
zmq_hook_f front2back_hook;
zmq_hook_f back2front_hook;
} zmq_proxy_hook_t;
----
If the hook pointer is NULL, zmq_proxy_hook behaves exactly as if zmq_proxy
or zmq_proxy_steerable had been called.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_proxy_steerable.
EXAMPLE USAGE
-------------
Filter
------
The most simple use is to simply filter the messages for example against vulgarity.
Messages are simply scanned against a dictionnary and target words are replaced.
ROUTER | STREAM / ROUTER | STREAM proxy
---------------------------------------
The data field enables to multiplex as desired identities in a ROUTER/ROUTER or in a
STREAM/STREAM proxy or what ever. Such architecture enables also custom load balancers.
Sticky ROUTER / ROUTER proxy
----------------------------
The data field enables to manage sticky identity pairing in a ROUTER/ROUTER proxy.
Security mechanism proxying
---------------------------
We expect to be able to proxy CURVE with the use of this feature.
Tests
-----
In an existing application, just change zmq_proxy or zmq_proxy_steerable for
zmq_proxy_hook to test anythink, even "Man in the middle" attacks ws security
mechanisms with a STREAM/STREAM proxy.
RETURN VALUE
------------
The _zmq_proxy_hook()_ function returns the same values than zmq_proxy
or zmq_proxy_steerable in the same conditions of use.
EXAMPLE
-------
This simple example aims at uppercasing the traffic between the frontend and the
backend, and lowercasing it on the way back.
.Setup the hook
----
struct stats_t {
int qt_upper_case;
int qt_lower_case;
} stats = {NULL, 0, 0};
int
upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
{
size_t size = zmq_msg_size(msg_);
if (!size || n_ == 1) return 0; // skip identity and 0 frames
char* message = (char*) zmq_msg_data(msg_);
for (size_t i = 0; i < size; i++)
if ('a' <= message[i] && message[i] <= 'z')
message[i] += 'A' - 'a';
struct stats_t* stats = (struct stats_t*) stats_;
stats->qt_upper_case++;
return 0;
}
int
lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_)
{
size_t size = zmq_msg_size(msg_);
if (!size || n_ == 1) return 0; // skip identity and 0 frames
char* message = (char*) zmq_msg_data(msg_);
for (size_t i = 0; i < size; i++)
if ('A' <= message[i] && message[i] <= 'Z')
message[i] += 'a' - 'A';
struct stats_t* stats = (struct stats_t*) stats_;
stats->qt_lower_case++;
return 0;
}
zmq_proxy_hook_t hook = {
&stats, // data used by the hook functions, passed as void* data_
upper_case, // hook for messages going from frontend to backend
lower_case // hook for messages going from backend to frontend
};
----
.in main:
----
int
main (void)
{
setup_test_environment ();
void *context = zmq_ctx_new ();
assert (context);
// Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
// Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Start the queue proxy, which runs until ETERM or "TERMINATE"
// received on the control socket
zmq_proxy_hook (frontend, backend, NULL, &hook, control);
printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case);
// close sockets and context
rc = zmq_close (control);
assert (rc == 0);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
----
.somewhere, the proxy is stopped with:
----
rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy
assert (rc == 9);
----
.cf test_proxy.cpp for a full implementation of this test, with clients and workers.
SEE ALSO
--------
linkzmq:zmq_proxy[3]
linkzmq:zmq_proxy_steerable[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
include/zmq.h
View file @
188e99c0
...
@@ -400,6 +400,14 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
...
@@ -400,6 +400,14 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
ZMQ_EXPORT
int
zmq_proxy
(
void
*
frontend
,
void
*
backend
,
void
*
capture
);
ZMQ_EXPORT
int
zmq_proxy
(
void
*
frontend
,
void
*
backend
,
void
*
capture
);
ZMQ_EXPORT
int
zmq_proxy_steerable
(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
void
*
control
);
ZMQ_EXPORT
int
zmq_proxy_steerable
(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
void
*
control
);
ZMQ_EXPORT
int
zmq_proxy_hook
(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
void
*
hook
,
void
*
control
);
typedef
int
(
*
zmq_hook_f
)(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
zmq_msg_t
*
msg_
,
size_t
n_
,
void
*
data_
);
typedef
struct
zmq_proxy_hook_t
{
void
*
data
;
zmq_hook_f
front2back_hook
;
zmq_hook_f
back2front_hook
;
}
zmq_proxy_hook_t
;
/* Encode a binary key as printable text using ZMQ RFC 32 */
/* Encode a binary key as printable text using ZMQ RFC 32 */
ZMQ_EXPORT
char
*
zmq_z85_encode
(
char
*
dest
,
uint8_t
*
data
,
size_t
size
);
ZMQ_EXPORT
char
*
zmq_z85_encode
(
char
*
dest
,
uint8_t
*
data
,
size_t
size
);
...
...
src/proxy.cpp
View file @
188e99c0
...
@@ -53,7 +53,8 @@
...
@@ -53,7 +53,8 @@
// zmq.h must be included *after* poll.h for AIX to build properly
// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"
#include "../include/zmq.h"
int
capture
(
int
capture
(
class
zmq
::
socket_base_t
*
capture_
,
class
zmq
::
socket_base_t
*
capture_
,
zmq
::
msg_t
&
msg_
,
zmq
::
msg_t
&
msg_
,
int
more_
=
0
)
int
more_
=
0
)
...
@@ -74,15 +75,18 @@ int capture(
...
@@ -74,15 +75,18 @@ int capture(
return
0
;
return
0
;
}
}
int
forward
(
int
forward
(
class
zmq
::
socket_base_t
*
from_
,
class
zmq
::
socket_base_t
*
from_
,
class
zmq
::
socket_base_t
*
to_
,
class
zmq
::
socket_base_t
*
to_
,
class
zmq
::
socket_base_t
*
capture_
,
class
zmq
::
socket_base_t
*
capture_
,
zmq
::
msg_t
&
msg_
)
zmq
::
msg_t
&
msg_
,
zmq
::
hook_f
do_hook_
,
void
*
data_
)
{
{
int
more
;
int
more
;
size_t
moresz
;
size_t
moresz
;
while
(
true
)
{
for
(
size_t
n
=
1
;;
n
++
)
{
int
rc
=
from_
->
recv
(
&
msg_
,
0
);
int
rc
=
from_
->
recv
(
&
msg_
,
0
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
...
@@ -97,6 +101,13 @@ int forward(
...
@@ -97,6 +101,13 @@ int forward(
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
// Hook
if
(
do_hook_
)
{
rc
=
(
*
do_hook_
)(
from_
,
to_
,
capture_
,
&
msg_
,
more
?
n
:
0
,
data_
);
// first message: n == 1, mth message: n == m, last message: n == 0
if
(
unlikely
(
rc
<
0
))
return
-
1
;
}
rc
=
to_
->
send
(
&
msg_
,
more
?
ZMQ_SNDMORE
:
0
);
rc
=
to_
->
send
(
&
msg_
,
more
?
ZMQ_SNDMORE
:
0
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
...
@@ -106,12 +117,16 @@ int forward(
...
@@ -106,12 +117,16 @@ int forward(
return
0
;
return
0
;
}
}
int
zmq
::
proxy
(
int
zmq
::
proxy
(
class
socket_base_t
*
frontend_
,
class
socket_base_t
*
frontend_
,
class
socket_base_t
*
backend_
,
class
socket_base_t
*
backend_
,
class
socket_base_t
*
capture_
,
class
socket_base_t
*
capture_
,
class
socket_base_t
*
control_
)
class
socket_base_t
*
control_
,
zmq
::
proxy_hook_t
*
hook_
)
{
{
static
zmq
::
proxy_hook_t
dummy_hook
=
{
NULL
,
NULL
,
NULL
};
msg_t
msg
;
msg_t
msg
;
int
rc
=
msg
.
init
();
int
rc
=
msg
.
init
();
if
(
rc
!=
0
)
if
(
rc
!=
0
)
...
@@ -172,17 +187,20 @@ int zmq::proxy (
...
@@ -172,17 +187,20 @@ int zmq::proxy (
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
}
}
// Check if a hook is used
if
(
!
hook_
)
hook_
=
&
dummy_hook
;
// Process a request
// Process a request
if
(
state
==
active
if
(
state
==
active
&&
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
&&
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
rc
=
forward
(
frontend_
,
backend_
,
capture_
,
msg
);
rc
=
forward
(
frontend_
,
backend_
,
capture_
,
msg
,
hook_
->
front2back_hook
,
hook_
->
data
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
}
}
// Process a reply
// Process a reply
if
(
state
==
active
if
(
state
==
active
&&
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
&&
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
forward
(
backend_
,
frontend_
,
capture_
,
msg
);
rc
=
forward
(
backend_
,
frontend_
,
capture_
,
msg
,
hook_
->
back2front_hook
,
hook_
->
data
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
}
}
...
...
src/proxy.hpp
View file @
188e99c0
...
@@ -22,11 +22,22 @@
...
@@ -22,11 +22,22 @@
namespace
zmq
namespace
zmq
{
{
typedef
int
(
*
hook_f
)(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
void
*
msg_
,
size_t
n_
,
void
*
data_
);
struct
proxy_hook_t
{
void
*
data
;
hook_f
front2back_hook
;
hook_f
back2front_hook
;
};
int
proxy
(
int
proxy
(
class
socket_base_t
*
frontend_
,
class
socket_base_t
*
frontend_
,
class
socket_base_t
*
backend_
,
class
socket_base_t
*
backend_
,
class
socket_base_t
*
capture_
,
class
socket_base_t
*
capture_
=
NULL
,
class
socket_base_t
*
control_
=
NULL
);
// backward compatibility without this argument
class
socket_base_t
*
control_
=
NULL
,
// backward compatibility without this argument
proxy_hook_t
*
hook_
=
NULL
// backward compatibility without this argument
);
}
}
#endif
#endif
src/zmq.cpp
View file @
188e99c0
...
@@ -1018,6 +1018,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -1018,6 +1018,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The proxy functionality
// The proxy functionality
// Compile time check whether proxy_hook_t fits into zmq_proxy_hook_t.
typedef
char
check_proxy_hook_t_size
[
sizeof
(
zmq
::
proxy_hook_t
)
==
sizeof
(
zmq_proxy_hook_t
)
?
1
:
-
1
];
int
zmq_proxy
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
)
int
zmq_proxy
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
)
{
{
if
(
!
frontend_
||
!
backend_
)
{
if
(
!
frontend_
||
!
backend_
)
{
...
@@ -1043,11 +1048,25 @@ int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *
...
@@ -1043,11 +1048,25 @@ int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *
(
zmq
::
socket_base_t
*
)
control_
);
(
zmq
::
socket_base_t
*
)
control_
);
}
}
int
zmq_proxy_hook
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
,
void
*
hook_
,
void
*
control_
)
{
if
(
!
frontend_
||
!
backend_
)
{
errno
=
EFAULT
;
return
-
1
;
}
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
*
)
frontend_
,
(
zmq
::
socket_base_t
*
)
backend_
,
(
zmq
::
socket_base_t
*
)
capture_
,
(
zmq
::
socket_base_t
*
)
control_
,
(
zmq
::
proxy_hook_t
*
)
hook_
);
}
// The deprecated device functionality
// The deprecated device functionality
int
zmq_device
(
int
/* type */
,
void
*
frontend_
,
void
*
backend_
)
int
zmq_device
(
int
/* type */
,
void
*
frontend_
,
void
*
backend_
)
{
{
return
zmq
::
proxy
(
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
*
)
frontend_
,
(
zmq
::
socket_base_t
*
)
frontend_
,
(
zmq
::
socket_base_t
*
)
backend_
,
NULL
);
(
zmq
::
socket_base_t
*
)
backend_
);
}
}
tests/test_proxy.cpp
View file @
188e99c0
...
@@ -41,6 +41,48 @@
...
@@ -41,6 +41,48 @@
#define QT_CLIENTS 3
#define QT_CLIENTS 3
#define is_verbose 0
#define is_verbose 0
// Our test Hook that uppercase the message from the frontend to the backend and vice versa
struct
stats_t
{
void
*
ctx
;
// not usefull for the kook itself, but convenient to provide the thread with it without building an additional struct for arguments
int
qt_upper_case
;
int
qt_lower_case
;
}
stats
=
{
NULL
,
0
,
0
};
int
upper_case
(
void
*
,
void
*
,
void
*
,
zmq_msg_t
*
msg_
,
size_t
n_
,
void
*
stats_
)
{
size_t
size
=
zmq_msg_size
(
msg_
);
if
(
!
size
||
n_
==
1
)
return
0
;
// skip identity and 0 frames
char
*
message
=
(
char
*
)
zmq_msg_data
(
msg_
);
for
(
size_t
i
=
0
;
i
<
size
;
i
++
)
if
(
'a'
<=
message
[
i
]
&&
message
[
i
]
<=
'z'
)
message
[
i
]
+=
'A'
-
'a'
;
struct
stats_t
*
stats
=
(
struct
stats_t
*
)
stats_
;
stats
->
qt_upper_case
++
;
return
0
;
}
int
lower_case
(
void
*
,
void
*
,
void
*
,
zmq_msg_t
*
msg_
,
size_t
n_
,
void
*
stats_
)
{
size_t
size
=
zmq_msg_size
(
msg_
);
if
(
!
size
||
n_
==
1
)
return
0
;
// skip identity and 0 frames
char
*
message
=
(
char
*
)
zmq_msg_data
(
msg_
);
for
(
size_t
i
=
0
;
i
<
size
;
i
++
)
if
(
'A'
<=
message
[
i
]
&&
message
[
i
]
<=
'Z'
)
message
[
i
]
+=
'a'
-
'A'
;
struct
stats_t
*
stats
=
(
struct
stats_t
*
)
stats_
;
stats
->
qt_lower_case
++
;
return
0
;
}
zmq_proxy_hook_t
hook
=
{
&
stats
,
// data used by the hook functions if needed, NULL otherwise
upper_case
,
// hook for messages going from frontend to backend
lower_case
// hook for messages going from backend to frontend
};
static
void
static
void
client_task
(
void
*
ctx
)
client_task
(
void
*
ctx
)
{
{
...
@@ -86,13 +128,19 @@ client_task (void *ctx)
...
@@ -86,13 +128,19 @@ client_task (void *ctx)
}
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
if
(
is_verbose
)
printf
(
"client receive - identity = %s command = %s
\n
"
,
identity
,
content
);
if
(
rc
>
0
)
{
if
(
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
{
if
(
is_verbose
)
{
if
(
rc
==
9
&&
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
content
[
9
]
=
'\0'
;
// required to have a clean output since '\0' is not included in the command
printf
(
"client receive - identity = %s command = %s
\n
"
,
identity
,
content
);
}
if
(
memcmp
(
content
,
"STOP"
,
4
)
==
0
)
{
run
=
false
;
run
=
false
;
break
;
break
;
}
}
}
}
}
}
}
sprintf
(
content
,
"request #%03d"
,
++
request_nbr
);
// CONTENT_SIZE
sprintf
(
content
,
"request #%03d"
,
++
request_nbr
);
// CONTENT_SIZE
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
assert
(
rc
==
CONTENT_SIZE
);
...
@@ -113,8 +161,11 @@ client_task (void *ctx)
...
@@ -113,8 +161,11 @@ client_task (void *ctx)
static
void
server_worker
(
void
*
ctx
);
static
void
server_worker
(
void
*
ctx
);
void
void
server_task
(
void
*
ctx
)
server_task
(
void
*
arg
)
{
{
zmq_proxy_hook_t
*
hook
=
(
zmq_proxy_hook_t
*
)
arg
;
struct
stats_t
*
stats
=
(
struct
stats_t
*
)
hook
->
data
;
void
*
ctx
=
stats
->
ctx
;
// Frontend socket talks to clients over TCP
// Frontend socket talks to clients over TCP
void
*
frontend
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
void
*
frontend
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
frontend
);
assert
(
frontend
);
...
@@ -142,7 +193,13 @@ server_task (void *ctx)
...
@@ -142,7 +193,13 @@ server_task (void *ctx)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx
);
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx
);
// Connect backend to frontend via a proxy
// Connect backend to frontend via a proxy
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
if
(
is_verbose
)
printf
(
"---------- standard proxy ----------
\n
"
);
zmq_proxy_steerable
(
frontend
,
backend
,
NULL
,
control
);
// until TERMINATE is sent on control
// Connect backend to frontend via a hooked proxy
if
(
is_verbose
)
printf
(
"---------- hooked proxy ----------
\n
"
);
zmq_proxy_hook
(
frontend
,
backend
,
NULL
,
hook
,
control
);
// until TERMINATE is sent on control
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
zmq_threadclose
(
threads
[
thread_nbr
]);
...
@@ -182,9 +239,12 @@ server_worker (void *ctx)
...
@@ -182,9 +239,12 @@ server_worker (void *ctx)
while
(
run
)
{
while
(
run
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
if
(
rc
>
0
)
{
if
(
rc
>
0
)
{
if
(
is_verbose
)
if
(
is_verbose
)
{
if
(
rc
==
9
&&
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
content
[
9
]
=
'\0'
;
// required to have a clean output since '\0' is not included in the command
printf
(
"server_worker receives command = %s
\n
"
,
content
);
printf
(
"server_worker receives command = %s
\n
"
,
content
);
if
(
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
}
if
(
memcmp
(
content
,
"STOP"
,
4
)
==
0
)
run
=
false
;
run
=
false
;
}
}
// The DEALER socket gives us the reply envelope and message
// The DEALER socket gives us the reply envelope and message
...
@@ -218,7 +278,8 @@ server_worker (void *ctx)
...
@@ -218,7 +278,8 @@ server_worker (void *ctx)
// The main thread simply starts several clients and a server, and then
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
// waits for the server to finish.
int
main
(
void
)
int
main
(
void
)
{
{
setup_test_environment
();
setup_test_environment
();
...
@@ -233,11 +294,19 @@ int main (void)
...
@@ -233,11 +294,19 @@ int main (void)
void
*
threads
[
QT_CLIENTS
+
1
];
void
*
threads
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
ctx
);
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
ctx
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
)
;
stats
.
ctx
=
ctx
;
msleep
(
500
);
// Run for 500 ms then quit
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
&
hook
);
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
msleep
(
500
);
// Run for 500 ms the standard proxy
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
// stops the standard proxy
assert
(
rc
==
9
);
assert
(
rc
==
9
);
msleep
(
200
);
// Run for 200 ms the standard proxy
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
// stops the hooked proxy
assert
(
rc
==
9
);
rc
=
zmq_send
(
control
,
"STOP"
,
5
,
0
);
// stops clients and workers (\0 is sent to ease the printf of the verbose mode)
assert
(
rc
==
5
);
if
(
is_verbose
)
printf
(
"frontend to backend hook hits = %d
\n
backend to frontend hook hits = %d
\n
"
,
stats
.
qt_upper_case
,
stats
.
qt_lower_case
);
rc
=
zmq_close
(
control
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment