Commit 64e1c181 authored by Richard Newton's avatar Richard Newton

Implement non-blocking shutdown command that unblocks other threads waiting on blocking operations.

parent a601b3f6
......@@ -581,6 +581,7 @@ set(tests
test_system
test_connect_delay
test_connect_resolve
test_ctx_destroy
test_ctx_options
test_disconnect_inproc
test_hwm
......
zmq_ctx_shutdown(3)
==================
NAME
----
zmq_ctx_shutdown - shutdown a 0MQ context
SYNOPSIS
--------
*int zmq_ctx_shutdown (void '*context');*
DESCRIPTION
-----------
The _zmq_ctx_shutdown()_ function shall shutdown the 0MQ context 'context'.
Context shutdown will cause any blocking operations currently in progress on
sockets open within 'context' to return immediately with an error code of ETERM.
With the exception of _zmq_close()_, any further operations on sockets open within
'context' shall fail with an error code of ETERM.
This function is optional, client code is still required to call the linkzmq:zmq_ctx_term[3]
function to free all resources allocated by zeromq.
RETURN VALUE
------------
The _zmq_ctx_shutdown()_ function shall return zero if successful. Otherwise
it shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EFAULT*::
The provided 'context' was invalid.
SEE ALSO
--------
linkzmq:zmq[7]
linkzmq:zmq_init[3]
linkzmq:zmq_ctx_term[3]
linkzmq:zmq_close[3]
linkzmq:zmq_setsockopt[3]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
......@@ -178,6 +178,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context);
ZMQ_EXPORT int zmq_ctx_shutdown (void *ctx_);
ZMQ_EXPORT int zmq_ctx_set (void *context, int option, int optval);
ZMQ_EXPORT int zmq_ctx_get (void *context, int option);
......
......@@ -143,6 +143,25 @@ int zmq::ctx_t::terminate ()
return 0;
}
int zmq::ctx_t::shutdown ()
{
slot_sync.lock ();
if (!starting && !terminating) {
terminating = true;
// Send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper
// thread to stop.
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
if (sockets.empty ())
reaper->stop ();
}
slot_sync.unlock ();
return 0;
}
int zmq::ctx_t::set (int option_, int optval_)
{
int rc = 0;
......
......@@ -69,6 +69,15 @@ namespace zmq
// after the last one is closed.
int terminate ();
// This function starts the terminate process by unblocking any blocking
// operations currently in progress and stopping any more socket activity
// (except zmq_close).
// This function is non-blocking.
// terminate must still be called afterwards.
// This function is optional, terminate will unblock any current
// operations as well.
int shutdown();
// Set and get context properties.
int set (int option_, int optval_);
int get (int option_);
......
......@@ -190,6 +190,16 @@ int zmq_ctx_term (void *ctx_)
return rc;
}
int zmq_ctx_shutdown (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->shutdown ();
}
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
......
......@@ -24,6 +24,7 @@ noinst_PROGRAMS = test_system \
test_stream \
test_disconnect_inproc \
test_ctx_options \
test_ctx_destroy \
test_security_null \
test_security_plain \
test_security_curve \
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <string.h>
#include "testutil.hpp"
static void receiver (void *socket)
{
char buffer[16];
int rc = zmq_recv (socket, &buffer, sizeof (buffer), 0);
assert(rc == -1);
}
void test_ctx_destroy()
{
int rc;
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void *socket = zmq_socket (ctx, ZMQ_PULL);
assert (socket);
// Close the socket
rc = zmq_close (socket);
assert (rc == 0);
// Destroy the context
rc = zmq_ctx_destroy (ctx);
assert (rc == 0);
}
void test_ctx_shutdown()
{
int rc;
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void *socket = zmq_socket (ctx, ZMQ_PULL);
assert (socket);
// Spawn a thread to receive on socket
void *receiver_thread = zmq_threadstart (&receiver, socket);
// Shutdown context, if we used destroy here we would deadlock.
rc = zmq_ctx_shutdown (ctx);
assert (rc == 0);
// Wait for thread to finish
zmq_threadclose (receiver_thread);
// Close the socket.
rc = zmq_close (socket);
assert (rc == 0);
// Destory the context, will now not hang as we have closed the socket.
rc = zmq_ctx_destroy (ctx);
assert (rc == 0);
}
int main (void)
{
setup_test_environment();
test_ctx_destroy();
test_ctx_shutdown();
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment