Commit f92de9b2 authored by Martin Sustrik's avatar Martin Sustrik

bug during terminal shutdown fixed

parent 702fdbb5
...@@ -52,19 +52,24 @@ int main (int argc, char *argv []) ...@@ -52,19 +52,24 @@ int main (int argc, char *argv [])
rc = zmq_bind (s, bind_to); rc = zmq_bind (s, bind_to);
assert (rc == 0); assert (rc == 0);
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_recv (s, &msg, 0); rc = zmq_recv (s, &msg, 0);
assert (rc == 0); assert (rc == 0);
assert (zmq_msg_size (&msg) == message_size); assert (zmq_msg_size (&msg) == message_size);
rc = zmq_send (s, &msg, 0); rc = zmq_send (s, &msg, 0);
assert (rc == 0); assert (rc == 0);
}
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
}
sleep (1); sleep (1);
rc = zmq_term (ctx);
assert (rc == 0);
return 0; return 0;
} }
...@@ -90,7 +90,10 @@ int main (int argc, char *argv []) ...@@ -90,7 +90,10 @@ int main (int argc, char *argv [])
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count); printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %3f [Mb/s]\n", (double) megabits); printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
rc = zmq_term (ctx);
assert (rc == 0);
return 0; return 0;
} }
...@@ -59,17 +59,19 @@ int main (int argc, char *argv []) ...@@ -59,17 +59,19 @@ int main (int argc, char *argv [])
rc = gettimeofday (&start, NULL); rc = gettimeofday (&start, NULL);
assert (rc == 0); assert (rc == 0);
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_msg_init_size (&msg, message_size); rc = zmq_msg_init_size (&msg, message_size);
assert (rc == 0); assert (rc == 0);
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_send (s, &msg, 0); rc = zmq_send (s, &msg, 0);
assert (rc == 0); assert (rc == 0);
rc = zmq_recv (s, &msg, 0); rc = zmq_recv (s, &msg, 0);
assert (rc == 0); assert (rc == 0);
assert (zmq_msg_size (&msg) == message_size); assert (zmq_msg_size (&msg) == message_size);
}
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
}
rc = gettimeofday (&end, NULL); rc = gettimeofday (&end, NULL);
assert (rc == 0); assert (rc == 0);
...@@ -83,7 +85,10 @@ int main (int argc, char *argv []) ...@@ -83,7 +85,10 @@ int main (int argc, char *argv [])
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);
printf ("roundtrip count: %d\n", (int) roundtrip_count); printf ("roundtrip count: %d\n", (int) roundtrip_count);
printf ("average latency: %3f [us]\n", (double) latency); printf ("average latency: %.3f [us]\n", (double) latency);
rc = zmq_term (ctx);
assert (rc == 0);
return 0; return 0;
} }
...@@ -63,5 +63,8 @@ int main (int argc, char *argv []) ...@@ -63,5 +63,8 @@ int main (int argc, char *argv [])
sleep (10); sleep (10);
rc = zmq_term (ctx);
assert (rc == 0);
return 0; return 0;
} }
...@@ -71,7 +71,7 @@ int main (int argc, char *argv []) ...@@ -71,7 +71,7 @@ int main (int argc, char *argv [])
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count); printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %3f [Mb/s]\n", (double) megabits); printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
return 0; return 0;
} }
...@@ -64,7 +64,7 @@ int main (int argc, char *argv []) ...@@ -64,7 +64,7 @@ int main (int argc, char *argv [])
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);
printf ("roundtrip count: %d\n", (int) roundtrip_count); printf ("roundtrip count: %d\n", (int) roundtrip_count);
printf ("average latency: %3f [us]\n", (double) latency); printf ("average latency: %.3f [us]\n", (double) latency);
return 0; return 0;
} }
...@@ -36,11 +36,8 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, ...@@ -36,11 +36,8 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
zmq::session_t::~session_t () zmq::session_t::~session_t ()
{ {
// Ask associated pipes to terminate. zmq_assert (!in_pipe);
if (in_pipe) zmq_assert (!out_pipe);
in_pipe->term ();
if (out_pipe)
out_pipe->term ();
} }
bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::read (::zmq_msg_t *msg_)
...@@ -82,6 +79,7 @@ void zmq::session_t::attach_inpipe (reader_t *pipe_) ...@@ -82,6 +79,7 @@ void zmq::session_t::attach_inpipe (reader_t *pipe_)
active = true; active = true;
in_pipe->set_endpoint (this); in_pipe->set_endpoint (this);
} }
void zmq::session_t::attach_outpipe (writer_t *pipe_) void zmq::session_t::attach_outpipe (writer_t *pipe_)
{ {
zmq_assert (!out_pipe); zmq_assert (!out_pipe);
...@@ -141,6 +139,16 @@ void zmq::session_t::process_unplug () ...@@ -141,6 +139,16 @@ void zmq::session_t::process_unplug ()
bool ok = owner->unregister_session (name.c_str ()); bool ok = owner->unregister_session (name.c_str ());
zmq_assert (ok); zmq_assert (ok);
// Ask associated pipes to terminate.
if (in_pipe) {
in_pipe->term ();
in_pipe = NULL;
}
if (out_pipe) {
out_pipe->term ();
out_pipe = NULL;
}
if (engine) { if (engine) {
engine->unplug (); engine->unplug ();
delete engine; delete engine;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment