Commit b9d83169 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1699 from somdoron/master

problem: when socket is destroyed before zmq_poller is the zmq_poller is accessing a dead socket
parents 714988e6 6bbca7cf
...@@ -34,25 +34,25 @@ zmq::socket_poller_t::socket_poller_t () : ...@@ -34,25 +34,25 @@ zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE), tag (0xCAFEBABE),
need_rebuild (true), need_rebuild (true),
use_signaler (false) use_signaler (false)
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
, ,
pollfds (NULL) pollfds (NULL)
#endif #endif
{ {
} }
zmq::socket_poller_t::~socket_poller_t () zmq::socket_poller_t::~socket_poller_t ()
{ {
// Mark the socket_poller as dead // Mark the socket_poller as dead
tag = 0xdeadbeef; tag = 0xdeadbeef;
for (items_t::iterator it = items.begin(); it != items.end(); ++it) { for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
if (it->socket) { if (it->socket && it->socket->check_tag()) {
int thread_safe; int thread_safe;
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
it->socket->remove_signaler (&signaler); it->socket->remove_signaler (&signaler);
} }
} }
...@@ -61,21 +61,21 @@ zmq::socket_poller_t::~socket_poller_t () ...@@ -61,21 +61,21 @@ zmq::socket_poller_t::~socket_poller_t ()
free (pollfds); free (pollfds);
pollfds = NULL; pollfds = NULL;
} }
#endif #endif
} }
bool zmq::socket_poller_t::check_tag () bool zmq::socket_poller_t::check_tag ()
{ {
return tag == 0xCAFEBABE; return tag == 0xCAFEBABE;
} }
int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_) int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
{ {
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_) { if (it->socket == socket_) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
} }
int thread_safe; int thread_safe;
...@@ -88,7 +88,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e ...@@ -88,7 +88,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
if (socket_->add_signaler (&signaler) == -1) if (socket_->add_signaler (&signaler) == -1)
return -1; return -1;
} }
item_t item = {socket_, 0, user_data_, events_}; item_t item = {socket_, 0, user_data_, events_};
items.push_back (item); items.push_back (item);
need_rebuild = true; need_rebuild = true;
...@@ -102,7 +102,7 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_) ...@@ -102,7 +102,7 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
if (!it->socket && it->fd == fd_) { if (!it->socket && it->fd == fd_) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
} }
item_t item = {NULL, fd_, user_data_, events_}; item_t item = {NULL, fd_, user_data_, events_};
...@@ -146,12 +146,12 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_) ...@@ -146,12 +146,12 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
it->events = events_; it->events = events_;
need_rebuild = true; need_rebuild = true;
return 0; return 0;
} }
int zmq::socket_poller_t::remove (socket_base_t *socket_) int zmq::socket_poller_t::remove (socket_base_t *socket_)
...@@ -167,7 +167,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) ...@@ -167,7 +167,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
int thread_safe; int thread_safe;
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
...@@ -178,10 +178,10 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) ...@@ -178,10 +178,10 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
if (socket_->remove_signaler (&signaler) == -1) if (socket_->remove_signaler (&signaler) == -1)
return -1; return -1;
} }
items.erase (it); items.erase (it);
need_rebuild = true; need_rebuild = true;
return 0; return 0;
} }
...@@ -198,14 +198,14 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_) ...@@ -198,14 +198,14 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
items.erase (it); items.erase (it);
need_rebuild = true; need_rebuild = true;
return 0; return 0;
} }
int zmq::socket_poller_t::rebuild () int zmq::socket_poller_t::rebuild ()
{ {
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
...@@ -231,16 +231,16 @@ int zmq::socket_poller_t::rebuild () ...@@ -231,16 +231,16 @@ int zmq::socket_poller_t::rebuild ()
if (!use_signaler) { if (!use_signaler) {
use_signaler = true; use_signaler = true;
poll_size++; poll_size++;
} }
} }
else else
poll_size++; poll_size++;
} }
else else
poll_size++; poll_size++;
} }
} }
if (poll_size == 0) if (poll_size == 0)
return 0; return 0;
...@@ -255,7 +255,7 @@ int zmq::socket_poller_t::rebuild () ...@@ -255,7 +255,7 @@ int zmq::socket_poller_t::rebuild ()
pollfds[0].events = POLLIN; pollfds[0].events = POLLIN;
} }
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) { if (it->events) {
if (it->socket) { if (it->socket) {
int thread_safe; int thread_safe;
...@@ -264,12 +264,12 @@ int zmq::socket_poller_t::rebuild () ...@@ -264,12 +264,12 @@ int zmq::socket_poller_t::rebuild ()
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1; return -1;
if (!thread_safe) { if (!thread_safe) {
size_t fd_size = sizeof (zmq::fd_t); size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) { if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
return -1; return -1;
} }
pollfds [item_nbr].events = POLLIN; pollfds [item_nbr].events = POLLIN;
item_nbr++; item_nbr++;
} }
...@@ -281,7 +281,7 @@ int zmq::socket_poller_t::rebuild () ...@@ -281,7 +281,7 @@ int zmq::socket_poller_t::rebuild ()
(it->events & ZMQ_POLLOUT ? POLLOUT : 0) | (it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
(it->events & ZMQ_POLLPRI ? POLLPRI : 0); (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
it->pollfd_index = item_nbr; it->pollfd_index = item_nbr;
item_nbr++; item_nbr++;
} }
} }
} }
...@@ -339,9 +339,9 @@ int zmq::socket_poller_t::rebuild () ...@@ -339,9 +339,9 @@ int zmq::socket_poller_t::rebuild ()
FD_SET (notify_fd, &pollset_in); FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd) if (maxfd < notify_fd)
maxfd = notify_fd; maxfd = notify_fd;
poll_size++; poll_size++;
} }
} }
// Else, the poll item is a raw file descriptor. Convert the poll item // Else, the poll item is a raw file descriptor. Convert the poll item
...@@ -391,7 +391,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -391,7 +391,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
zmq::clock_t clock; zmq::clock_t clock;
uint64_t now = 0; uint64_t now = 0;
uint64_t end = 0; uint64_t end = 0;
bool first_pass = true; bool first_pass = true;
while (true) { while (true) {
...@@ -438,14 +438,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -438,14 +438,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
// If there is event to return, we can exit immediately. // If there is event to return, we can exit immediately.
return 0; return 0;
} }
} }
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format. // the events to zmq_pollitem_t-style format.
else { else {
short revents = pollfds [it->pollfd_index].revents; short revents = pollfds [it->pollfd_index].revents;
short events = 0; short events = 0;
if (revents & POLLIN) if (revents & POLLIN)
events |= ZMQ_POLLIN; events |= ZMQ_POLLIN;
if (revents & POLLOUT) if (revents & POLLOUT)
...@@ -517,7 +517,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -517,7 +517,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
uint64_t now = 0; uint64_t now = 0;
uint64_t end = 0; uint64_t end = 0;
bool first_pass = true; bool first_pass = true;
fd_set inset, outset, errset; fd_set inset, outset, errset;
while (true) { while (true) {
...@@ -582,7 +582,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -582,7 +582,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
// If there is event to return, we can exit immediately. // If there is event to return, we can exit immediately.
return 0; return 0;
} }
} }
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format. // the events to zmq_pollitem_t-style format.
...@@ -595,7 +595,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -595,7 +595,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
events |= ZMQ_POLLOUT; events |= ZMQ_POLLOUT;
if (FD_ISSET (it->fd, &errset)) if (FD_ISSET (it->fd, &errset))
events |= ZMQ_POLLERR; events |= ZMQ_POLLERR;
if (events) { if (events) {
event_->socket = NULL; event_->socket = NULL;
event_->user_data = it->user_data; event_->user_data = it->user_data;
...@@ -648,5 +648,3 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -648,5 +648,3 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
return -1; return -1;
#endif #endif
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment