Commit 06740b5d authored by Reza Ebrahimi's avatar Reza Ebrahimi

Problem with ZMQ_ROUTER_MANDATORY option when peer lost

indention

Revert "indention"

This reverts commit a6e7e192ac2d089ac9f7dc0d31d4b1fd10de982e.

indention

indention

Fix Failure in tests

Check both pipe full and pipe close
parent 995a41b5
...@@ -212,15 +212,30 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -212,15 +212,30 @@ int zmq::router_t::xsend (msg_t *msg_)
if (it != outpipes.end ()) { if (it != outpipes.end ()) {
current_out = it->second.pipe; current_out = it->second.pipe;
if (!current_out->check_write ()) {
// Check whether pipe is full or not
if (!current_out->check_hwm()) {
it->second.active = false; it->second.active = false;
current_out = NULL; current_out = NULL;
if (mandatory) { if (mandatory) {
more_out = false; more_out = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
} }
// Check whether pipe is closed or not
else
if (!current_out->check_write()) {
it->second.active = false;
current_out = NULL;
if (mandatory) {
more_out = false;
errno = EHOSTUNREACH;
return -1;
}
}
} }
else else
if (mandatory) { if (mandatory) {
......
...@@ -1120,7 +1120,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -1120,7 +1120,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (rc == 0) { if (rc == 0) {
return 0; return 0;
} }
if (unlikely (errno != EAGAIN)) {
// In case of ZMQ_ROUTER_MANDATORY option set and peer disconnected
if (likely(errno == EHOSTUNREACH)) {
return -1;
}
if (unlikely(errno != EAGAIN)) {
return -1; return -1;
} }
......
...@@ -84,11 +84,15 @@ int main (void) ...@@ -84,11 +84,15 @@ int main (void)
const int BUF_SIZE = 65536; const int BUF_SIZE = 65536;
char buf[BUF_SIZE]; char buf[BUF_SIZE];
memset(buf, 0, BUF_SIZE); memset(buf, 0, BUF_SIZE);
// Send first batch of messages // Send first batch of messages
for(i = 0; i < 100000; ++i) { for(i = 0; i < 100000; ++i) {
if (TRACE_ENABLED) fprintf(stderr, "Sending message %d ...\n", i); if (TRACE_ENABLED)
fprintf(stderr, "Sending message %d ...\n", i);
rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE); rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (rc == -1 && zmq_errno() == EAGAIN) break; if (rc == -1 && zmq_errno() == EHOSTUNREACH)
break;
assert (rc == 1); assert (rc == 1);
rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT); rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT);
assert (rc == BUF_SIZE); assert (rc == BUF_SIZE);
...@@ -96,12 +100,17 @@ int main (void) ...@@ -96,12 +100,17 @@ int main (void)
// This should fail after one message but kernel buffering could // This should fail after one message but kernel buffering could
// skew results // skew results
assert (i < 10); assert (i < 10);
msleep (1000); msleep (1000);
// Send second batch of messages // Send second batch of messages
for(; i < 100000; ++i) { for(; i < 100000; ++i) {
if (TRACE_ENABLED) fprintf(stderr, "Sending message %d (part 2) ...\n", i); if (TRACE_ENABLED)
fprintf(stderr, "Sending message %d (part 2) ...\n", i);
rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE); rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (rc == -1 && zmq_errno() == EAGAIN) break; if (rc == -1 && zmq_errno() == EHOSTUNREACH)
break;
assert (rc == 1); assert (rc == 1);
rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT); rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT);
assert (rc == BUF_SIZE); assert (rc == BUF_SIZE);
...@@ -110,7 +119,8 @@ int main (void) ...@@ -110,7 +119,8 @@ int main (void)
// skew results // skew results
assert (i < 20); assert (i < 20);
if (TRACE_ENABLED) fprintf(stderr, "Done sending messages.\n"); if (TRACE_ENABLED)
fprintf(stderr, "Done sending messages.\n");
rc = zmq_close (router); rc = zmq_close (router);
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment