Commit d62c7423 authored by Martin Sustrik's avatar Martin Sustrik

Merge branch 'master' of git://github.com/sustrik/zeromq2

parents 11a6cb92 63b56d7f
...@@ -328,7 +328,8 @@ AC_TYPE_SIGNAL ...@@ -328,7 +328,8 @@ AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
AC_OUTPUT(Makefile src/Makefile python/Makefile python/setup.py ruby/Makefile \ AC_OUTPUT(Makefile src/Makefile python/Makefile python/setup.py ruby/Makefile \
java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile) java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile)
AC_MSG_RESULT([]) AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ]) AC_MSG_RESULT([ ******************************************************** ])
......
SUBDIRS = c cpp SUBDIRS = c cpp python
DIST_SUBDIRS = c cpp DIST_SUBDIRS = c cpp python
...@@ -35,13 +35,13 @@ int main (int argc, char *argv []) ...@@ -35,13 +35,13 @@ int main (int argc, char *argv [])
struct zmq_msg_t msg; struct zmq_msg_t msg;
if (argc != 4) { if (argc != 4) {
printf ("usage: local_lat <bind-to> <roundtrip-count> " printf ("usage: local_lat <bind-to> <message-size> "
"<message-size>\n"); "<roundtrip-count>\n");
return 1; return 1;
} }
bind_to = argv [1]; bind_to = argv [1];
roundtrip_count = atoi (argv [2]); message_size = atoi (argv [2]);
message_size = atoi (argv [3]); roundtrip_count = atoi (argv [3]);
ctx = zmq_init (1, 1); ctx = zmq_init (1, 1);
assert (ctx); assert (ctx);
...@@ -68,6 +68,9 @@ int main (int argc, char *argv []) ...@@ -68,6 +68,9 @@ int main (int argc, char *argv [])
sleep (1); sleep (1);
rc = zmq_close (s);
assert (rc == 0);
rc = zmq_term (ctx); rc = zmq_term (ctx);
assert (rc == 0); assert (rc == 0);
......
...@@ -41,13 +41,12 @@ int main (int argc, char *argv []) ...@@ -41,13 +41,12 @@ int main (int argc, char *argv [])
double megabits; double megabits;
if (argc != 4) { if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-count> " printf ("usage: local_thr <bind-to> <message-size> <message-count>\n");
"<message-size>\n");
return 1; return 1;
} }
bind_to = argv [1]; bind_to = argv [1];
message_count = atoi (argv [2]); message_size = atoi (argv [2]);
message_size = atoi (argv [3]); message_count = atoi (argv [3]);
ctx = zmq_init (1, 1); ctx = zmq_init (1, 1);
assert (ctx); assert (ctx);
...@@ -92,6 +91,9 @@ int main (int argc, char *argv []) ...@@ -92,6 +91,9 @@ int main (int argc, char *argv [])
printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
rc = zmq_close (s);
assert (rc == 0);
rc = zmq_term (ctx); rc = zmq_term (ctx);
assert (rc == 0); assert (rc == 0);
......
...@@ -39,13 +39,13 @@ int main (int argc, char *argv []) ...@@ -39,13 +39,13 @@ int main (int argc, char *argv [])
double latency; double latency;
if (argc != 4) { if (argc != 4) {
printf ("usage: remote_lat <connect-to> <roundtrip-count> " printf ("usage: remote_lat <connect-to> <message-size> "
"<message-size>\n"); "<roundtrip-count>\n");
return 1; return 1;
} }
connect_to = argv [1]; connect_to = argv [1];
roundtrip_count = atoi (argv [2]); message_size = atoi (argv [2]);
message_size = atoi (argv [3]); roundtrip_count = atoi (argv [3]);
ctx = zmq_init (1, 1); ctx = zmq_init (1, 1);
assert (ctx); assert (ctx);
...@@ -87,6 +87,9 @@ int main (int argc, char *argv []) ...@@ -87,6 +87,9 @@ int main (int argc, char *argv [])
printf ("roundtrip count: %d\n", (int) roundtrip_count); printf ("roundtrip count: %d\n", (int) roundtrip_count);
printf ("average latency: %.3f [us]\n", (double) latency); printf ("average latency: %.3f [us]\n", (double) latency);
rc = zmq_close (s);
assert (rc == 0);
rc = zmq_term (ctx); rc = zmq_term (ctx);
assert (rc == 0); assert (rc == 0);
......
...@@ -35,13 +35,13 @@ int main (int argc, char *argv []) ...@@ -35,13 +35,13 @@ int main (int argc, char *argv [])
struct zmq_msg_t msg; struct zmq_msg_t msg;
if (argc != 4) { if (argc != 4) {
printf ("usage: remote_thr <connect-to> <message-count> " printf ("usage: remote_thr <connect-to> <message-size> "
"<message-size>\n"); "<message-count>\n");
return 1; return 1;
} }
connect_to = argv [1]; connect_to = argv [1];
message_count = atoi (argv [2]); message_size = atoi (argv [2]);
message_size = atoi (argv [3]); message_count = atoi (argv [3]);
ctx = zmq_init (1, 1); ctx = zmq_init (1, 1);
assert (ctx); assert (ctx);
...@@ -63,6 +63,9 @@ int main (int argc, char *argv []) ...@@ -63,6 +63,9 @@ int main (int argc, char *argv [])
sleep (10); sleep (10);
rc = zmq_close (s);
assert (rc == 0);
rc = zmq_term (ctx); rc = zmq_term (ctx);
assert (rc == 0); assert (rc == 0);
......
...@@ -27,13 +27,13 @@ ...@@ -27,13 +27,13 @@
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
if (argc != 4) { if (argc != 4) {
printf ("usage: local_lat <bind-to> <roundtrip-count> " printf ("usage: local_lat <bind-to> <message-size> "
"<message-size>\n"); "<roundtrip-count>\n");
return 1; return 1;
} }
const char *bind_to = argv [1]; const char *bind_to = argv [1];
int roundtrip_count = atoi (argv [2]); size_t message_size = (size_t) atoi (argv [2]);
size_t message_size = (size_t) atoi (argv [3]); int roundtrip_count = atoi (argv [3]);
zmq::context_t ctx (1, 1); zmq::context_t ctx (1, 1);
......
...@@ -28,13 +28,13 @@ ...@@ -28,13 +28,13 @@
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
if (argc != 4) { if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-count> " printf ("usage: local_thr <bind-to> <message-size> "
"<message-size>\n"); "<message-count>\n");
return 1; return 1;
} }
const char *bind_to = argv [1]; const char *bind_to = argv [1];
int message_count = atoi (argv [2]); size_t message_size = (size_t) atoi (argv [2]);
size_t message_size = (size_t) atoi (argv [3]); int message_count = atoi (argv [3]);
zmq::context_t ctx (1, 1); zmq::context_t ctx (1, 1);
......
...@@ -27,13 +27,13 @@ ...@@ -27,13 +27,13 @@
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
if (argc != 4) { if (argc != 4) {
printf ("usage: remote_lat <connect-to> <roundtrip-count> " printf ("usage: remote_lat <connect-to> <message-size> "
"<message-size>\n"); "<roundtrip-count>\n");
return 1; return 1;
} }
const char *connect_to = argv [1]; const char *connect_to = argv [1];
int roundtrip_count = atoi (argv [2]); size_t message_size = (size_t) atoi (argv [2]);
size_t message_size = (size_t) atoi (argv [3]); int roundtrip_count = atoi (argv [3]);
zmq::context_t ctx (1, 1); zmq::context_t ctx (1, 1);
......
...@@ -27,13 +27,13 @@ ...@@ -27,13 +27,13 @@
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
if (argc != 4) { if (argc != 4) {
printf ("usage: remote_thr <connect-to> <message-count> " printf ("usage: remote_thr <connect-to> <message-size> "
"<message-size>\n"); "<message-count>\n");
return 1; return 1;
} }
const char *connect_to = argv [1]; const char *connect_to = argv [1];
int message_count = atoi (argv [2]); size_t message_size = (size_t) atoi (argv [2]);
size_t message_size = (size_t) atoi (argv [3]); int message_count = atoi (argv [3]);
zmq::context_t ctx (1, 1); zmq::context_t ctx (1, 1);
......
EXTRA_DIST = \
local_lat.py \
remote_lat.py \
local_thr.py \
remote_thr.py
...@@ -23,13 +23,13 @@ import libpyzmq ...@@ -23,13 +23,13 @@ import libpyzmq
def main (): def main ():
if len (sys.argv) != 4: if len (sys.argv) != 4:
print 'usage: local_lat <bind-to> <roundtrip-count> <message-size>' print 'usage: local_lat <bind-to> <message-size> <roundtrip-count>'
sys.exit (1) sys.exit (1)
try: try:
bind_to = sys.argv [1] bind_to = sys.argv [1]
roundtrip_count = int (sys.argv [2]) message_size = int (sys.argv [2])
message_size = int (sys.argv [3]) roundtrip_count = int (sys.argv [3])
except (ValueError, OverflowError), e: except (ValueError, OverflowError), e:
print 'message-size and roundtrip-count must be integers' print 'message-size and roundtrip-count must be integers'
sys.exit (1) sys.exit (1)
......
...@@ -23,7 +23,7 @@ import libpyzmq ...@@ -23,7 +23,7 @@ import libpyzmq
def main (): def main ():
if len(sys.argv) != 4: if len(sys.argv) != 4:
print 'usage: remote_lat <connect-to> <roundtrip-count> <message-size>' print 'usage: remote_lat <connect-to> <message-size> <roundtrip-count>'
sys.exit (1) sys.exit (1)
try: try:
...@@ -49,7 +49,7 @@ def main (): ...@@ -49,7 +49,7 @@ def main ():
end = datetime.now () end = datetime.now ()
delta = (end - start).microseconds + 1000000 * (end - start).seconds delta = (end - start).microseconds + 1000000 * (end - start).seconds
latency = delta / roundtrip_count / 2 latency = float (delta) / roundtrip_count / 2
print "message size: %.0f [B]" % (message_size, ) print "message size: %.0f [B]" % (message_size, )
print "roundtrip count: %.0f" % (roundtrip_count, ) print "roundtrip count: %.0f" % (roundtrip_count, )
......
...@@ -27,7 +27,7 @@ def main (): ...@@ -27,7 +27,7 @@ def main ():
sys.exit (1) sys.exit (1)
try: try:
connect_to = argv [1] connect_to = sys.argv [1]
message_size = int (sys.argv [2]) message_size = int (sys.argv [2])
message_count = int (sys.argv [3]) message_count = int (sys.argv [3])
except (ValueError, OverflowError), e: except (ValueError, OverflowError), e:
......
...@@ -33,7 +33,6 @@ struct context_t ...@@ -33,7 +33,6 @@ struct context_t
PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds) PyObject *context_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{ {
printf ("context_new\n");
context_t *self = (context_t*) type->tp_alloc (type, 0); context_t *self = (context_t*) type->tp_alloc (type, 0);
if (self) if (self)
...@@ -45,34 +44,27 @@ printf ("context_new\n"); ...@@ -45,34 +44,27 @@ printf ("context_new\n");
int context_init (context_t *self, PyObject *args, PyObject *kwdict) int context_init (context_t *self, PyObject *args, PyObject *kwdict)
{ {
printf ("context_init\n");
int app_threads; int app_threads;
int io_threads; int io_threads;
static const char *kwlist [] = {"app_threads", "io_threads", NULL}; static const char *kwlist [] = {"app_threads", "io_threads", NULL};
if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist, if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist,
&app_threads, &io_threads)) { &app_threads, &io_threads)) {
PyErr_SetString (PyExc_SystemError, "invalid arguments"); PyErr_SetString (PyExc_SystemError, "invalid arguments");
printf ("context_init err1\n");
return -1; // ? return -1; // ?
} }
printf ("app_threads=%d io_threads=%d\n", app_threads, io_threads);
assert (!self->handle); assert (!self->handle);
self->handle = zmq_init (app_threads, io_threads); self->handle = zmq_init (app_threads, io_threads);
if (!self->handle) { if (!self->handle) {
PyErr_SetString (PyExc_SystemError, strerror (errno)); PyErr_SetString (PyExc_SystemError, strerror (errno));
return -1; // ? return -1; // ?
printf ("context_init err2\n");
} }
printf ("context_init ok\n");
return 0; return 0;
} }
void context_dealloc (context_t *self) void context_dealloc (context_t *self)
{ {
printf ("context_dealloc\n");
if (self->handle) { if (self->handle) {
int rc = zmq_term (self->handle); int rc = zmq_term (self->handle);
if (rc != 0) if (rc != 0)
...@@ -90,7 +82,6 @@ struct socket_t ...@@ -90,7 +82,6 @@ struct socket_t
PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds) PyObject *socket_new (PyTypeObject *type, PyObject *args, PyObject *kwds)
{ {
printf ("socket_new\n");
socket_t *self = (socket_t*) type->tp_alloc (type, 0); socket_t *self = (socket_t*) type->tp_alloc (type, 0);
if (self) if (self)
...@@ -101,7 +92,6 @@ printf ("socket_new\n"); ...@@ -101,7 +92,6 @@ printf ("socket_new\n");
int socket_init (socket_t *self, PyObject *args, PyObject *kwdict) int socket_init (socket_t *self, PyObject *args, PyObject *kwdict)
{ {
printf ("socket_init\n");
context_t *context; context_t *context;
int socket_type; int socket_type;
static const char *kwlist [] = {"context", "type", NULL}; static const char *kwlist [] = {"context", "type", NULL};
...@@ -124,7 +114,6 @@ printf ("socket_init\n"); ...@@ -124,7 +114,6 @@ printf ("socket_init\n");
void socket_dealloc (socket_t *self) void socket_dealloc (socket_t *self)
{ {
printf ("socket_dealloc\n");
if (self->handle) { if (self->handle) {
int rc = zmq_close (self->handle); int rc = zmq_close (self->handle);
if (rc != 0) if (rc != 0)
...@@ -340,7 +329,7 @@ static PyTypeObject context_type = ...@@ -340,7 +329,7 @@ static PyTypeObject context_type =
0, /* tp_dictoffset */ 0, /* tp_dictoffset */
(initproc) context_init, /* tp_init */ (initproc) context_init, /* tp_init */
0, /* tp_alloc */ 0, /* tp_alloc */
context_new, /* tp_new */ context_new /* tp_new */
}; };
static PyMethodDef socket_methods [] = static PyMethodDef socket_methods [] =
...@@ -390,7 +379,7 @@ static PyTypeObject socket_type = ...@@ -390,7 +379,7 @@ static PyTypeObject socket_type =
{ {
PyObject_HEAD_INIT (NULL) PyObject_HEAD_INIT (NULL)
0, 0,
"libpyzmq.Socket" , /* tp_name */ "libpyzmq.Socket", /* tp_name */
sizeof (socket_t), /* tp_basicsize */ sizeof (socket_t), /* tp_basicsize */
0, /* tp_itemsize */ 0, /* tp_itemsize */
(destructor) socket_dealloc, /* tp_dealloc */ (destructor) socket_dealloc, /* tp_dealloc */
...@@ -426,7 +415,7 @@ static PyTypeObject socket_type = ...@@ -426,7 +415,7 @@ static PyTypeObject socket_type =
0, /* tp_dictoffset */ 0, /* tp_dictoffset */
(initproc) socket_init, /* tp_init */ (initproc) socket_init, /* tp_init */
0, /* tp_alloc */ 0, /* tp_alloc */
socket_new, /* tp_new */ socket_new /* tp_new */
}; };
static PyMethodDef module_methods [] = {{ NULL, NULL, 0, NULL }}; static PyMethodDef module_methods [] = {{ NULL, NULL, 0, NULL }};
...@@ -442,8 +431,10 @@ static const char* libpyzmq_doc = ...@@ -442,8 +431,10 @@ static const char* libpyzmq_doc =
PyMODINIT_FUNC initlibpyzmq () PyMODINIT_FUNC initlibpyzmq ()
{ {
if (PyType_Ready (&context_type) < 0 && PyType_Ready (&socket_type) < 0) int rc = PyType_Ready (&context_type);
return; assert (rc == 0);
rc = PyType_Ready (&socket_type);
assert (rc == 0);
PyObject *module = Py_InitModule3 ("libpyzmq", module_methods, PyObject *module = Py_InitModule3 ("libpyzmq", module_methods,
libpyzmq_doc); libpyzmq_doc);
...@@ -451,8 +442,8 @@ PyMODINIT_FUNC initlibpyzmq () ...@@ -451,8 +442,8 @@ PyMODINIT_FUNC initlibpyzmq ()
return; return;
Py_INCREF (&context_type); Py_INCREF (&context_type);
Py_INCREF (&socket_type);
PyModule_AddObject (module, "Context", (PyObject*) &context_type); PyModule_AddObject (module, "Context", (PyObject*) &context_type);
Py_INCREF (&socket_type);
PyModule_AddObject (module, "Socket", (PyObject*) &socket_type); PyModule_AddObject (module, "Socket", (PyObject*) &socket_type);
PyObject *dict = PyModule_GetDict (module); PyObject *dict = PyModule_GetDict (module);
......
...@@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : ...@@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
zmq::app_thread_t::~app_thread_t () zmq::app_thread_t::~app_thread_t ()
{ {
// Destroy all the sockets owned by this application thread. zmq_assert (sockets.empty ());
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
delete *it;
} }
zmq::i_signaler *zmq::app_thread_t::get_signaler () zmq::i_signaler *zmq::app_thread_t::get_signaler ()
......
...@@ -30,7 +30,9 @@ ...@@ -30,7 +30,9 @@
#include "windows.h" #include "windows.h"
#endif #endif
zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
sockets (0),
terminated (false)
{ {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple // Intialise Windows sockets. Note that WSAStartup can be called multiple
...@@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) ...@@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
io_threads [i]->start (); io_threads [i]->start ();
} }
int zmq::dispatcher_t::term ()
{
term_sync.lock ();
zmq_assert (!terminated);
terminated = true;
bool destroy = (sockets == 0);
term_sync.unlock ();
if (destroy)
delete this;
return 0;
}
zmq::dispatcher_t::~dispatcher_t () zmq::dispatcher_t::~dispatcher_t ()
{ {
// Close all application theads, sockets, io_objects etc. // Close all application theads, sockets, io_objects etc.
...@@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) ...@@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
} }
threads_sync.unlock (); threads_sync.unlock ();
term_sync.lock ();
sockets++;
term_sync.unlock ();
return thread->create_socket (type_); return thread->create_socket (type_);
} }
void zmq::dispatcher_t::destroy_socket ()
{
// If zmq_term was already called and there are no more sockets,
// terminate the whole 0MQ infrastructure.
term_sync.lock ();
zmq_assert (sockets > 0);
sockets--;
bool destroy = (sockets == 0 && terminated);
term_sync.unlock ();
if (destroy)
delete this;
}
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{ {
// Check whether thread ID is already assigned. If so, return it. // Check whether thread ID is already assigned. If so, return it.
......
...@@ -52,12 +52,18 @@ namespace zmq ...@@ -52,12 +52,18 @@ namespace zmq
// signalers. // signalers.
dispatcher_t (int app_threads_, int io_threads_); dispatcher_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term). // This function is called when user invokes zmq_term. If there are
~dispatcher_t (); // no more sockets open it'll cause all the infrastructure to be shut
// down. If there are open sockets still, the deallocation happens
// after the last one is closed.
int term ();
// Create a socket. // Create a socket.
class socket_base_t *create_socket (int type_); class socket_base_t *create_socket (int type_);
// Destroy a socket.
void destroy_socket ();
// Returns number of thread slots in the dispatcher. To be used by // Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be // individual threads to find out how many distinct signals can be
// received. // received.
...@@ -93,6 +99,8 @@ namespace zmq ...@@ -93,6 +99,8 @@ namespace zmq
private: private:
~dispatcher_t ();
// Returns the app thread associated with the current thread. // Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots. // NULL if we are out of app thread slots.
class app_thread_t *choose_app_thread (); class app_thread_t *choose_app_thread ();
...@@ -130,6 +138,17 @@ namespace zmq ...@@ -130,6 +138,17 @@ namespace zmq
// Synchronisation of access to the pipes repository. // Synchronisation of access to the pipes repository.
mutex_t pipes_sync; mutex_t pipes_sync;
// Number of sockets alive.
int sockets;
// If true, zmq_term was already called. When last socket is closed
// the whole 0MQ infrastructure should be deallocated.
bool terminated;
// Synchronisation of access to the termination data (socket count
// and 'terminated' flag).
mutex_t term_sync;
dispatcher_t (const dispatcher_t&); dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&); void operator = (const dispatcher_t&);
}; };
......
...@@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot () ...@@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot ()
return thread_slot; return thread_slot;
} }
zmq::dispatcher_t *zmq::object_t::get_dispatcher ()
{
return dispatcher;
}
void zmq::object_t::process_command (command_t &cmd_) void zmq::object_t::process_command (command_t &cmd_)
{ {
switch (cmd_.type) { switch (cmd_.type) {
......
...@@ -40,6 +40,7 @@ namespace zmq ...@@ -40,6 +40,7 @@ namespace zmq
~object_t (); ~object_t ();
int get_thread_slot (); int get_thread_slot ();
dispatcher_t *get_dispatcher ();
void process_command (struct command_t &cmd_); void process_command (struct command_t &cmd_);
// Allow pipe to access corresponding dispatcher functions. // Allow pipe to access corresponding dispatcher functions.
......
...@@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) ...@@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_)
{ {
return out_pipe->write (msg_); if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
return true;
}
return false;
} }
void zmq::session_t::flush () void zmq::session_t::flush ()
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "app_thread.hpp" #include "app_thread.hpp"
#include "err.hpp" #include "dispatcher.hpp"
#include "zmq_listener.hpp" #include "zmq_listener.hpp"
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "msg_content.hpp" #include "msg_content.hpp"
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "owned.hpp" #include "owned.hpp"
#include "uuid.hpp" #include "uuid.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_), object_t (parent_),
...@@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
{ {
app_thread->remove_socket (this); app_thread->remove_socket (this);
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
delete this; delete this;
// This function must be called after the socket is completely deallocated
// as it may cause termination of the whole 0MQ infrastructure.
dispatcher->destroy_socket ();
return 0; return 0;
} }
......
...@@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_) ...@@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_)
int zmq_term (void *dispatcher_) int zmq_term (void *dispatcher_)
{ {
delete (zmq::dispatcher_t*) dispatcher_; return ((zmq::dispatcher_t*) dispatcher_)->term ();
return 0;
} }
void *zmq_socket (void *dispatcher_, int type_) void *zmq_socket (void *dispatcher_, int type_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment