Commit 0ac223c7 authored by Richard Newton's avatar Richard Newton

Merge pull request #870 from hintjens/master

Fixed issue #868
parents 1fe82ae0 4f56631f
...@@ -99,14 +99,11 @@ int zmq::ctx_t::terminate () ...@@ -99,14 +99,11 @@ int zmq::ctx_t::terminate ()
if (!starting) { if (!starting) {
#ifdef HAVE_FORK #ifdef HAVE_FORK
if (pid != getpid()) if (pid != getpid ()) {
{
// we are a forked child process. Close all file descriptors // we are a forked child process. Close all file descriptors
// inherited from the parent. // inherited from the parent.
for (sockets_t::size_type i = 0; i != sockets.size (); i++) for (sockets_t::size_type i = 0; i != sockets.size (); i++)
{
sockets[i]->get_mailbox()->forked(); sockets[i]->get_mailbox()->forked();
}
term_mailbox.forked(); term_mailbox.forked();
} }
...@@ -118,7 +115,6 @@ int zmq::ctx_t::terminate () ...@@ -118,7 +115,6 @@ int zmq::ctx_t::terminate ()
// First attempt to terminate the context. // First attempt to terminate the context.
if (!restarted) { if (!restarted) {
// First send stop command to sockets so that any blocking calls // First send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper // can be interrupted. If there are no sockets we can ask reaper
// thread to stop. // thread to stop.
...@@ -351,7 +347,6 @@ int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) ...@@ -351,7 +347,6 @@ int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
errno = EADDRINUSE; errno = EADDRINUSE;
return -1; return -1;
} }
return 0; return 0;
} }
...@@ -369,7 +364,6 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) ...@@ -369,7 +364,6 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
} }
++it; ++it;
} }
endpoints_sync.unlock (); endpoints_sync.unlock ();
} }
...@@ -401,17 +395,14 @@ void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pendi ...@@ -401,17 +395,14 @@ void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pendi
endpoints_sync.lock (); endpoints_sync.lock ();
endpoints_t::iterator it = endpoints.find (addr_); endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end ()) if (it == endpoints.end ()) {
{
// Still no bind. // Still no bind.
pending_connection_.endpoint.socket->inc_seqnum (); pending_connection_.endpoint.socket->inc_seqnum ();
pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_)); pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_));
} }
else else
{
// Bind has happened in the mean time, connect directly // Bind has happened in the mean time, connect directly
connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side); connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side);
}
endpoints_sync.unlock (); endpoints_sync.unlock ();
} }
...@@ -423,22 +414,19 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so ...@@ -423,22 +414,19 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_); std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p) for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
{
connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side); connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
}
pending_connections.erase(pending.first, pending.second); pending_connections.erase(pending.first, pending.second);
endpoints_sync.unlock (); endpoints_sync.unlock ();
} }
void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_) void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
options_t& bind_options, pending_connection_t &pending_connection_, side side_)
{ {
bind_socket_->inc_seqnum(); bind_socket_->inc_seqnum();
pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid()); pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid());
if (side_ == bind_side) if (side_ == bind_side) {
{
command_t cmd; command_t cmd;
cmd.type = command_t::bind; cmd.type = command_t::bind;
cmd.args.bind.pipe = pending_connection_.bind_pipe; cmd.args.bind.pipe = pending_connection_.bind_pipe;
...@@ -446,13 +434,12 @@ void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, option ...@@ -446,13 +434,12 @@ void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, option
bind_socket_->send_inproc_connected(pending_connection_.endpoint.socket); bind_socket_->send_inproc_connected(pending_connection_.endpoint.socket);
} }
else else
{
pending_connection_.connect_pipe->send_bind(bind_socket_, pending_connection_.bind_pipe, false); pending_connection_.connect_pipe->send_bind(bind_socket_, pending_connection_.bind_pipe, false);
}
int sndhwm = 0; int sndhwm = 0;
if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0) if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm; sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;
int rcvhwm = 0; int rcvhwm = 0;
if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0) if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm; rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;
...@@ -473,7 +460,8 @@ void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, option ...@@ -473,7 +460,8 @@ void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, option
msg_t id; msg_t id;
int rc = id.init_size (pending_connection_.endpoint.options.identity_size); int rc = id.init_size (pending_connection_.endpoint.options.identity_size);
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (id.data (), pending_connection_.endpoint.options.identity, pending_connection_.endpoint.options.identity_size); memcpy (id.data (), pending_connection_.endpoint.options.identity,
pending_connection_.endpoint.options.identity_size);
id.set_flags (msg_t::identity); id.set_flags (msg_t::identity);
bool written = pending_connection_.connect_pipe->write (&id); bool written = pending_connection_.connect_pipe->write (&id);
zmq_assert (written); zmq_assert (written);
......
...@@ -133,7 +133,6 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_) ...@@ -133,7 +133,6 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
errno_assert (errno != EBADF && errno_assert (errno != EBADF &&
errno != EFAULT && errno != EFAULT &&
errno != EINVAL && errno != EINVAL &&
errno != ENOTCONN &&
errno != ENOTSOCK); errno != ENOTSOCK);
return 0; return 0;
} }
......
...@@ -40,7 +40,7 @@ extern "C" ...@@ -40,7 +40,7 @@ extern "C"
void zmq::thread_t::start (thread_fn *tfn_, void *arg_) void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{ {
tfn = tfn_; tfn = tfn_;
arg =arg_; arg = arg_;
#if defined _WIN32_WCE #if defined _WIN32_WCE
descriptor = (HANDLE) CreateThread (NULL, 0, descriptor = (HANDLE) CreateThread (NULL, 0,
&::thread_routine, this, 0 , NULL); &::thread_routine, this, 0 , NULL);
...@@ -86,7 +86,7 @@ extern "C" ...@@ -86,7 +86,7 @@ extern "C"
void zmq::thread_t::start (thread_fn *tfn_, void *arg_) void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{ {
tfn = tfn_; tfn = tfn_;
arg =arg_; arg = arg_;
int rc = pthread_create (&descriptor, NULL, thread_routine, this); int rc = pthread_create (&descriptor, NULL, thread_routine, this);
posix_assert (rc); posix_assert (rc);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment