Commit d41ed618 authored by Claudio Biagi's avatar Claudio Biagi

Proxy performance fix, ticket #3439

Improve performance of the proxy forwarding batch of message.
Add throughput benchmark for proxy.
Fix valgrind error reported on unitialized vars
RELICENSE: Add emtr grant
parent 2e646486
...@@ -168,6 +168,7 @@ remote_lat ...@@ -168,6 +168,7 @@ remote_lat
remote_thr remote_thr
inproc_lat inproc_lat
inproc_thr inproc_thr
proxy_thr
benchmark_radix_tree benchmark_radix_tree
!local_lat/ !local_lat/
!local_thr/ !local_thr/
......
...@@ -1292,7 +1292,8 @@ if(BUILD_SHARED) ...@@ -1292,7 +1292,8 @@ if(BUILD_SHARED)
local_thr local_thr
remote_thr remote_thr
inproc_lat inproc_lat
inproc_thr) inproc_thr
proxy_thr)
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why? if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why?
option(WITH_PERF_TOOL "Build with perf-tools" ON) option(WITH_PERF_TOOL "Build with perf-tools" ON)
......
...@@ -341,7 +341,8 @@ noinst_PROGRAMS = \ ...@@ -341,7 +341,8 @@ noinst_PROGRAMS = \
perf/local_thr \ perf/local_thr \
perf/remote_thr \ perf/remote_thr \
perf/inproc_lat \ perf/inproc_lat \
perf/inproc_thr perf/inproc_thr \
perf/proxy_thr
perf_local_lat_LDADD = src/libzmq.la perf_local_lat_LDADD = src/libzmq.la
perf_local_lat_SOURCES = perf/local_lat.cpp perf_local_lat_SOURCES = perf/local_lat.cpp
...@@ -361,6 +362,9 @@ perf_inproc_lat_SOURCES = perf/inproc_lat.cpp ...@@ -361,6 +362,9 @@ perf_inproc_lat_SOURCES = perf/inproc_lat.cpp
perf_inproc_thr_LDADD = src/libzmq.la perf_inproc_thr_LDADD = src/libzmq.la
perf_inproc_thr_SOURCES = perf/inproc_thr.cpp perf_inproc_thr_SOURCES = perf/inproc_thr.cpp
perf_proxy_thr_LDADD = src/libzmq.la
perf_proxy_thr_SOURCES = perf/proxy_thr.cpp
if ENABLE_STATIC if ENABLE_STATIC
noinst_PROGRAMS += \ noinst_PROGRAMS += \
perf/benchmark_radix_tree perf/benchmark_radix_tree
......
# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL
This is a statement by Claudio Biagi that grants permission to relicense
its copyrights in the libzmq C++ library (ZeroMQ) under the Mozilla Public
License v2 (MPLv2) or any other Open Source Initiative approved license
chosen by the current ZeroMQ BDFL (Benevolent Dictator for Life).
A portion of the commits made by the Github handle "emtr", with
commit author "Claudio Biagi <retmt@gmx.com>", are copyright
of Claudio Biagi. This document hereby grants the libzmq project
team to relicense libzmq, including all past, present and future
contributions of the author listed above.
Claudio Biagi
2019/03/07
/*
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <stdarg.h>
#include <string.h>
#include <string>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#include <unistd.h>
#endif
/*
Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
Topology:
XPUB SUB
| |
+-----> XSUB -> XPUB -----/
| ^^^^^^^^^^^^
XPUB ZMQ proxy
All connections use "inproc" transport. The two XPUB sockets start
flooding the proxy. The throughput is computed using the bytes received
in the SUB socket.
*/
#define HWM 10000
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x))
#endif
#define TEST_ASSERT_SUCCESS_ERRNO(expr) \
test_assert_success_message_errno_helper (expr, NULL, #expr)
static uint64_t message_count = 0;
static size_t message_size = 0;
typedef struct
{
void *context;
int thread_idx;
const char *frontend_endpoint[4];
const char *backend_endpoint[4];
const char *control_endpoint;
} proxy_hwm_cfg_t;
int test_assert_success_message_errno_helper (int rc_,
const char *msg_,
const char *expr_)
{
if (rc_ == -1) {
char buffer[512];
buffer[sizeof (buffer) - 1] =
0; // to ensure defined behavior with VC++ <= 2013
printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
exit (1);
}
return rc_;
}
static void set_hwm (void *skt)
{
int hwm = HWM;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
}
static void publisher_thread_main (void *pvoid)
{
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
const int idx = cfg->thread_idx;
int optval;
int rc;
void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
assert (pubsocket);
set_hwm (pubsocket);
optval = 1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
optval = 1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
// Wait before starting TX operations till 1 subscriber has subscribed
// (in this test there's 1 subscriber only)
char buffer[32] = {};
rc = TEST_ASSERT_SUCCESS_ERRNO (
zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
if (rc != 1) {
printf ("invalid response length: expected 1, received %d", rc);
exit (1);
}
if (buffer[0] != 1) {
printf ("invalid response value: expected 1, received %d",
(int) buffer[0]);
exit (1);
}
zmq_msg_t msg_orig;
rc = zmq_msg_init_size (&msg_orig, message_size);
assert (rc == 0);
memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
uint64_t send_count = 0;
while (send_count < message_count) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_msg_copy (&msg, &msg_orig);
assert (rc == 0);
// Send the message to the socket
rc = zmq_msg_send (&msg, pubsocket, 0);
if (rc != -1) {
send_count++;
} else {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
}
}
zmq_close (pubsocket);
printf ("publisher thread ended\n");
}
static void subscriber_thread_main (void *pvoid)
{
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
const int idx = cfg->thread_idx;
void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
assert (subsocket);
set_hwm (subsocket);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (subsocket, cfg->backend_endpoint[idx]));
// Receive message_count messages
uint64_t rxsuccess = 0;
bool success = true;
while (success) {
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_msg_recv (&msg, subsocket, 0);
if (rc != -1) {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
rxsuccess++;
}
if (rxsuccess == message_count)
break;
}
// Cleanup
zmq_close (subsocket);
printf ("subscriber thread ended\n");
}
static void proxy_thread_main (void *pvoid)
{
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
int rc;
// FRONTEND SUB
void *frontend_xsub = zmq_socket (
cfg->context,
ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
assert (frontend_xsub);
set_hwm (frontend_xsub);
// Bind FRONTEND
for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
const char *ep = cfg->frontend_endpoint[i];
if (ep != NULL) {
assert (strlen (ep) > 5);
rc = zmq_bind (frontend_xsub, ep);
assert (rc == 0);
}
}
// BACKEND PUB
void *backend_xpub = zmq_socket (
cfg->context,
ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
assert (backend_xpub);
int optval = 1;
rc =
zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
assert (rc == 0);
set_hwm (backend_xpub);
// Bind BACKEND
for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
const char *ep = cfg->backend_endpoint[i];
if (ep != NULL) {
assert (strlen (ep) > 5);
rc = zmq_bind (backend_xpub, ep);
assert (rc == 0);
}
}
// CONTROL REP
void *control_rep = zmq_socket (
cfg->context,
ZMQ_REP); // This one is used by the proxy to receive&reply to commands
assert (control_rep);
// Bind CONTROL
rc = zmq_bind (control_rep, cfg->control_endpoint);
assert (rc == 0);
// Start proxying!
zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
zmq_close (frontend_xsub);
zmq_close (backend_xpub);
zmq_close (control_rep);
printf ("proxy thread ended\n");
}
void terminate_proxy (const proxy_hwm_cfg_t *cfg)
{
// CONTROL REQ
void *control_req = zmq_socket (
cfg->context,
ZMQ_REQ); // This one can be used to send command to the proxy
assert (control_req);
// Connect CONTROL-REQ: a socket to which send commands
int rc = zmq_connect (control_req, cfg->control_endpoint);
assert (rc == 0);
// Ask the proxy to exit: the subscriber has received all messages
rc = zmq_send (control_req, "TERMINATE", 9, 0);
assert (rc == 9);
zmq_close (control_req);
}
// The main thread simply starts some publishers, a proxy,
// and a subscriber. Finish when all packets are received.
int main (int argc, char *argv[])
{
if (argc != 3) {
printf ("usage: inproc_thr <message-size> <message-count>\n");
return 1;
}
message_size = atoi (argv[1]);
message_count = atoi (argv[2]);
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
void *context = zmq_ctx_new ();
assert (context);
int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
assert (rv == 0);
// START ALL SECONDARY THREADS
const char *pub1 = "inproc://perf_pub1";
const char *pub2 = "inproc://perf_pub2";
const char *sub1 = "inproc://perf_backend";
proxy_hwm_cfg_t cfg_global = {};
cfg_global.context = context;
cfg_global.frontend_endpoint[0] = pub1;
cfg_global.frontend_endpoint[1] = pub2;
cfg_global.backend_endpoint[0] = sub1;
cfg_global.control_endpoint = "inproc://ctrl";
// Proxy
proxy_hwm_cfg_t cfg_proxy = cfg_global;
void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
assert (proxy != 0);
// Subscriber 1
proxy_hwm_cfg_t cfg_sub1 = cfg_global;
cfg_sub1.thread_idx = 0;
void *subscriber =
zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
assert (subscriber != 0);
// Start measuring
void *watch = zmq_stopwatch_start ();
// Publisher 1
proxy_hwm_cfg_t cfg_pub1 = cfg_global;
cfg_pub1.thread_idx = 0;
void *publisher1 =
zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
assert (publisher1 != 0);
// Publisher 2
proxy_hwm_cfg_t cfg_pub2 = cfg_global;
cfg_pub2.thread_idx = 1;
void *publisher2 =
zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
assert (publisher2 != 0);
// Wait for all packets to be received
zmq_threadclose (subscriber);
// Stop measuring
unsigned long elapsed = zmq_stopwatch_stop (watch);
if (elapsed == 0)
elapsed = 1;
unsigned long throughput =
(unsigned long) ((double) message_count / (double) elapsed * 1000000);
double megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
// Wait for the end of publishers...
zmq_threadclose (publisher1);
zmq_threadclose (publisher2);
// ... then close the proxy
terminate_proxy (&cfg_proxy);
zmq_threadclose (proxy);
int rc = zmq_ctx_term (context);
assert (rc == 0);
return 0;
}
...@@ -70,6 +70,11 @@ enum ...@@ -70,6 +70,11 @@ enum
// Maximum number of events the I/O thread can process in one go. // Maximum number of events the I/O thread can process in one go.
max_io_events = 256, max_io_events = 256,
// Maximal batch size of packets forwarded by a ZMQ proxy.
// Increasing this value improves throughput at the expense of
// latency and fairness.
proxy_burst_size = 1000,
// Maximal delay to process command in API thread (in CPU ticks). // Maximal delay to process command in API thread (in CPU ticks).
// 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
// Note that delay is only applied when there is continuous stream of // Note that delay is only applied when there is continuous stream of
......
...@@ -116,13 +116,21 @@ int forward (class zmq::socket_base_t *from_, ...@@ -116,13 +116,21 @@ int forward (class zmq::socket_base_t *from_,
class zmq::socket_base_t *capture_, class zmq::socket_base_t *capture_,
zmq::msg_t *msg_) zmq::msg_t *msg_)
{ {
// Forward a burst of messages
for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
int more; int more;
size_t moresz; size_t moresz;
size_t complete_msg_size = 0; size_t complete_msg_size = 0;
// Forward all the parts of one message
while (true) { while (true) {
int rc = from_->recv (msg_, 0); int rc = from_->recv (msg_, ZMQ_DONTWAIT);
if (unlikely (rc < 0)) if (rc < 0) {
if (likely (errno == EAGAIN && i > 0))
return 0; // End of burst
else
return -1; return -1;
}
complete_msg_size += msg_->size (); complete_msg_size += msg_->size ();
...@@ -149,6 +157,7 @@ int forward (class zmq::socket_base_t *from_, ...@@ -149,6 +157,7 @@ int forward (class zmq::socket_base_t *from_,
from_stats_->bytes_in += complete_msg_size; from_stats_->bytes_in += complete_msg_size;
to_stats_->msg_out++; to_stats_->msg_out++;
to_stats_->bytes_out += complete_msg_size; to_stats_->bytes_out += complete_msg_size;
}
return 0; return 0;
} }
......
...@@ -103,9 +103,9 @@ static void client_task (void *db_) ...@@ -103,9 +103,9 @@ static void client_task (void *db_)
rc = zmq_connect (control, "inproc://control"); rc = zmq_connect (control, "inproc://control");
assert (rc == 0); assert (rc == 0);
char content[CONTENT_SIZE_MAX]; char content[CONTENT_SIZE_MAX] = {};
// Set random routing id to make tracing easier // Set random routing id to make tracing easier
char routing_id[ROUTING_ID_SIZE]; char routing_id[ROUTING_ID_SIZE] = {};
sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF); sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
rc = rc =
zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id, zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id,
...@@ -291,8 +291,10 @@ static void server_worker (void *ctx_) ...@@ -291,8 +291,10 @@ static void server_worker (void *ctx_)
rc = zmq_connect (control, "inproc://control"); rc = zmq_connect (control, "inproc://control");
assert (rc == 0); assert (rc == 0);
char content[CONTENT_SIZE_MAX]; // bigger than what we need to check that char content[CONTENT_SIZE_MAX] =
char routing_id[ROUTING_ID_SIZE_MAX]; // the size received is the size sent {}; // bigger than what we need to check that
char routing_id[ROUTING_ID_SIZE_MAX] =
{}; // the size received is the size sent
bool run = true; bool run = true;
bool keep_sending = true; bool keep_sending = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment