Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
bc25366f
Commit
bc25366f
authored
Feb 13, 2014
by
Laurent Alebarde
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "add proxy_chain, a multi proxies chaining in the same thread feature"
This reverts commit
bc7441f5
.
parent
b54a168d
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
56 additions
and
477 deletions
+56
-477
zmq_proxy_chain.txt
doc/zmq_proxy_chain.txt
+0
-103
zmq.h
include/zmq.h
+0
-2
proxy.cpp
src/proxy.cpp
+30
-58
proxy.hpp
src/proxy.hpp
+5
-5
zmq.cpp
src/zmq.cpp
+21
-28
Makefile.am
tests/Makefile.am
+0
-2
test_proxy_chain.cpp
tests/test_proxy_chain.cpp
+0
-279
No files found.
doc/zmq_proxy_chain.txt
deleted
100644 → 0
View file @
b54a168d
zmq_proxy_chain(3)
==================
NAME
----
zmq_proxy_chain - start built-in 0MQ a proxy chain in the same thread
control flow
SYNOPSIS
--------
*int zmq_proxy_chain (const void '**frontends', const void '**backends',
const void '*capture', const void **hooks_, const void '*control');*
DESCRIPTION
-----------
The _zmq_proxy_chain()_ function starts the built-in 0MQ proxy in the
current application thread, as _zmq_proxy()_, _zmq_proxy_steerable()_, or
_zmq_proxy_hook()_ do. Please, refer to these functions for their general
description and usage. We describe here only the additional proxy chaining
capability.
Note that compared to the other proxy functions, the arguments _frontends_,
_backends_ and _hooks_ receive arrays instead of single values. Say one need
to implement the following architecture:
*Process client proxy1 proxy2 worker*
| |-----------| |----------| |
*socket* cl f1 b1 f2 b2 wk
*endpoint* |c----e1-----b| |c----e2-----b| |c----e3----b|
Note: "c" is for connect, "b" for bind.
With the other proxy functions, one needs typically one thread for each proxy:
----
thread 1: zmq_proxy(f1, b1);
thread 2: zmq_proxy(f2, b2);
----
With _zmq_proxy_chain_, it can be performed with only one thread:
----
void** f = {f1, f2, NULL);
void** b = {b1, b2, NULL);
single thread: zmq_proxy_chain(f, b, NULL, NULL, NULL);
----
Note: the three NULL arguments are for capture, hooks, and control, since
_zmq_proxy_chain_ is built on top of _zmq_proxy_hook_, itself built on top
of _zmq_proxy_steerable_, itself built on top of _zmq_proxy_. Of course, hook and
steering features can be used along with chaining.
We have limited the number of sockets that can be chained in a single command to 10, what
should be largely sufficient. The reason is to avoid dynamic memory allocation.
Arguments frontends and backends shall be arrays of sockets of type void*, terminated
by NULL. Both arrays shall terminate by NULL at the same indice, otherwise, an error is
returned. Argument hooks shall be NULL or of the same length than the socket arrays. No
NULL is required at the end of hooks. Any number of elements may be NULL where no hook
is implemented in some proxies.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_steerable.
Refer to linkzmq:zmq_proxy_hook[3] for a description of the zmq_hook.
EXAMPLE USAGE
-------------
_zmq_proxy_chain_ aims at building protocol layers by easing the chaining of some
proxies typically by chaining:
DEALER | ROUTER <---> STREAM <---> DEALER
in the same thread. Any kind of protocol feature can be added via hooks.
cf also zmq_proxy, zmq_proxy_steerable, zmq_proxy_hook.
RETURN VALUE
------------
The _zmq_proxy_chain()_ function returns 0 if TERMINATE is sent to its
control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
cf test_proxy_chain.cpp
An example capable of proxying CURVE will be added soon.
SEE ALSO
--------
linkzmq:zmq_proxy[3]
linkzmq:zmq_proxy_steerable[3]
linkzmq:zmq_proxy_hook[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
include/zmq.h
View file @
bc25366f
...
...
@@ -398,11 +398,9 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/* Built-in message proxy (3-way) */
#define ZMQ_PROXY_CHAIN_MAX_LENGTH 10
ZMQ_EXPORT
int
zmq_proxy
(
void
*
frontend
,
void
*
backend
,
void
*
capture
);
ZMQ_EXPORT
int
zmq_proxy_steerable
(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
void
*
control
);
ZMQ_EXPORT
int
zmq_proxy_hook
(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
void
*
hook
,
void
*
control
);
ZMQ_EXPORT
int
zmq_proxy_chain
(
void
**
frontends_
,
void
**
backends_
,
void
*
capture_
,
void
**
hooks_
,
void
*
control_
);
typedef
int
(
*
zmq_hook_f
)(
void
*
frontend
,
void
*
backend
,
void
*
capture
,
zmq_msg_t
*
msg_
,
size_t
n_
,
void
*
data_
);
typedef
struct
zmq_proxy_hook_t
{
...
...
src/proxy.cpp
View file @
bc25366f
...
...
@@ -119,12 +119,14 @@ forward(
int
zmq
::
proxy
(
class
socket_base_t
*
*
frontend_
,
class
socket_base_t
*
*
backend_
,
class
socket_base_t
*
capture_
,
class
socket_base_t
*
control_
,
zmq
::
proxy_hook_t
*
*
hook_
)
class
socket_base_t
*
frontend_
,
class
socket_base_t
*
backend_
,
class
socket_base_t
*
capture_
,
class
socket_base_t
*
control_
,
zmq
::
proxy_hook_t
*
hook_
)
{
static
zmq
::
proxy_hook_t
dummy_hook
=
{
NULL
,
NULL
,
NULL
};
msg_t
msg
;
int
rc
=
msg
.
init
();
if
(
rc
!=
0
)
...
...
@@ -135,41 +137,12 @@ zmq::proxy (
int
more
;
size_t
moresz
;
size_t
n
=
0
;
// number of pair of sockets: the array ends with NULL
for
(;;
n
++
)
{
// counts the number of pair of sockets
if
(
!
frontend_
[
n
]
&&
!
backend_
[
n
])
break
;
if
(
!
frontend_
[
n
]
||
!
backend_
[
n
])
{
errno
=
EFAULT
;
return
-
1
;
}
}
if
(
!
n
)
{
errno
=
EFAULT
;
return
-
1
;
}
// avoid dynamic allocation as we have no guarranty to reach the deallocator => limit the chain length
zmq_assert
(
n
<=
ZMQ_PROXY_CHAIN_MAX_LENGTH
);
zmq_pollitem_t
items
[
2
*
ZMQ_PROXY_CHAIN_MAX_LENGTH
+
1
];
// +1 for the control socket
static
zmq_pollitem_t
null_item
=
{
NULL
,
0
,
ZMQ_POLLIN
,
0
};
static
zmq
::
proxy_hook_t
dummy_hook
=
{
NULL
,
NULL
,
NULL
};
static
zmq
::
proxy_hook_t
*
no_hooks
[
ZMQ_PROXY_CHAIN_MAX_LENGTH
];
if
(
!
hook_
)
hook_
=
no_hooks
;
else
for
(
size_t
i
=
0
;
i
<
n
;
i
++
)
if
(
!
hook_
[
i
])
// Check if a hook is used
hook_
[
i
]
=
&
dummy_hook
;
for
(
size_t
i
=
0
;
i
<
n
;
i
++
)
{
memcpy
(
&
items
[
2
*
i
],
&
null_item
,
sizeof
(
null_item
));
items
[
2
*
i
].
socket
=
frontend_
[
i
];
memcpy
(
&
items
[
2
*
i
+
1
],
&
null_item
,
sizeof
(
null_item
));
items
[
2
*
i
+
1
].
socket
=
backend_
[
i
];
no_hooks
[
i
]
=
&
dummy_hook
;
}
memcpy
(
&
items
[
2
*
n
],
&
null_item
,
sizeof
(
null_item
));
items
[
2
*
n
].
socket
=
control_
;
int
qt_poll_items
=
(
control_
?
2
*
n
+
1
:
2
*
n
);
zmq_pollitem_t
items
[]
=
{
{
frontend_
,
0
,
ZMQ_POLLIN
,
0
},
{
backend_
,
0
,
ZMQ_POLLIN
,
0
},
{
control_
,
0
,
ZMQ_POLLIN
,
0
}
};
int
qt_poll_items
=
(
control_
?
3
:
2
);
// Proxy can be in these three states
enum
{
...
...
@@ -185,7 +158,7 @@ zmq::proxy (
return
-
1
;
// Process a control command if any
if
(
control_
&&
items
[
2
*
n
].
revents
&
ZMQ_POLLIN
)
{
if
(
control_
&&
items
[
2
].
revents
&
ZMQ_POLLIN
)
{
rc
=
control_
->
recv
(
&
msg
,
0
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
...
...
@@ -214,23 +187,22 @@ zmq::proxy (
zmq_assert
(
false
);
}
}
// process each pair of sockets
for
(
size_t
i
=
0
;
i
<
n
;
i
++
)
{
// Process a request
if
(
state
==
active
&&
items
[
2
*
i
].
revents
&
ZMQ_POLLIN
)
{
rc
=
forward
(
frontend_
[
i
],
backend_
[
i
],
capture_
,
msg
,
hook_
[
i
]
->
front2back_hook
,
hook_
[
i
]
->
data
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
}
// Process a reply
if
(
state
==
active
&&
items
[
2
*
i
+
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
forward
(
backend_
[
i
],
frontend_
[
i
],
capture_
,
msg
,
hook_
[
i
]
->
back2front_hook
,
hook_
[
i
]
->
data
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
}
// Check if a hook is used
if
(
!
hook_
)
hook_
=
&
dummy_hook
;
// Process a request
if
(
state
==
active
&&
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
rc
=
forward
(
frontend_
,
backend_
,
capture_
,
msg
,
hook_
->
front2back_hook
,
hook_
->
data
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
}
// Process a reply
if
(
state
==
active
&&
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
forward
(
backend_
,
frontend_
,
capture_
,
msg
,
hook_
->
back2front_hook
,
hook_
->
data
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
}
}
return
0
;
...
...
src/proxy.hpp
View file @
bc25366f
...
...
@@ -32,11 +32,11 @@ namespace zmq
};
int
proxy
(
class
socket_base_t
*
*
frontend_
,
class
socket_base_t
*
*
backend_
,
class
socket_base_t
*
capture_
=
NULL
,
class
socket_base_t
*
control_
=
NULL
,
// backward compatibility without this argument
proxy_hook_t
*
*
hook_
=
NULL
// backward compatibility without this argument
class
socket_base_t
*
frontend_
,
class
socket_base_t
*
backend_
,
class
socket_base_t
*
capture_
=
NULL
,
class
socket_base_t
*
control_
=
NULL
,
// backward compatibility without this argument
proxy_hook_t
*
hook_
=
NULL
// backward compatibility without this argument
);
}
...
...
src/zmq.cpp
View file @
bc25366f
...
...
@@ -1025,55 +1025,48 @@ typedef char check_proxy_hook_t_size
int
zmq_proxy
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
)
{
zmq
::
socket_base_t
*
frontends_
[]
=
{(
zmq
::
socket_base_t
*
)
frontend_
,
NULL
};
zmq
::
socket_base_t
*
backends_
[]
=
{(
zmq
::
socket_base_t
*
)
backend_
,
NULL
};
if
(
!
frontend_
||
!
backend_
)
{
errno
=
EFAULT
;
return
-
1
;
}
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
*
*
)
frontends
_
,
(
zmq
::
socket_base_t
*
*
)
backends
_
,
(
zmq
::
socket_base_t
*
)
frontend
_
,
(
zmq
::
socket_base_t
*
)
backend
_
,
(
zmq
::
socket_base_t
*
)
capture_
);
}
int
zmq_proxy_steerable
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
,
void
*
control_
)
{
zmq
::
socket_base_t
*
frontends_
[]
=
{(
zmq
::
socket_base_t
*
)
frontend_
,
NULL
};
zmq
::
socket_base_t
*
backends_
[]
=
{(
zmq
::
socket_base_t
*
)
backend_
,
NULL
};
if
(
!
frontend_
||
!
backend_
)
{
errno
=
EFAULT
;
return
-
1
;
}
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
*
*
)
frontends
_
,
(
zmq
::
socket_base_t
*
*
)
backends
_
,
(
zmq
::
socket_base_t
*
)
frontend
_
,
(
zmq
::
socket_base_t
*
)
backend
_
,
(
zmq
::
socket_base_t
*
)
capture_
,
(
zmq
::
socket_base_t
*
)
control_
);
}
int
zmq_proxy_hook
(
void
*
frontend_
,
void
*
backend_
,
void
*
capture_
,
void
*
hook_
,
void
*
control_
)
{
zmq
::
socket_base_t
*
frontends_
[]
=
{(
zmq
::
socket_base_t
*
)
frontend_
,
NULL
};
zmq
::
socket_base_t
*
backends_
[]
=
{(
zmq
::
socket_base_t
*
)
backend_
,
NULL
};
zmq
::
proxy_hook_t
*
hooks_
[]
=
{(
zmq
::
proxy_hook_t
*
)
hook_
};
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
**
)
frontends_
,
(
zmq
::
socket_base_t
**
)
backends_
,
(
zmq
::
socket_base_t
*
)
capture_
,
(
zmq
::
socket_base_t
*
)
control_
,
(
zmq
::
proxy_hook_t
**
)
hooks_
);
}
int
zmq_proxy_chain
(
void
**
frontends_
,
void
**
backends_
,
void
*
capture_
,
void
**
hooks_
,
void
*
control_
)
{
if
(
!
frontend_
||
!
backend_
)
{
errno
=
EFAULT
;
return
-
1
;
}
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
*
*
)
frontends
_
,
(
zmq
::
socket_base_t
*
*
)
backends
_
,
(
zmq
::
socket_base_t
*
)
frontend
_
,
(
zmq
::
socket_base_t
*
)
backend
_
,
(
zmq
::
socket_base_t
*
)
capture_
,
(
zmq
::
socket_base_t
*
)
control_
,
(
zmq
::
proxy_hook_t
*
*
)
hooks
_
);
(
zmq
::
proxy_hook_t
*
)
hook
_
);
}
// The deprecated device functionality
int
zmq_device
(
int
/* type */
,
void
*
frontend_
,
void
*
backend_
)
{
zmq
::
socket_base_t
*
frontends_
[]
=
{(
zmq
::
socket_base_t
*
)
frontend_
,
NULL
};
zmq
::
socket_base_t
*
backends_
[]
=
{(
zmq
::
socket_base_t
*
)
backend_
,
NULL
};
return
zmq
::
proxy
(
(
zmq
::
socket_base_t
*
*
)
frontends
_
,
(
zmq
::
socket_base_t
*
*
)
backends
_
);
(
zmq
::
socket_base_t
*
)
frontend
_
,
(
zmq
::
socket_base_t
*
)
backend
_
);
}
tests/Makefile.am
View file @
bc25366f
...
...
@@ -44,7 +44,6 @@ noinst_PROGRAMS = test_system \
test_inproc_connect
\
test_issue_566
\
test_proxy
\
test_proxy_chain
\
test_abstract_ipc
\
test_many_sockets
\
test_ipc_wildcard
\
...
...
@@ -111,7 +110,6 @@ test_conflate_SOURCES = test_conflate.cpp
test_inproc_connect_SOURCES
=
test_inproc_connect.cpp
test_issue_566_SOURCES
=
test_issue_566.cpp
test_proxy_SOURCES
=
test_proxy.cpp
test_proxy_chain_SOURCES
=
test_proxy_chain.cpp
test_abstract_ipc_SOURCES
=
test_abstract_ipc.cpp
test_many_sockets_SOURCES
=
test_many_sockets.cpp
test_ipc_wildcard_SOURCES
=
test_ipc_wildcard.cpp
...
...
tests/test_proxy_chain.cpp
deleted
100644 → 0
View file @
b54a168d
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "../include/zmq_utils.h"
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define ID_SIZE 10
#define ID_SIZE_MAX 32
#define QT_WORKERS 1
#define QT_CLIENTS 1
#define is_verbose 0
static
void
client_task
(
void
*
ctx
)
{
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
client
);
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
control
);
int
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// Set random identity to make tracing easier
char
identity
[
ID_SIZE
];
sprintf
(
identity
,
"%04X-%04X"
,
rand
()
%
0xFFFF
,
rand
()
%
0xFFFF
);
rc
=
zmq_setsockopt
(
client
,
ZMQ_IDENTITY
,
identity
,
ID_SIZE
);
// includes '\0' as an helper for printf
assert
(
rc
==
0
);
rc
=
zmq_connect
(
client
,
"tcp://127.0.0.1:9999"
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{
{
client
,
0
,
ZMQ_POLLIN
,
0
},
{
control
,
0
,
ZMQ_POLLIN
,
0
}
};
int
request_nbr
=
0
;
bool
run
=
true
;
while
(
run
)
{
// Tick once per 200 ms, pulling in arriving messages
int
centitick
;
for
(
centitick
=
0
;
centitick
<
20
;
centitick
++
)
{
zmq_poll
(
items
,
2
,
10
);
if
(
items
[
0
].
revents
&
ZMQ_POLLIN
)
{
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
rc
=
zmq_recv
(
client
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
if
(
is_verbose
)
printf
(
"client receive - identity = %s content = %s
\n
"
,
identity
,
content
);
// Check that message is still the same
assert
(
memcmp
(
content
,
"request #"
,
9
)
==
0
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvmore
);
}
if
(
items
[
1
].
revents
&
ZMQ_POLLIN
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
0
);
if
(
rc
>
0
)
{
if
(
is_verbose
)
{
if
(
rc
==
9
&&
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
content
[
9
]
=
'\0'
;
// required to have a clean output since '\0' is not included in the command
printf
(
"client receive - identity = %s command = %s
\n
"
,
identity
,
content
);
}
if
(
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
{
run
=
false
;
break
;
}
}
}
}
sprintf
(
content
,
"request #%03d"
,
++
request_nbr
);
// CONTENT_SIZE
rc
=
zmq_send
(
client
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
rc
=
zmq_close
(
client
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
static
void
server_worker
(
void
*
ctx
);
void
server_task
(
void
*
ctx
)
{
// Frontend socket talks to clients over TCP
void
*
frontend
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
frontend
);
int
rc
=
zmq_bind
(
frontend
,
"tcp://127.0.0.1:9999"
);
assert
(
rc
==
0
);
// Intermediate 1
void
*
intermediate1
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
intermediate1
);
rc
=
zmq_connect
(
intermediate1
,
"inproc://intermediate"
);
assert
(
rc
==
0
);
// Intermediate 2
void
*
intermediate2
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
intermediate2
);
rc
=
zmq_bind
(
intermediate2
,
"inproc://intermediate"
);
assert
(
rc
==
0
);
// Backend socket talks to workers over inproc
void
*
backend
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
backend
);
rc
=
zmq_bind
(
backend
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
// Launch pool of worker threads, precise number is not critical
int
thread_nbr
;
void
*
threads
[
5
];
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
threads
[
thread_nbr
]
=
zmq_threadstart
(
&
server_worker
,
ctx
);
void
*
frontends
[]
=
{
frontend
,
intermediate2
,
NULL
};
void
*
backends
[]
=
{
intermediate1
,
backend
,
NULL
};
// Connect backend to frontend via a proxy
zmq_proxy_chain
(
frontends
,
backends
,
NULL
,
NULL
,
control
);
// until TERMINATE is sent on control
for
(
thread_nbr
=
0
;
thread_nbr
<
QT_WORKERS
;
thread_nbr
++
)
zmq_threadclose
(
threads
[
thread_nbr
]);
rc
=
zmq_close
(
frontend
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
intermediate1
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
intermediate2
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
backend
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
static
void
server_worker
(
void
*
ctx
)
{
void
*
worker
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
worker
);
int
rc
=
zmq_connect
(
worker
,
"inproc://backend"
);
assert
(
rc
==
0
);
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
control
);
rc
=
zmq_setsockopt
(
control
,
ZMQ_SUBSCRIBE
,
""
,
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
char
content
[
CONTENT_SIZE_MAX
];
// bigger than what we need to check that
char
identity
[
ID_SIZE_MAX
];
// the size received is the size sent
bool
run
=
true
;
while
(
run
)
{
rc
=
zmq_recv
(
control
,
content
,
CONTENT_SIZE_MAX
,
ZMQ_DONTWAIT
);
// usually, rc == -1 (no message)
if
(
rc
>
0
)
{
if
(
is_verbose
)
{
if
(
rc
==
9
&&
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
content
[
9
]
=
'\0'
;
// required to have a clean output since '\0' is not included in the command
printf
(
"server_worker receives command = %s
\n
"
,
content
);
}
if
(
memcmp
(
content
,
"TERMINATE"
,
9
)
==
0
)
run
=
false
;
}
// The DEALER socket gives us the reply envelope and message
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc
=
zmq_recv
(
worker
,
identity
,
ID_SIZE_MAX
,
ZMQ_DONTWAIT
);
if
(
rc
==
ID_SIZE
)
{
rc
=
zmq_recv
(
worker
,
content
,
CONTENT_SIZE_MAX
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
if
(
is_verbose
)
printf
(
"server receive - identity = %s content = %s
\n
"
,
identity
,
content
);
// Send 0..4 replies back
int
reply
,
replies
=
1
;
//rand() % 5;
for
(
reply
=
0
;
reply
<
replies
;
reply
++
)
{
// Sleep for some fraction of a second
msleep
(
rand
()
%
10
+
1
);
// Send message from server to client
rc
=
zmq_send
(
worker
,
identity
,
ID_SIZE
,
ZMQ_SNDMORE
);
assert
(
rc
==
ID_SIZE
);
rc
=
zmq_send
(
worker
,
content
,
CONTENT_SIZE
,
0
);
assert
(
rc
==
CONTENT_SIZE
);
}
}
}
rc
=
zmq_close
(
worker
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int
main
(
void
)
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// Control socket receives terminate command from main over inproc
void
*
control
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
control
);
int
rc
=
zmq_bind
(
control
,
"inproc://control"
);
assert
(
rc
==
0
);
void
*
threads
[
QT_CLIENTS
+
1
];
for
(
int
i
=
0
;
i
<
QT_CLIENTS
;
i
++
)
threads
[
i
]
=
zmq_threadstart
(
&
client_task
,
ctx
);
threads
[
QT_CLIENTS
]
=
zmq_threadstart
(
&
server_task
,
ctx
);
msleep
(
1000
);
// Run for 500 ms the standard proxy
rc
=
zmq_send
(
control
,
"TERMINATE"
,
9
,
0
);
assert
(
rc
==
9
);
rc
=
zmq_close
(
control
);
assert
(
rc
==
0
);
for
(
int
i
=
0
;
i
<
QT_CLIENTS
+
1
;
i
++
)
zmq_threadclose
(
threads
[
i
]);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment