Commit 6fa274eb authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #640 from mattconnolly/fork

Adding ability for a context to be terminated in a forked child process
parents 282765ca 17717558
......@@ -60,6 +60,7 @@ tests/test_spec_req
tests/test_spec_router
tests/test_req_request_ids
tests/test_req_strict
tests/test_fork
src/platform.hpp*
src/stamp-h1
perf/local_lat
......
......@@ -388,7 +388,7 @@ AM_CONDITIONAL(ON_ANDROID, test "x$libzmq_on_android" = "xyes")
# Checks for library functions.
AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday clock_gettime memset socket getifaddrs freeifaddrs)
AC_CHECK_FUNCS(perror gettimeofday clock_gettime memset socket getifaddrs freeifaddrs fork)
AC_CHECK_HEADERS([alloca.h])
LIBZMQ_CHECK_SOCK_CLOEXEC([AC_DEFINE(
[ZMQ_HAVE_SOCK_CLOEXEC],
......
......@@ -49,6 +49,9 @@ zmq::ctx_t::ctx_t () :
io_thread_count (ZMQ_IO_THREADS_DFLT),
ipv6 (false)
{
#ifdef HAVE_FORK
pid = getpid();
#endif
}
bool zmq::ctx_t::check_tag ()
......@@ -89,6 +92,19 @@ int zmq::ctx_t::terminate ()
slot_sync.lock ();
if (!starting) {
#ifdef HAVE_FORK
if (pid != getpid())
{
// we are a forked child process. Close all file descriptors
// inherited from the parent.
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
{
sockets[i]->get_mailbox()->forked();
}
term_mailbox.forked();
}
#endif
// Check whether termination was already underway, but interrupted and now
// restarted.
bool restarted = terminating;
......
......@@ -167,6 +167,11 @@ namespace zmq
ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&);
#ifdef HAVE_FORK
// the process that created this context. Used to detect forking.
pid_t pid;
#endif
};
}
......
......@@ -48,6 +48,9 @@ zmq::kqueue_t::kqueue_t () :
// Create event queue
kqueue_fd = kqueue ();
errno_assert (kqueue_fd != -1);
#ifdef HAVE_FORK
pid = getpid();
#endif
}
zmq::kqueue_t::~kqueue_t ()
......@@ -161,6 +164,13 @@ void zmq::kqueue_t::loop ()
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
timeout ? &ts: NULL);
#ifdef HAVE_FORK
if (unlikely(pid != getpid())) {
//printf("zmq::kqueue_t::loop aborting on forked child %d\n", (int)getpid());
// simply exit the loop in a forked process.
return;
}
#endif
if (n == -1) {
errno_assert (errno == EINTR);
continue;
......
......@@ -25,6 +25,7 @@
#if defined ZMQ_USE_KQUEUE
#include <vector>
#include <unistd.h>
#include "fd.hpp"
#include "thread.hpp"
......@@ -94,6 +95,11 @@ namespace zmq
kqueue_t (const kqueue_t&);
const kqueue_t &operator = (const kqueue_t&);
#ifdef HAVE_FORK
// the process that created this context. Used to detect forking.
pid_t pid;
#endif
};
typedef kqueue_t poller_t;
......
......@@ -82,4 +82,3 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
zmq_assert (ok);
return 0;
}
......@@ -44,6 +44,13 @@ namespace zmq
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
void forked() { signaler.forked(); }
#endif
private:
// The pipe to store actual commands.
......
......@@ -31,6 +31,10 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
#ifdef HAVE_FORK
pid = getpid();
#endif
}
zmq::reaper_t::~reaper_t ()
......@@ -57,6 +61,13 @@ void zmq::reaper_t::stop ()
void zmq::reaper_t::in_event ()
{
while (true) {
#ifdef HAVE_FORK
if (unlikely(pid != getpid()))
{
//printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
return;
}
#endif
// Get the next command. If there is none, exit.
command_t cmd;
......
......@@ -72,6 +72,11 @@ namespace zmq
reaper_t (const reaper_t&);
const reaper_t &operator = (const reaper_t&);
#ifdef HAVE_FORK
// the process that created this context. Used to detect forking.
pid_t pid;
#endif
};
}
......
......@@ -86,6 +86,10 @@ zmq::signaler_t::signaler_t ()
// Set both fds to non-blocking mode.
unblock_socket (w);
unblock_socket (r);
#ifdef HAVE_FORK
pid = getpid();
#endif
}
zmq::signaler_t::~signaler_t ()
......@@ -117,6 +121,12 @@ zmq::fd_t zmq::signaler_t::get_fd ()
void zmq::signaler_t::send ()
{
#if HAVE_FORK
if (unlikely(pid != getpid())) {
//printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
return; // do not send anything in forked child context
}
#endif
#if defined ZMQ_HAVE_EVENTFD
const uint64_t inc = 1;
ssize_t sz = write (w, &inc, sizeof (inc));
......@@ -132,6 +142,13 @@ void zmq::signaler_t::send ()
ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
if (unlikely (nbytes == -1 && errno == EINTR))
continue;
#if HAVE_FORK
if (unlikely(pid != getpid())) {
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
errno = EINTR;
break;
}
#endif
zmq_assert (nbytes == sizeof (dummy));
break;
}
......@@ -140,6 +157,17 @@ void zmq::signaler_t::send ()
int zmq::signaler_t::wait (int timeout_)
{
#ifdef HAVE_FORK
if (unlikely(pid != getpid()))
{
// we have forked and the file descriptor is closed. Emulate an interupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
errno = EINTR;
return -1;
}
#endif
#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
struct pollfd pfd;
......@@ -155,6 +183,16 @@ int zmq::signaler_t::wait (int timeout_)
errno = EAGAIN;
return -1;
}
#ifdef HAVE_FORK
if (unlikely(pid != getpid()))
{
// we have forked and the file descriptor is closed. Emulate an interupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
errno = EINTR;
return -1;
}
#endif
zmq_assert (rc == 1);
zmq_assert (pfd.revents & POLLIN);
return 0;
......@@ -225,6 +263,32 @@ void zmq::signaler_t::recv ()
#endif
}
#ifdef HAVE_FORK
void zmq::signaler_t::forked()
{
int oldr = r;
#if !defined ZMQ_HAVE_EVENTFD
int oldw = w;
#endif
// replace the file descriptors created in the parent with new
// ones, and close the inherited ones
make_fdpair(&r, &w);
#if defined ZMQ_HAVE_EVENTFD
int rc = close (oldr);
errno_assert (rc == 0);
#else
int rc = close (oldw);
errno_assert (rc == 0);
rc = close (oldr);
errno_assert (rc == 0);
#endif
}
#endif
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
......
......@@ -20,6 +20,10 @@
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
#define __ZMQ_SIGNALER_HPP_INCLUDED__
#ifdef HAVE_FORK
#include <unistd.h>
#endif
#include "fd.hpp"
namespace zmq
......@@ -41,7 +45,13 @@ namespace zmq
void send ();
int wait (int timeout_);
void recv ();
#ifdef HAVE_FORK
// close the file descriptors in a forked child process so that they
// do not interfere with the context in the parent process.
void forked();
#endif
private:
// Creates a pair of filedescriptors that will be used
......@@ -55,6 +65,14 @@ namespace zmq
// Disable copying of signaler_t object.
signaler_t (const signaler_t&);
const signaler_t &operator = (const signaler_t&);
#ifdef HAVE_FORK
// the process that created this context. Used to detect forking.
pid_t pid;
// idempotent close of file descriptors that is safe to use by destructor
// and forked().
void close_internal();
#endif
};
}
......
......@@ -38,7 +38,8 @@ if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
test_pair_ipc \
test_reqrep_ipc \
test_timeo
test_timeo \
test_fork
endif
test_pair_inproc_SOURCES = test_pair_inproc.cpp testutil.hpp
......@@ -76,6 +77,7 @@ test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
test_reqrep_ipc_SOURCES = test_reqrep_ipc.cpp testutil.hpp
test_timeo_SOURCES = test_timeo.cpp
test_fork_SOURCES = test_fork.cpp
endif
# Deprecated test cases
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <string.h>
#include "testutil.hpp"
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/wait.h>
const char* address = "tcp://127.0.0.1:6571";
#define NUM_MESSAGES 5
int main (void)
{
int rc;
setup_test_environment();
printf("parent: process id = %d\n", (int)getpid());
void *ctx = zmq_ctx_new ();
assert (ctx);
void* sock = zmq_socket(ctx, ZMQ_PULL);
assert(sock);
rc = zmq_bind(sock, address);
assert(rc == 0);
// wait for io threads to be running
usleep(100000);
printf("about to fork()\n");
int pid = fork();
if (pid == 0) {
// this is the child process
usleep(100000);
printf("child: process id = %d\n", (int)getpid());
printf("child: terminating inherited context...\n");
// close the socket, or the context gets stuck indefinitely
// zmq_close(sock);
zmq_term(ctx);
// create new context, socket, connect and send some messages
void *ctx2 = zmq_ctx_new ();
assert (ctx2);
void* push = zmq_socket(ctx2, ZMQ_PUSH);
assert(push);
rc = zmq_connect(push, address);
assert(rc == 0);
const char* message = "hello";
int i;
for (i = 0; i < NUM_MESSAGES; i += 1) {
printf("child: send message #%d\n", i);
zmq_send(push, message, strlen(message), 0);
usleep(100000);
}
printf("child: closing push socket\n");
zmq_close(push);
printf("child: destroying child context\n");
zmq_ctx_destroy(ctx2);
printf("child: done\n");
exit(0);
} else {
// this is the parent process
printf("parent: waiting for %d messages\n", NUM_MESSAGES);
char buffer[1024];
int i;
for (i = 0; i < NUM_MESSAGES; i += 1) {
int num_bytes = zmq_recv(sock, buffer, 1024, 0);
assert(num_bytes > 0);
buffer[num_bytes] = 0;
printf("parent: received #%d: %s\n", i, buffer);
}
int child_status;
while (true) {
rc = waitpid(pid, &child_status, 0);
if (rc == -1 && errno == EINTR) continue;
printf("parent: child exit code = %d, rc = %d, errno = %d\n", WEXITSTATUS(child_status), rc, errno);
assert(rc > 0);
// verify the status code of the child was zero.
assert(WEXITSTATUS(child_status) == 0);
break;
}
printf("parent: done.\n");
exit(0);
}
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment