Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
4da5ff6a
Commit
4da5ff6a
authored
Aug 17, 2015
by
somdoron
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
revert to original zmq_polland creating zmq_pollfd_poll for polling on pollfd
parent
a96f16e3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
479 additions
and
49 deletions
+479
-49
zmq.h
include/zmq.h
+8
-0
zmq.cpp
src/zmq.cpp
+466
-44
test_thread_safe_polling.cpp
tests/test_thread_safe_polling.cpp
+5
-5
No files found.
include/zmq.h
View file @
4da5ff6a
...
@@ -410,8 +410,16 @@ typedef struct zmq_pollitem_t
...
@@ -410,8 +410,16 @@ typedef struct zmq_pollitem_t
#define ZMQ_POLLITEMS_DFLT 16
#define ZMQ_POLLITEMS_DFLT 16
ZMQ_EXPORT
int
zmq_poll
(
zmq_pollitem_t
*
items
,
int
nitems
,
long
timeout
);
ZMQ_EXPORT
int
zmq_poll
(
zmq_pollitem_t
*
items
,
int
nitems
,
long
timeout
);
/******************************************************************************/
/* Pollfd polling on thread safe socket */
/******************************************************************************/
ZMQ_EXPORT
void
*
zmq_pollfd_new
();
ZMQ_EXPORT
void
*
zmq_pollfd_new
();
ZMQ_EXPORT
int
zmq_pollfd_close
(
void
*
p
);
ZMQ_EXPORT
int
zmq_pollfd_close
(
void
*
p
);
ZMQ_EXPORT
void
zmq_pollfd_recv
(
void
*
p
);
ZMQ_EXPORT
int
zmq_pollfd_wait
(
void
*
p
,
int
timeout_
);
ZMQ_EXPORT
int
zmq_pollfd_poll
(
void
*
p
,
zmq_pollitem_t
*
items
,
int
nitems
,
long
timeout
);
#if defined _WIN32
#if defined _WIN32
ZMQ_EXPORT
SOCKET
zmq_pollfd_fd
(
void
*
p
);
ZMQ_EXPORT
SOCKET
zmq_pollfd_fd
(
void
*
p
);
...
...
src/zmq.cpp
View file @
4da5ff6a
...
@@ -709,6 +709,364 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
...
@@ -709,6 +709,364 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
}
}
}
}
// Polling.
int
zmq_poll
(
zmq_pollitem_t
*
items_
,
int
nitems_
,
long
timeout_
)
{
// TODO: the function implementation can just call zmq_pollfd_poll with pollfd as NULL, however pollfd is not yet stable
#if defined ZMQ_POLL_BASED_ON_POLL
if
(
unlikely
(
nitems_
<
0
))
{
errno
=
EINVAL
;
return
-
1
;
}
if
(
unlikely
(
nitems_
==
0
))
{
if
(
timeout_
==
0
)
return
0
;
#if defined ZMQ_HAVE_WINDOWS
Sleep
(
timeout_
>
0
?
timeout_
:
INFINITE
);
return
0
;
#elif defined ZMQ_HAVE_ANDROID
usleep
(
timeout_
*
1000
);
return
0
;
#else
return
usleep
(
timeout_
*
1000
);
#endif
}
if
(
!
items_
)
{
errno
=
EFAULT
;
return
-
1
;
}
zmq
::
clock_t
clock
;
uint64_t
now
=
0
;
uint64_t
end
=
0
;
pollfd
spollfds
[
ZMQ_POLLITEMS_DFLT
];
pollfd
*
pollfds
=
spollfds
;
if
(
nitems_
>
ZMQ_POLLITEMS_DFLT
)
{
pollfds
=
(
pollfd
*
)
malloc
(
nitems_
*
sizeof
(
pollfd
));
alloc_assert
(
pollfds
);
}
// Build pollset for poll () system call.
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
// If the poll item is a 0MQ socket, we poll on the file descriptor
// retrieved by the ZMQ_FD socket option.
if
(
items_
[
i
].
socket
)
{
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
pollfds
[
i
].
fd
,
&
zmq_fd_size
)
==
-
1
)
{
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
return
-
1
;
}
pollfds
[
i
].
events
=
items_
[
i
].
events
?
POLLIN
:
0
;
}
// Else, the poll item is a raw file descriptor. Just convert the
// events to normal POLLIN/POLLOUT for poll ().
else
{
pollfds
[
i
].
fd
=
items_
[
i
].
fd
;
pollfds
[
i
].
events
=
(
items_
[
i
].
events
&
ZMQ_POLLIN
?
POLLIN
:
0
)
|
(
items_
[
i
].
events
&
ZMQ_POLLOUT
?
POLLOUT
:
0
)
|
(
items_
[
i
].
events
&
ZMQ_POLLPRI
?
POLLPRI
:
0
);
}
}
bool
first_pass
=
true
;
int
nevents
=
0
;
while
(
true
)
{
// Compute the timeout for the subsequent poll.
int
timeout
;
if
(
first_pass
)
timeout
=
0
;
else
if
(
timeout_
<
0
)
timeout
=
-
1
;
else
timeout
=
end
-
now
;
// Wait for events.
while
(
true
)
{
int
rc
=
poll
(
pollfds
,
nitems_
,
timeout
);
if
(
rc
==
-
1
&&
errno
==
EINTR
)
{
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
return
-
1
;
}
errno_assert
(
rc
>=
0
);
break
;
}
// Check for the events.
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
items_
[
i
].
revents
=
0
;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if
(
items_
[
i
].
socket
)
{
size_t
zmq_events_size
=
sizeof
(
uint32_t
);
uint32_t
zmq_events
;
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
)
==
-
1
)
{
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
return
-
1
;
}
if
((
items_
[
i
].
events
&
ZMQ_POLLOUT
)
&&
(
zmq_events
&
ZMQ_POLLOUT
))
items_
[
i
].
revents
|=
ZMQ_POLLOUT
;
if
((
items_
[
i
].
events
&
ZMQ_POLLIN
)
&&
(
zmq_events
&
ZMQ_POLLIN
))
items_
[
i
].
revents
|=
ZMQ_POLLIN
;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else
{
if
(
pollfds
[
i
].
revents
&
POLLIN
)
items_
[
i
].
revents
|=
ZMQ_POLLIN
;
if
(
pollfds
[
i
].
revents
&
POLLOUT
)
items_
[
i
].
revents
|=
ZMQ_POLLOUT
;
if
(
pollfds
[
i
].
revents
&
POLLPRI
)
items_
[
i
].
revents
|=
ZMQ_POLLPRI
;
if
(
pollfds
[
i
].
revents
&
~
(
POLLIN
|
POLLOUT
|
POLLPRI
))
items_
[
i
].
revents
|=
ZMQ_POLLERR
;
}
if
(
items_
[
i
].
revents
)
nevents
++
;
}
// If timout is zero, exit immediately whether there are events or not.
if
(
timeout_
==
0
)
break
;
// If there are events to return, we can exit immediately.
if
(
nevents
)
break
;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if
(
timeout_
<
0
)
{
if
(
first_pass
)
first_pass
=
false
;
continue
;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if
(
first_pass
)
{
now
=
clock
.
now_ms
();
end
=
now
+
timeout_
;
if
(
now
==
end
)
break
;
first_pass
=
false
;
continue
;
}
// Find out whether timeout have expired.
now
=
clock
.
now_ms
();
if
(
now
>=
end
)
break
;
}
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
return
nevents
;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if
(
unlikely
(
nitems_
<
0
))
{
errno
=
EINVAL
;
return
-
1
;
}
if
(
unlikely
(
nitems_
==
0
))
{
if
(
timeout_
==
0
)
return
0
;
#if defined ZMQ_HAVE_WINDOWS
Sleep
(
timeout_
>
0
?
timeout_
:
INFINITE
);
return
0
;
#else
return
usleep
(
timeout_
*
1000
);
#endif
}
zmq
::
clock_t
clock
;
uint64_t
now
=
0
;
uint64_t
end
=
0
;
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert
(
nitems_
<=
FD_SETSIZE
);
fd_set
pollset_in
;
FD_ZERO
(
&
pollset_in
);
fd_set
pollset_out
;
FD_ZERO
(
&
pollset_out
);
fd_set
pollset_err
;
FD_ZERO
(
&
pollset_err
);
zmq
::
fd_t
maxfd
=
0
;
// Build the fd_sets for passing to select ().
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if
(
items_
[
i
].
socket
)
{
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
zmq
::
fd_t
notify_fd
;
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
notify_fd
,
&
zmq_fd_size
)
==
-
1
)
return
-
1
;
if
(
items_
[
i
].
events
)
{
FD_SET
(
notify_fd
,
&
pollset_in
);
if
(
maxfd
<
notify_fd
)
maxfd
=
notify_fd
;
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else
{
if
(
items_
[
i
].
events
&
ZMQ_POLLIN
)
FD_SET
(
items_
[
i
].
fd
,
&
pollset_in
);
if
(
items_
[
i
].
events
&
ZMQ_POLLOUT
)
FD_SET
(
items_
[
i
].
fd
,
&
pollset_out
);
if
(
items_
[
i
].
events
&
ZMQ_POLLERR
)
FD_SET
(
items_
[
i
].
fd
,
&
pollset_err
);
if
(
maxfd
<
items_
[
i
].
fd
)
maxfd
=
items_
[
i
].
fd
;
}
}
bool
first_pass
=
true
;
int
nevents
=
0
;
fd_set
inset
,
outset
,
errset
;
while
(
true
)
{
// Compute the timeout for the subsequent poll.
timeval
timeout
;
timeval
*
ptimeout
;
if
(
first_pass
)
{
timeout
.
tv_sec
=
0
;
timeout
.
tv_usec
=
0
;
ptimeout
=
&
timeout
;
}
else
if
(
timeout_
<
0
)
ptimeout
=
NULL
;
else
{
timeout
.
tv_sec
=
(
long
)
((
end
-
now
)
/
1000
);
timeout
.
tv_usec
=
(
long
)
((
end
-
now
)
%
1000
*
1000
);
ptimeout
=
&
timeout
;
}
// Wait for events. Ignore interrupts if there's infinite timeout.
while
(
true
)
{
memcpy
(
&
inset
,
&
pollset_in
,
sizeof
(
fd_set
));
memcpy
(
&
outset
,
&
pollset_out
,
sizeof
(
fd_set
));
memcpy
(
&
errset
,
&
pollset_err
,
sizeof
(
fd_set
));
#if defined ZMQ_HAVE_WINDOWS
int
rc
=
select
(
0
,
&
inset
,
&
outset
,
&
errset
,
ptimeout
);
if
(
unlikely
(
rc
==
SOCKET_ERROR
))
{
errno
=
zmq
::
wsa_error_to_errno
(
WSAGetLastError
());
wsa_assert
(
errno
==
ENOTSOCK
);
return
-
1
;
}
#else
int
rc
=
select
(
maxfd
+
1
,
&
inset
,
&
outset
,
&
errset
,
ptimeout
);
if
(
unlikely
(
rc
==
-
1
))
{
errno_assert
(
errno
==
EINTR
||
errno
==
EBADF
);
return
-
1
;
}
#endif
break
;
}
// Check for the events.
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
items_
[
i
].
revents
=
0
;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if
(
items_
[
i
].
socket
)
{
size_t
zmq_events_size
=
sizeof
(
uint32_t
);
uint32_t
zmq_events
;
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
)
==
-
1
)
return
-
1
;
if
((
items_
[
i
].
events
&
ZMQ_POLLOUT
)
&&
(
zmq_events
&
ZMQ_POLLOUT
))
items_
[
i
].
revents
|=
ZMQ_POLLOUT
;
if
((
items_
[
i
].
events
&
ZMQ_POLLIN
)
&&
(
zmq_events
&
ZMQ_POLLIN
))
items_
[
i
].
revents
|=
ZMQ_POLLIN
;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else
{
if
(
FD_ISSET
(
items_
[
i
].
fd
,
&
inset
))
items_
[
i
].
revents
|=
ZMQ_POLLIN
;
if
(
FD_ISSET
(
items_
[
i
].
fd
,
&
outset
))
items_
[
i
].
revents
|=
ZMQ_POLLOUT
;
if
(
FD_ISSET
(
items_
[
i
].
fd
,
&
errset
))
items_
[
i
].
revents
|=
ZMQ_POLLERR
;
}
if
(
items_
[
i
].
revents
)
nevents
++
;
}
// If timout is zero, exit immediately whether there are events or not.
if
(
timeout_
==
0
)
break
;
// If there are events to return, we can exit immediately.
if
(
nevents
)
break
;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if
(
timeout_
<
0
)
{
if
(
first_pass
)
first_pass
=
false
;
continue
;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if
(
first_pass
)
{
now
=
clock
.
now_ms
();
end
=
now
+
timeout_
;
if
(
now
==
end
)
break
;
first_pass
=
false
;
continue
;
}
// Find out whether timeout have expired.
now
=
clock
.
now_ms
();
if
(
now
>=
end
)
break
;
}
return
nevents
;
#else
// Exotic platforms that support neither poll() nor select().
errno
=
ENOTSUP
;
return
-
1
;
#endif
}
// Create pollfd
// Create pollfd
void
*
zmq_pollfd_new
()
void
*
zmq_pollfd_new
()
...
@@ -718,28 +1076,44 @@ void *zmq_pollfd_new ()
...
@@ -718,28 +1076,44 @@ void *zmq_pollfd_new ()
// Close pollfd
// Close pollfd
int
zmq_pollfd_close
(
void
*
p
)
int
zmq_pollfd_close
(
void
*
p
_
)
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
_
;
LIBZMQ_DELETE
(
s
);
LIBZMQ_DELETE
(
s
);
return
0
;
return
0
;
}
}
// Get poller fd
// Recv signal from pollfd
void
zmq_pollfd_recv
(
void
*
p_
)
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
s
->
recv
();
}
// Wait until pollfd is signalled
int
zmq_pollfd_wait
(
void
*
p_
,
int
timeout_
)
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p_
;
return
s
->
wait
(
timeout_
);
}
// Get pollfd fd
#if defined _WIN32
#if defined _WIN32
SOCKET
zmq_pollfd_fd
(
void
*
p
)
SOCKET
zmq_pollfd_fd
(
void
*
p
_
)
#else
#else
int
zmq_pollfd_fd
(
void
*
p
)
int
zmq_pollfd_fd
(
void
*
p
_
)
#endif
#endif
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
_
;
return
s
->
get_fd
();
return
s
->
get_fd
();
}
}
// Polling
.
// Polling
thread safe sockets version
int
zmq_poll
(
zmq_pollitem_t
*
items_
,
int
nitems_
,
long
timeout_
)
int
zmq_poll
fd_poll
(
void
*
p_
,
zmq_pollitem_t
*
items_
,
int
nitems_
,
long
timeout_
)
{
{
#if defined ZMQ_POLL_BASED_ON_POLL
#if defined ZMQ_POLL_BASED_ON_POLL
if
(
unlikely
(
nitems_
<
0
))
{
if
(
unlikely
(
nitems_
<
0
))
{
...
@@ -770,12 +1144,48 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -770,12 +1144,48 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
uint64_t
end
=
0
;
uint64_t
end
=
0
;
pollfd
spollfds
[
ZMQ_POLLITEMS_DFLT
];
pollfd
spollfds
[
ZMQ_POLLITEMS_DFLT
];
pollfd
*
pollfds
=
spollfds
;
pollfd
*
pollfds
=
spollfds
;
int
pollfds_size
=
0
;
int
pollfds_index
=
0
;
bool
use_pollfd
=
false
;
if
(
nitems_
>
ZMQ_POLLITEMS_DFLT
)
{
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
pollfds
=
(
pollfd
*
)
malloc
(
nitems_
*
sizeof
(
pollfd
));
if
(
items_
[
i
].
socket
)
{
int
thread_safe
;
size_t
thread_safe_size
=
sizeof
(
int
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
{
return
-
1
;
}
// All thread safe sockets share same fd
if
(
thread_safe
)
{
// if poll fd is not set yet and events are set for this socket
if
(
!
use_pollfd
&&
items_
[
i
].
events
)
{
use_pollfd
=
true
;
pollfds_size
++
;
}
}
else
pollfds_size
++
;
}
else
pollfds_size
++
;
}
if
(
pollfds_size
>
ZMQ_POLLITEMS_DFLT
)
{
pollfds
=
(
pollfd
*
)
malloc
(
pollfds_size
*
sizeof
(
pollfd
));
alloc_assert
(
pollfds
);
alloc_assert
(
pollfds
);
}
}
// If we have at least one thread safe socket we set pollfd first
if
(
use_pollfd
)
{
pollfds
[
0
].
fd
=
zmq_pollfd_fd
(
p_
);
pollfds
[
0
].
events
=
POLLIN
;
pollfds_index
=
1
;
}
// Build pollset for poll () system call.
// Build pollset for poll () system call.
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
...
@@ -792,35 +1202,28 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -792,35 +1202,28 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
return
-
1
;
return
-
1
;
}
}
if
(
thread_safe
)
{
// We already handled the thread safe sockets
if
(
!
items_
[
i
].
fd
)
{
if
(
!
thread_safe
)
{
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
errno
=
EINVAL
;
return
-
1
;
}
pollfds
[
i
].
fd
=
items_
[
i
].
fd
;
}
else
{
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
pollfds
[
i
].
fd
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
pollfds
[
pollfds_index
].
fd
,
&
zmq_fd_size
)
==
-
1
)
{
&
zmq_fd_size
)
==
-
1
)
{
if
(
pollfds
!=
spollfds
)
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
free
(
pollfds
);
return
-
1
;
return
-
1
;
}
}
pollfds
[
pollfds_index
].
events
=
items_
[
i
].
events
?
POLLIN
:
0
;
pollfds_index
++
;
}
}
pollfds
[
i
].
events
=
items_
[
i
].
events
?
POLLIN
:
0
;
}
}
// Else, the poll item is a raw file descriptor. Just convert the
// Else, the poll item is a raw file descriptor. Just convert the
// events to normal POLLIN/POLLOUT for poll ().
// events to normal POLLIN/POLLOUT for poll ().
else
{
else
{
pollfds
[
i
].
fd
=
items_
[
i
].
fd
;
pollfds
[
pollfds_index
].
fd
=
items_
[
i
].
fd
;
pollfds
[
i
].
events
=
pollfds
[
pollfds_index
].
events
=
(
items_
[
i
].
events
&
ZMQ_POLLIN
?
POLLIN
:
0
)
|
(
items_
[
i
].
events
&
ZMQ_POLLIN
?
POLLIN
:
0
)
|
(
items_
[
i
].
events
&
ZMQ_POLLOUT
?
POLLOUT
:
0
)
|
(
items_
[
i
].
events
&
ZMQ_POLLOUT
?
POLLOUT
:
0
)
|
(
items_
[
i
].
events
&
ZMQ_POLLPRI
?
POLLPRI
:
0
);
(
items_
[
i
].
events
&
ZMQ_POLLPRI
?
POLLPRI
:
0
);
pollfds_index
++
;
}
}
}
}
...
@@ -840,7 +1243,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -840,7 +1243,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Wait for events.
// Wait for events.
while
(
true
)
{
while
(
true
)
{
int
rc
=
poll
(
pollfds
,
nitems_
,
timeout
);
int
rc
=
poll
(
pollfds
,
pollfds_size
,
timeout
);
if
(
rc
==
-
1
&&
errno
==
EINTR
)
{
if
(
rc
==
-
1
&&
errno
==
EINTR
)
{
if
(
pollfds
!=
spollfds
)
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
free
(
pollfds
);
...
@@ -849,6 +1252,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -849,6 +1252,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno_assert
(
rc
>=
0
);
errno_assert
(
rc
>=
0
);
break
;
break
;
}
}
// Receive the signal from pollfd
if
(
use_pollfd
&&
pollfds
[
0
].
revents
&
POLLIN
)
zmq_pollfd_recv
(
p_
);
// Check for the events.
// Check for the events.
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
...
@@ -959,6 +1367,25 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -959,6 +1367,25 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
fd_set
pollset_err
;
fd_set
pollset_err
;
FD_ZERO
(
&
pollset_err
);
FD_ZERO
(
&
pollset_err
);
bool
use_pollfd
=
false
;
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
if
(
items_
[
i
].
socket
)
{
int
thread_safe
;
size_t
thread_safe_size
=
sizeof
(
int
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
return
-
1
;
if
(
thread_safe
&&
items_
[
i
].
events
)
{
use_pollfd
=
true
;
FD_SET
(
zmq_pollfd_fd
(
p_
),
&
pollset_in
);
break
;
}
}
}
zmq
::
fd_t
maxfd
=
0
;
zmq
::
fd_t
maxfd
=
0
;
// Build the fd_sets for passing to select ().
// Build the fd_sets for passing to select ().
...
@@ -973,27 +1400,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -973,27 +1400,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_THREAD_SAFE
,
&
thread_safe
,
&
thread_safe_size
)
==
-
1
)
&
thread_safe_size
)
==
-
1
)
return
-
1
;
return
-
1
;
zmq
::
fd_t
notify_fd
;
if
(
!
thread_safe
)
{
zmq
::
fd_t
notify_fd
;
if
(
thread_safe
)
{
if
(
!
items_
[
i
].
fd
)
{
errno
=
EINVAL
;
return
-
1
;
}
notify_fd
=
items_
[
i
].
fd
;
}
else
{
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
notify_fd
,
if
(
zmq_getsockopt
(
items_
[
i
].
socket
,
ZMQ_FD
,
&
notify_fd
,
&
zmq_fd_size
)
==
-
1
)
&
zmq_fd_size
)
==
-
1
)
return
-
1
;
return
-
1
;
}
if
(
items_
[
i
].
events
)
{
if
(
items_
[
i
].
events
)
{
FD_SET
(
notify_fd
,
&
pollset_in
);
FD_SET
(
notify_fd
,
&
pollset_in
);
if
(
maxfd
<
notify_fd
)
if
(
maxfd
<
notify_fd
)
maxfd
=
notify_fd
;
maxfd
=
notify_fd
;
}
}
}
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// Else, the poll item is a raw file descriptor. Convert the poll item
...
@@ -1055,6 +1474,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -1055,6 +1474,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
break
;
break
;
}
}
if
(
use_pollfd
&&
FD_ISSET
(
zmq_pollfd_fd
(
p_
),
&
inset
))
zmq_pollfd_recv
(
p_
);
// Check for the events.
// Check for the events.
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
for
(
int
i
=
0
;
i
!=
nitems_
;
i
++
)
{
...
...
tests/test_thread_safe_polling.cpp
View file @
4da5ff6a
...
@@ -50,8 +50,8 @@ int main (void)
...
@@ -50,8 +50,8 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{
zmq_pollitem_t
items
[]
=
{
{
server
,
zmq_pollfd_fd
(
pollfd
)
,
ZMQ_POLLIN
,
0
},
{
server
,
0
,
ZMQ_POLLIN
,
0
},
{
server2
,
zmq_pollfd_fd
(
pollfd
)
,
ZMQ_POLLIN
,
0
}};
{
server2
,
0
,
ZMQ_POLLIN
,
0
}};
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:5560"
);
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:5560"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -63,7 +63,7 @@ int main (void)
...
@@ -63,7 +63,7 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
items
,
2
,
-
1
);
rc
=
zmq_poll
fd_poll
(
pollfd
,
items
,
2
,
-
1
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
assert
(
items
[
0
].
revents
==
ZMQ_POLLIN
);
assert
(
items
[
0
].
revents
==
ZMQ_POLLIN
);
...
@@ -74,7 +74,7 @@ int main (void)
...
@@ -74,7 +74,7 @@ int main (void)
rc
=
zmq_msg_recv
(
&
msg
,
server
,
ZMQ_DONTWAIT
);
rc
=
zmq_msg_recv
(
&
msg
,
server
,
ZMQ_DONTWAIT
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_poll
(
items
,
2
,
-
1
);
rc
=
zmq_poll
fd_poll
(
pollfd
,
items
,
2
,
-
1
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
assert
(
items
[
0
].
revents
==
0
);
assert
(
items
[
0
].
revents
==
0
);
...
@@ -83,7 +83,7 @@ int main (void)
...
@@ -83,7 +83,7 @@ int main (void)
rc
=
zmq_msg_recv
(
&
msg
,
server2
,
ZMQ_DONTWAIT
);
rc
=
zmq_msg_recv
(
&
msg
,
server2
,
ZMQ_DONTWAIT
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_poll
(
items
,
2
,
0
);
rc
=
zmq_poll
fd_poll
(
pollfd
,
items
,
2
,
0
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
assert
(
items
[
0
].
revents
==
0
);
assert
(
items
[
0
].
revents
==
0
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment