Commit f4c51db8 authored by Ian Barber's avatar Ian Barber

Fix small streamengine issue w/ term and handshake

Also tidy up monitor test a little.
parent cabf4e65
...@@ -642,6 +642,12 @@ void zmq::stream_engine_t::mechanism_ready () ...@@ -642,6 +642,12 @@ void zmq::stream_engine_t::mechanism_ready ()
msg_t identity; msg_t identity;
mechanism->peer_identity (&identity); mechanism->peer_identity (&identity);
const int rc = session->push_msg (&identity); const int rc = session->push_msg (&identity);
if (rc == -1 && errno == EAGAIN) {
// If the write is failing at this stage with
// an EAGAIN the pipe must be being shut down,
// so we can just bail out of the identity set.
return;
}
errno_assert (rc == 0); errno_assert (rc == 0);
} }
......
...@@ -41,12 +41,14 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep) ...@@ -41,12 +41,14 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep)
zmq_msg_init (&msg2); zmq_msg_init (&msg2);
rc = zmq_msg_recv (&msg1, s, 0); rc = zmq_msg_recv (&msg1, s, 0);
if (rc == -1 && zmq_errno() == ETERM) if (rc == -1 && zmq_errno() == ETERM)
return true ; return true ;
assert (rc != -1); assert (rc != -1);
assert (zmq_msg_more(&msg1) != 0); assert (zmq_msg_more(&msg1) != 0);
rc = zmq_msg_recv (&msg2, s, 0); rc = zmq_msg_recv (&msg2, s, 0);
if (rc == -1 && zmq_errno() == ETERM) if (rc == -1 && zmq_errno() == ETERM)
return true; return true;
assert (rc != -1); assert (rc != -1);
assert (zmq_msg_more(&msg2) == 0); assert (zmq_msg_more(&msg2) == 0);
// copy binary data to event struct // copy binary data to event struct
...@@ -72,7 +74,7 @@ static void *req_socket_monitor (void *ctx) ...@@ -72,7 +74,7 @@ static void *req_socket_monitor (void *ctx)
rc = zmq_connect (s, "inproc://monitor.req"); rc = zmq_connect (s, "inproc://monitor.req");
assert (rc == 0); assert (rc == 0);
while (!read_msg(s, event, ep)) { while (!read_msg(s, event, ep)) {
assert (ep == addr); assert (ep == addr);
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_CONNECTED: case ZMQ_EVENT_CONNECTED:
assert (event.value > 0); assert (event.value > 0);
...@@ -114,7 +116,7 @@ static void *req2_socket_monitor (void *ctx) ...@@ -114,7 +116,7 @@ static void *req2_socket_monitor (void *ctx)
rc = zmq_connect (s, "inproc://monitor.req2"); rc = zmq_connect (s, "inproc://monitor.req2");
assert (rc == 0); assert (rc == 0);
while (!read_msg(s, event, ep)) { while (!read_msg(s, event, ep)) {
assert (ep == addr); assert (ep == addr);
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_CONNECTED: case ZMQ_EVENT_CONNECTED:
assert (event.value > 0); assert (event.value > 0);
...@@ -143,7 +145,7 @@ static void *rep_socket_monitor (void *ctx) ...@@ -143,7 +145,7 @@ static void *rep_socket_monitor (void *ctx)
rc = zmq_connect (s, "inproc://monitor.rep"); rc = zmq_connect (s, "inproc://monitor.rep");
assert (rc == 0); assert (rc == 0);
while (!read_msg(s, event, ep)) { while (!read_msg(s, event, ep)) {
assert (ep == addr); assert (ep == addr);
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_LISTENING: case ZMQ_EVENT_LISTENING:
assert (event.value > 0); assert (event.value > 0);
...@@ -203,10 +205,7 @@ int main (void) ...@@ -203,10 +205,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (rep, addr.c_str());
assert (rc == 0);
// REQ socket // REQ socket
req = zmq_socket (ctx, ZMQ_REQ); req = zmq_socket (ctx, ZMQ_REQ);
assert (req); assert (req);
...@@ -216,6 +215,11 @@ int main (void) ...@@ -216,6 +215,11 @@ int main (void)
assert (rc == 0); assert (rc == 0);
rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx); rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx);
assert (rc == 0); assert (rc == 0);
sleep(1);
// Bind REQ and REP
rc = zmq_bind (rep, addr.c_str());
assert (rc == 0);
rc = zmq_connect (req, addr.c_str()); rc = zmq_connect (req, addr.c_str());
assert (rc == 0); assert (rc == 0);
...@@ -271,4 +275,3 @@ int main (void) ...@@ -271,4 +275,3 @@ int main (void)
return 0 ; return 0 ;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment