Unverified Commit 2f5c2f4a authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3440 from emtr/proxy-performance-fix

Proxy performance fix
parents 2e646486 d41ed618
...@@ -168,6 +168,7 @@ remote_lat ...@@ -168,6 +168,7 @@ remote_lat
remote_thr remote_thr
inproc_lat inproc_lat
inproc_thr inproc_thr
proxy_thr
benchmark_radix_tree benchmark_radix_tree
!local_lat/ !local_lat/
!local_thr/ !local_thr/
......
...@@ -1292,7 +1292,8 @@ if(BUILD_SHARED) ...@@ -1292,7 +1292,8 @@ if(BUILD_SHARED)
local_thr local_thr
remote_thr remote_thr
inproc_lat inproc_lat
inproc_thr) inproc_thr
proxy_thr)
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why? if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why?
option(WITH_PERF_TOOL "Build with perf-tools" ON) option(WITH_PERF_TOOL "Build with perf-tools" ON)
......
...@@ -341,7 +341,8 @@ noinst_PROGRAMS = \ ...@@ -341,7 +341,8 @@ noinst_PROGRAMS = \
perf/local_thr \ perf/local_thr \
perf/remote_thr \ perf/remote_thr \
perf/inproc_lat \ perf/inproc_lat \
perf/inproc_thr perf/inproc_thr \
perf/proxy_thr
perf_local_lat_LDADD = src/libzmq.la perf_local_lat_LDADD = src/libzmq.la
perf_local_lat_SOURCES = perf/local_lat.cpp perf_local_lat_SOURCES = perf/local_lat.cpp
...@@ -361,6 +362,9 @@ perf_inproc_lat_SOURCES = perf/inproc_lat.cpp ...@@ -361,6 +362,9 @@ perf_inproc_lat_SOURCES = perf/inproc_lat.cpp
perf_inproc_thr_LDADD = src/libzmq.la perf_inproc_thr_LDADD = src/libzmq.la
perf_inproc_thr_SOURCES = perf/inproc_thr.cpp perf_inproc_thr_SOURCES = perf/inproc_thr.cpp
perf_proxy_thr_LDADD = src/libzmq.la
perf_proxy_thr_SOURCES = perf/proxy_thr.cpp
if ENABLE_STATIC if ENABLE_STATIC
noinst_PROGRAMS += \ noinst_PROGRAMS += \
perf/benchmark_radix_tree perf/benchmark_radix_tree
......
# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL
This is a statement by Claudio Biagi that grants permission to relicense
its copyrights in the libzmq C++ library (ZeroMQ) under the Mozilla Public
License v2 (MPLv2) or any other Open Source Initiative approved license
chosen by the current ZeroMQ BDFL (Benevolent Dictator for Life).
A portion of the commits made by the Github handle "emtr", with
commit author "Claudio Biagi <retmt@gmx.com>", are copyright
of Claudio Biagi. This document hereby grants the libzmq project
team to relicense libzmq, including all past, present and future
contributions of the author listed above.
Claudio Biagi
2019/03/07
This diff is collapsed.
...@@ -70,6 +70,11 @@ enum ...@@ -70,6 +70,11 @@ enum
// Maximum number of events the I/O thread can process in one go. // Maximum number of events the I/O thread can process in one go.
max_io_events = 256, max_io_events = 256,
// Maximal batch size of packets forwarded by a ZMQ proxy.
// Increasing this value improves throughput at the expense of
// latency and fairness.
proxy_burst_size = 1000,
// Maximal delay to process command in API thread (in CPU ticks). // Maximal delay to process command in API thread (in CPU ticks).
// 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
// Note that delay is only applied when there is continuous stream of // Note that delay is only applied when there is continuous stream of
......
...@@ -116,39 +116,48 @@ int forward (class zmq::socket_base_t *from_, ...@@ -116,39 +116,48 @@ int forward (class zmq::socket_base_t *from_,
class zmq::socket_base_t *capture_, class zmq::socket_base_t *capture_,
zmq::msg_t *msg_) zmq::msg_t *msg_)
{ {
int more; // Forward a burst of messages
size_t moresz; for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
size_t complete_msg_size = 0; int more;
while (true) { size_t moresz;
int rc = from_->recv (msg_, 0); size_t complete_msg_size = 0;
if (unlikely (rc < 0))
return -1; // Forward all the parts of one message
while (true) {
int rc = from_->recv (msg_, ZMQ_DONTWAIT);
if (rc < 0) {
if (likely (errno == EAGAIN && i > 0))
return 0; // End of burst
else
return -1;
}
complete_msg_size += msg_->size (); complete_msg_size += msg_->size ();
moresz = sizeof more; moresz = sizeof more;
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
// Copy message to capture socket if any // Copy message to capture socket if any
rc = capture (capture_, msg_, more); rc = capture (capture_, msg_, more);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0); rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
if (more == 0) if (more == 0)
break; break;
} }
// A multipart message counts as 1 packet: // A multipart message counts as 1 packet:
from_stats_->msg_in++; from_stats_->msg_in++;
from_stats_->bytes_in += complete_msg_size; from_stats_->bytes_in += complete_msg_size;
to_stats_->msg_out++; to_stats_->msg_out++;
to_stats_->bytes_out += complete_msg_size; to_stats_->bytes_out += complete_msg_size;
}
return 0; return 0;
} }
......
...@@ -103,9 +103,9 @@ static void client_task (void *db_) ...@@ -103,9 +103,9 @@ static void client_task (void *db_)
rc = zmq_connect (control, "inproc://control"); rc = zmq_connect (control, "inproc://control");
assert (rc == 0); assert (rc == 0);
char content[CONTENT_SIZE_MAX]; char content[CONTENT_SIZE_MAX] = {};
// Set random routing id to make tracing easier // Set random routing id to make tracing easier
char routing_id[ROUTING_ID_SIZE]; char routing_id[ROUTING_ID_SIZE] = {};
sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF); sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
rc = rc =
zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id, zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id,
...@@ -291,8 +291,10 @@ static void server_worker (void *ctx_) ...@@ -291,8 +291,10 @@ static void server_worker (void *ctx_)
rc = zmq_connect (control, "inproc://control"); rc = zmq_connect (control, "inproc://control");
assert (rc == 0); assert (rc == 0);
char content[CONTENT_SIZE_MAX]; // bigger than what we need to check that char content[CONTENT_SIZE_MAX] =
char routing_id[ROUTING_ID_SIZE_MAX]; // the size received is the size sent {}; // bigger than what we need to check that
char routing_id[ROUTING_ID_SIZE_MAX] =
{}; // the size received is the size sent
bool run = true; bool run = true;
bool keep_sending = true; bool keep_sending = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment