Commit 6a5120b1 authored by Martin Sustrik's avatar Martin Sustrik

python extension & perf tests

parent 72fdf47d
...@@ -111,7 +111,7 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg); ...@@ -111,7 +111,7 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size);
// Initialise a message from an existing buffer. Message isn't copied, // Initialise a message from an existing buffer. Message isn't copied,
// instead 0SOCKETS infrastructure take ownership of the buffer and call // instead 0MQ infrastructure take ownership of the buffer and call
// deallocation functio (ffn) once it's not needed anymore. // deallocation functio (ffn) once it's not needed anymore.
ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data, ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn); size_t size, zmq_free_fn *ffn);
...@@ -139,7 +139,7 @@ ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg); ...@@ -139,7 +139,7 @@ ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg);
// Returns type of the message. // Returns type of the message.
ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
// Initialise 0SOCKETS context. 'app_threads' specifies maximal number // Initialise 0MQ context. 'app_threads' specifies maximal number
// of application threads that can have open sockets at the same time. // of application threads that can have open sockets at the same time.
// 'io_threads' specifies the size of thread pool to handle I/O operations. // 'io_threads' specifies the size of thread pool to handle I/O operations.
// //
...@@ -147,7 +147,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); ...@@ -147,7 +147,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
// threads declared at all. // threads declared at all.
ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads); ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads);
// Deinitialise 0SOCKETS context including all the open sockets. Closing // Deinitialise 0MQ context including all the open sockets. Closing
// sockets after zmq_term has been called will result in undefined behaviour. // sockets after zmq_term has been called will result in undefined behaviour.
ZMQ_EXPORT int zmq_term (void *context); ZMQ_EXPORT int zmq_term (void *context);
......
...@@ -38,6 +38,7 @@ int main (int argc, char *argv []) ...@@ -38,6 +38,7 @@ int main (int argc, char *argv [])
struct timeval end; struct timeval end;
uint64_t elapsed; uint64_t elapsed;
uint64_t throughput; uint64_t throughput;
double megabits;
if (argc != 4) { if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-count> " printf ("usage: local_thr <bind-to> <message-count> "
...@@ -81,12 +82,15 @@ int main (int argc, char *argv []) ...@@ -81,12 +82,15 @@ int main (int argc, char *argv [])
elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) - elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) -
((uint64_t) start.tv_sec * 1000000 + start.tv_usec); ((uint64_t) start.tv_sec * 1000000 + start.tv_usec);
if (elapsed == 0)
elapsed = 1;
throughput = (uint64_t) message_count * 1000000 / elapsed; throughput = (uint64_t) message_count * 1000000 / elapsed;
megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count); printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %3f [Mb/s]\n", (double) megabits);
return 0; return 0;
} }
...@@ -63,12 +63,15 @@ int main (int argc, char *argv []) ...@@ -63,12 +63,15 @@ int main (int argc, char *argv [])
uint64_t elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) - uint64_t elapsed = ((uint64_t) end.tv_sec * 1000000 + end.tv_usec) -
((uint64_t) start.tv_sec * 1000000 + start.tv_usec); ((uint64_t) start.tv_sec * 1000000 + start.tv_usec);
if (elapsed == 0)
elapsed = 1;
uint64_t throughput = (uint64_t) message_count * 1000000 / elapsed; uint64_t throughput = (uint64_t) message_count * 1000000 / elapsed;
double megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size); printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count); printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %3f [Mb/s]\n", (double) megabits);
return 0; return 0;
} }
...@@ -18,50 +18,32 @@ ...@@ -18,50 +18,32 @@
# #
import sys import sys
from datetime import datetime
import libpyzmq
import time import time
import libpyzmq
def main (): def main ():
if len (sys.argv) != 5: if len (sys.argv) != 4:
print 'usage: py_local_lat <in-interface> <out-interface> <message-size> <roundtrip-count>' print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>'
sys.exit (1) sys.exit (1)
try: try:
in_interface = sys.argv [1] bind_to = sys.argv [1]
out_interface = sys.argv [2] roundtrip_count = int (sys.argv [2])
message_size = int (sys.argv [3]) message_size = int (sys.argv [3])
roundtrip_count = int (sys.argv [4])
except (ValueError, OverflowError), e: except (ValueError, OverflowError), e:
print 'message-size and roundtrip-count must be integers' print 'message-size and roundtrip-count must be integers'
sys.exit (1) sys.exit (1)
print "message size:", message_size, "[B]" ctx = libpyzmq.Context (1, 1);
print "roundtrip count:", roundtrip_count s = libpyzmq.Socket (ctx, libpyzmq.REP)
s.bind (bind_to)
z = libpyzmq.Zmq ()
context = z.context (1,1);
in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
z.bind (in_socket, addr = in_interface)
z.bind (out_socket, addr = out_interface)
msg_out = z.init_msg_data (string_msg, type)
start = datetime.now ()
for i in range (0, roundtrip_count): for i in range (0, roundtrip_count):
z.send (out_socket, msg_out, True) msg = s.recv ()
list = z.receive (in_socket, True) assert len (msg) == message_size
msg_in = list [1] s.send (msg)
assert len(msg_in) == message_size
end = datetime.now ()
delta = end - start time.sleep (1)
delta_us = delta.seconds * 1000000 + delta.microseconds
print 'Your average latency is', delta_us / roundtrip_count, ' [us]'
if __name__ == "__main__": if __name__ == "__main__":
main () main ()
...@@ -23,46 +23,42 @@ import libpyzmq ...@@ -23,46 +23,42 @@ import libpyzmq
def main (): def main ():
if len (sys.argv) != 4: if len (sys.argv) != 4:
print ('usage: py_local_thr <in_interface> <message-size> ' + print 'usage: local_thr <bind-to> <message-size> <message-count>'
'<message-count>')
sys.exit (1) sys.exit (1)
try: try:
bind_to = sys.argv [1]
message_size = int (sys.argv [2]) message_size = int (sys.argv [2])
message_count = int (sys.argv [3]) message_count = int (sys.argv [3])
except (ValueError, OverflowError), e: except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers' print 'message-size and message-count must be integers'
sys.exit (1) sys.exit (1)
print "message size:", message_size, "[B]" ctx = libpyzmq.Context (1, 1);
print "message count:", message_count s = libpyzmq.Socket (ctx, libpyzmq.P2P)
s.bind (bind_to)
z = libpyzmq.Zmq () msg = s.recv ()
assert len (msg) == message_size
context = z.context (1,1)
in_socket = z.socket (context, libpyzmq.ZMQ_SUB)
z.connect (in_socketaddr = sys.argv [1])
list = z.receive (in_socket, True)
msg = list [1]
assert len(msg) == message_size
start = datetime.now () start = datetime.now ()
for i in range (1, message_count): for i in range (1, message_count):
list = z.receive (in_socket, True) msg = s.recv ()
msg = list [1] assert len (msg) == message_size
assert len(msg) == message_size
end = datetime.now() end = datetime.now()
delta = end - start elapsed = (end - start).seconds * 1000000 + (end - start).microseconds
delta_us = delta.seconds * 1000000 + delta.microseconds if elapsed == 0:
if delta_us == 0: elapsed = 1
delta_us = 1 throughput = (1000000.0 * float (message_count)) / float (elapsed)
message_thr = (1000000.0 * float (message_count)) / float (delta_us) megabits = float (throughput * message_size * 8) / 1000000
megabit_thr = (message_thr * float (message_size) * 8.0) / 1000000.0;
print "Your average throughput is %.0f [msg/s]" % (message_thr, ) print "message size: %.0f [B]" % (message_size, )
print "Your average throughput is %.2f [Mb/s]" % (megabit_thr, ) print "message count: %.0f" % (message_count, )
print "mean throughput: %.0f [msg/s]" % (throughput, )
print "mean throughput: %.3f [Mb/s]" % (megabits, )
if __name__ == "__main__": if __name__ == "__main__":
main () main ()
...@@ -20,39 +20,40 @@ ...@@ -20,39 +20,40 @@
import sys import sys
from datetime import datetime from datetime import datetime
import libpyzmq import libpyzmq
import time
def main (): def main ():
if len(sys.argv) != 5: if len(sys.argv) != 4:
print ('usage: py_remote_lat <in-interface> ' + print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>'
'<out-interface> <message-size> <roundtrip-count>')
sys.exit (1) sys.exit (1)
try: try:
message_size = int (sys.argv [3]) connect_to = sys.argv [1]
roundtrip_count = int (sys.argv [4]) message_size = int (sys.argv [2])
roundtrip_count = int (sys.argv [3])
except (ValueError, OverflowError), e: except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers' print 'message-size and message-count must be integers'
sys.exit (1) sys.exit (1)
z = libpyzmq.Zmq () ctx = libpyzmq.Context (1, 1);
s = libpyzmq.Socket (ctx, libpyzmq.REQ)
context = z.context (1,1) s.connect (connect_to)
in_socket = z.socket (context, libpyzmq.ZMQ_SUB) msg = ''.join ([' ' for n in range (0, message_size)])
out_socket = z.socket (context, libpyzmq.ZMQ_PUB)
z.connect (in_socket, addr = in_interface) start = datetime.now ()
z.connect (out_socket, addr = out_interface)
for i in range (0, roundtrip_count): for i in range (0, roundtrip_count):
list = z.receive (in_socket, True) s.send (msg)
message = list [1] msg = s.recv ()
z.send (out_socket, message, True) assert len (msg) == message_size
time.sleep (2) end = datetime.now ()
delta = (end - start).microseconds + 1000000 * (end - start).seconds
latency = delta / roundtrip_count / 2
print "message size: %.0f [B]" % (message_size, )
print "roundtrip count: %.0f" % (roundtrip_count, )
print "mean latency: %.3f [us]" % (latency, )
if __name__ == "__main__": if __name__ == "__main__":
main () main ()
......
...@@ -18,33 +18,32 @@ ...@@ -18,33 +18,32 @@
# #
import sys import sys
from datetime import datetime
import libpyzmq import libpyzmq
import time import time
def main (): def main ():
if len (sys.argv) != 4: if len (sys.argv) != 4:
print 'usage: py_remote_thr <out-interface> <message-size> <message-count>' print 'usage: remote_thr <connect-to> <message-size> <message-count>'
sys.exit (1) sys.exit (1)
try: try:
connect_to = argv [1]
message_size = int (sys.argv [2]) message_size = int (sys.argv [2])
message_count = int (sys.argv [3]) message_count = int (sys.argv [3])
except (ValueError, OverflowError), e: except (ValueError, OverflowError), e:
print 'message-size and message-count must be integers' print 'message-size and message-count must be integers'
sys.exit (1) sys.exit (1)
z = libpyzmq.Zmq () ctx = libpyzmq.Context (1, 1);
context = z.context (1,1); s = libpyzmq.Socket (ctx, libpyzmq.P2P)
out_socket = z.socket (context, libpyzmq.ZMQ_PUB) s.connect (connect_to)
z.bind (out_socket, addr = sys.argv [1])
msg = z.init_msg_data (string_msg, type) msg = ''.join ([' ' for n in range (0, message_size)])
for i in range (0, message_count): for i in range (0, message_count):
z.send (out_socket, msg, True) s.send (msg)
time.sleep (2) time.sleep (10)
if __name__ == "__main__": if __name__ == "__main__":
main () main ()
This diff is collapsed.
...@@ -25,6 +25,8 @@ namespace zmq ...@@ -25,6 +25,8 @@ namespace zmq
struct i_endpoint struct i_endpoint
{ {
virtual void attach_inpipe (class reader_t *pipe_) = 0;
virtual void attach_outpipe (class writer_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0; virtual void revive (class reader_t *pipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0;
......
...@@ -43,21 +43,6 @@ zmq::session_t::~session_t () ...@@ -43,21 +43,6 @@ zmq::session_t::~session_t ()
out_pipe->term (); out_pipe->term ();
} }
void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
{
zmq_assert (!in_pipe);
in_pipe = pipe_;
active = true;
in_pipe->set_endpoint (this);
}
void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
{
zmq_assert (!out_pipe);
out_pipe = pipe_;
out_pipe->set_endpoint (this);
}
bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::read (::zmq_msg_t *msg_)
{ {
if (!active) if (!active)
...@@ -90,6 +75,20 @@ void zmq::session_t::detach () ...@@ -90,6 +75,20 @@ void zmq::session_t::detach ()
// term (); // term ();
} }
void zmq::session_t::attach_inpipe (reader_t *pipe_)
{
zmq_assert (!in_pipe);
in_pipe = pipe_;
active = true;
in_pipe->set_endpoint (this);
}
void zmq::session_t::attach_outpipe (writer_t *pipe_)
{
zmq_assert (!out_pipe);
out_pipe = pipe_;
out_pipe->set_endpoint (this);
}
void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::revive (reader_t *pipe_)
{ {
zmq_assert (in_pipe == pipe_); zmq_assert (in_pipe == pipe_);
......
...@@ -37,13 +37,6 @@ namespace zmq ...@@ -37,13 +37,6 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_, const char *name_, session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
const options_t &options_); const options_t &options_);
void set_inbound_pipe (class reader_t *pipe_);
void set_outbound_pipe (class writer_t *pipe_);
private:
~session_t ();
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_);
...@@ -51,10 +44,16 @@ namespace zmq ...@@ -51,10 +44,16 @@ namespace zmq
void detach (); void detach ();
// i_endpoint interface implementation. // i_endpoint interface implementation.
void attach_inpipe (class reader_t *pipe_);
void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_); void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_); void detach_outpipe (class writer_t *pipe_);
private:
~session_t ();
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_unplug (); void process_unplug ();
......
...@@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe); zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this); in_pipe->reader.set_endpoint (this);
session->set_outbound_pipe (&in_pipe->writer); session->attach_outpipe (&in_pipe->writer);
in_pipes.push_back (&in_pipe->reader); in_pipes.push_back (&in_pipe->reader);
in_pipes.back ()->set_index (active); in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1); in_pipes [active]->set_index (in_pipes.size () - 1);
...@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe); zmq_assert (out_pipe);
out_pipe->writer.set_endpoint (this); out_pipe->writer.set_endpoint (this);
session->set_inbound_pipe (&out_pipe->reader); session->attach_inpipe (&out_pipe->reader);
out_pipes.push_back (&out_pipe->writer); out_pipes.push_back (&out_pipe->writer);
// Activate the session. // Activate the session.
...@@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_) ...@@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second; return it->second;
} }
void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_)
{
pipe_->set_endpoint (this);
in_pipes.push_back (pipe_);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
}
void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
{
pipe_->set_endpoint (this);
out_pipes.push_back (pipe_);
}
void zmq::socket_base_t::revive (reader_t *pipe_) void zmq::socket_base_t::revive (reader_t *pipe_)
{ {
// Move the pipe to the list of active pipes. // Move the pipe to the list of active pipes.
...@@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_, ...@@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_) reader_t *in_pipe_, writer_t *out_pipe_)
{ {
zmq_assert (in_pipe_); zmq_assert (in_pipe_);
in_pipe_->set_endpoint (this); attach_inpipe (in_pipe_);
in_pipes.push_back (in_pipe_);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
zmq_assert (out_pipe_); zmq_assert (out_pipe_);
out_pipe_->set_endpoint (this); attach_outpipe (out_pipe_);
out_pipes.push_back (out_pipe_);
} }
void zmq::socket_base_t::process_term_req (owned_t *object_) void zmq::socket_base_t::process_term_req (owned_t *object_)
......
...@@ -60,6 +60,8 @@ namespace zmq ...@@ -60,6 +60,8 @@ namespace zmq
class session_t *find_session (const char *name_); class session_t *find_session (const char *name_);
// i_endpoint interface implementation. // i_endpoint interface implementation.
void attach_inpipe (class reader_t *pipe_);
void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_); void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_); void detach_outpipe (class writer_t *pipe_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment