Commit 7841b0dd authored by Richard Newton's avatar Richard Newton

Support high water mark on inproc socket connect before bind.

parent b9c09064
......@@ -406,12 +406,7 @@ void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pendi
else
{
// Bind has happened in the mean time, connect directly
it->second.socket->inc_seqnum();
pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid());
command_t cmd;
cmd.type = command_t::bind;
cmd.args.bind.pipe = pending_connection_.bind_pipe;
it->second.socket->process_command(cmd);
connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side);
}
endpoints_sync.unlock ();
......@@ -425,43 +420,71 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
{
bind_socket_->inc_seqnum();
p->second.bind_pipe->set_tid(bind_socket_->get_tid());
connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
}
pending_connections.erase(pending.first, pending.second);
endpoints_sync.unlock ();
}
void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_)
{
bind_socket_->inc_seqnum();
pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid());
if (side_ == bind_side)
{
command_t cmd;
cmd.type = command_t::bind;
cmd.args.bind.pipe = p->second.bind_pipe;
cmd.args.bind.pipe = pending_connection_.bind_pipe;
bind_socket_->process_command(cmd);
bind_socket_->send_inproc_connected(p->second.endpoint.socket);
bind_socket_->send_inproc_connected(pending_connection_.endpoint.socket);
}
else
{
pending_connection_.connect_pipe->send_bind(bind_socket_, pending_connection_.bind_pipe, false);
}
// Send identities
options_t& bind_options = endpoints[addr_].options;
if (bind_options.recv_identity) {
int sndhwm = 0;
if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;
int rcvhwm = 0;
if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;
bool conflate = pending_connection_.endpoint.options.conflate &&
(pending_connection_.endpoint.options.type == ZMQ_DEALER ||
pending_connection_.endpoint.options.type == ZMQ_PULL ||
pending_connection_.endpoint.options.type == ZMQ_PUSH ||
pending_connection_.endpoint.options.type == ZMQ_PUB ||
pending_connection_.endpoint.options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);
if (bind_options.recv_identity) {
msg_t id;
int rc = id.init_size (p->second.endpoint.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size);
id.set_flags (msg_t::identity);
bool written = p->second.connect_pipe->write (&id);
zmq_assert (written);
p->second.connect_pipe->flush ();
}
if (p->second.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
bool written = p->second.bind_pipe->write (&id);
zmq_assert (written);
p->second.bind_pipe->flush ();
}
msg_t id;
int rc = id.init_size (pending_connection_.endpoint.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), pending_connection_.endpoint.options.identity, pending_connection_.endpoint.options.identity_size);
id.set_flags (msg_t::identity);
bool written = pending_connection_.connect_pipe->write (&id);
zmq_assert (written);
pending_connection_.connect_pipe->flush ();
}
if (pending_connection_.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
bool written = pending_connection_.bind_pipe->write (&id);
zmq_assert (written);
pending_connection_.bind_pipe->flush ();
}
pending_connections.erase(pending.first, pending.second);
endpoints_sync.unlock ();
}
// The last used socket ID, or 0 if no socket was used so far. Note that this
......
......@@ -195,6 +195,8 @@ namespace zmq
// the process that created this context. Used to detect forking.
pid_t pid;
#endif
enum side { connect_side, bind_side };
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_);
};
}
......
......@@ -52,6 +52,7 @@ namespace zmq
ctx_t *get_ctx ();
void process_command (zmq::command_t &cmd_);
void send_inproc_connected (zmq::socket_base_t *socket_);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true);
protected:
......@@ -80,8 +81,6 @@ namespace zmq
zmq::own_t *object_);
void send_attach (zmq::session_base_t *destination_,
zmq::i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
bool inc_seqnum_ = true);
void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_,
uint64_t msgs_read_);
......
......@@ -478,3 +478,8 @@ void zmq::pipe_t::hiccup ()
send_hiccup (peer, (void*) inpipe);
}
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
lwm = compute_lwm (inhwm_);
hwm = outhwm_;
}
......@@ -112,6 +112,9 @@ namespace zmq
// before actual shutdown.
void terminate (bool delay_);
// set the high water marks.
void set_hwms (int inhwm_, int outhwm_);
private:
// Type of the underlying lock-free pipe.
......
......@@ -22,61 +22,127 @@
#include <string.h>
#include "testutil.hpp"
int main (void)
const int MAX_SENDS = 10000;
enum TestType { BIND_FIRST, CONNECT_FIRST };
int count_msg (int send_hwm, int recv_hwm, TestType testType)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
// Create pair of socket, each with high watermark of 2. Thus the total
// buffer space should be 4 messages.
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
int hwm = 2;
int rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
rc = zmq_setsockopt (sc, ZMQ_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
// Try to send 10 messages. Only 4 should succeed.
for (int i = 0; i < 10; i++)
void *bind_socket;
void *connect_socket;
if (testType == BIND_FIRST)
{
int rc = zmq_send (sc, NULL, 0, ZMQ_DONTWAIT);
if (i < 4)
assert (rc == 0);
else
assert (rc < 0 && errno == EAGAIN);
// Set up bind socket
bind_socket = zmq_socket (ctx, ZMQ_PULL);
assert (bind_socket);
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
assert (rc == 0);
rc = zmq_bind (bind_socket, "inproc://a");
assert (rc == 0);
// Set up connect socket
connect_socket = zmq_socket (ctx, ZMQ_PUSH);
assert (connect_socket);
rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
assert (rc == 0);
rc = zmq_connect (connect_socket, "inproc://a");
assert (rc == 0);
}
else
{
// Set up connect socket
connect_socket = zmq_socket (ctx, ZMQ_PUSH);
assert (connect_socket);
rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
assert (rc == 0);
rc = zmq_connect (connect_socket, "inproc://a");
assert (rc == 0);
// There should be now 4 messages pending, consume them.
for (int i = 0; i != 4; i++) {
rc = zmq_recv (sb, NULL, 0, 0);
// Set up bind socket
bind_socket = zmq_socket (ctx, ZMQ_PULL);
assert (bind_socket);
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
assert (rc == 0);
rc = zmq_bind (bind_socket, "inproc://a");
assert (rc == 0);
}
// Send until we block
int send_count = 0;
while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count;
// Now receive all sent messages
int recv_count = 0;
while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++recv_count;
assert (send_count == recv_count);
// Now it should be possible to send one more.
rc = zmq_send (sc, NULL, 0, 0);
rc = zmq_send (connect_socket, NULL, 0, 0);
assert (rc == 0);
// Consume the remaining message.
rc = zmq_recv (sb, NULL, 0, 0);
rc = zmq_recv (bind_socket, NULL, 0, 0);
assert (rc == 0);
rc = zmq_close (sc);
// Clean up
rc = zmq_close (connect_socket);
assert (rc == 0);
rc = zmq_close (sb);
rc = zmq_close (bind_socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
return send_count;
}
int test_inproc_bind_first (int send_hwm, int recv_hwm)
{
return count_msg(send_hwm, recv_hwm, BIND_FIRST);
}
int test_inproc_connect_first (int send_hwm, int recv_hwm)
{
return count_msg(send_hwm, recv_hwm, CONNECT_FIRST);
}
int main (void)
{
setup_test_environment();
int count;
// Infinite send and receive buffer
count = test_inproc_bind_first (0, 0);
assert (count == MAX_SENDS);
count = test_inproc_connect_first (0, 0);
assert (count == MAX_SENDS);
// Infinite send buffer
count = test_inproc_bind_first (1, 0);
assert (count == MAX_SENDS);
count = test_inproc_connect_first (1, 0);
assert (count == MAX_SENDS);
// Infinite receive buffer
count = test_inproc_bind_first (0, 1);
assert (count == MAX_SENDS);
count = test_inproc_connect_first (0, 1);
assert (count == MAX_SENDS);
// Send and recv buffers 1, so total that can be queued is 2
count = test_inproc_bind_first (1, 1);
assert (count == 2);
count = test_inproc_connect_first (1, 1);
assert (count == 2);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment