Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ae0676e8
Commit
ae0676e8
authored
Sep 29, 2016
by
Luca Boccassi
Committed by
GitHub
Sep 29, 2016
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2144 from minrk/fix-poller
Resolve issues with zmq_poll and zmq_poller working together
parents
872f1e5a
310dafbc
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
60 additions
and
35 deletions
+60
-35
socket_poller.cpp
src/socket_poller.cpp
+41
-30
zmq.cpp
src/zmq.cpp
+19
-5
No files found.
src/socket_poller.cpp
View file @
ae0676e8
...
...
@@ -412,7 +412,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
uint64_t
end
=
0
;
bool
first_pass
=
true
;
bool
found
=
false
;
while
(
true
)
{
// Compute the timeout for the subsequent poll.
...
...
@@ -440,13 +439,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
signaler
.
recv
();
// Check for the events.
int
i
=
0
;
for
(
items_t
::
iterator
it
=
items
.
begin
();
it
!=
items
.
end
()
&&
i
<
n_events_
;
++
i
,
++
it
)
{
int
found
=
0
;
for
(
items_t
::
iterator
it
=
items
.
begin
();
it
!=
items
.
end
()
&&
found
<
n_events_
;
++
it
)
{
events_
[
i
].
socket
=
NULL
;
events_
[
i
].
fd
=
0
;
events_
[
i
].
user_data
=
NULL
;
events_
[
i
].
events
=
0
;
events_
[
found
].
socket
=
NULL
;
events_
[
found
].
fd
=
0
;
events_
[
found
].
user_data
=
NULL
;
events_
[
found
].
events
=
0
;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
...
...
@@ -458,10 +457,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
}
if
(
it
->
events
&
events
)
{
events_
[
i
].
socket
=
it
->
socket
;
events_
[
i
].
user_data
=
it
->
user_data
;
events_
[
i
].
events
=
it
->
events
&
events
;
found
=
true
;
events_
[
found
].
socket
=
it
->
socket
;
events_
[
found
].
user_data
=
it
->
user_data
;
events_
[
found
].
events
=
it
->
events
&
events
;
++
found
;
}
}
// Else, the poll item is a raw file descriptor, simply convert
...
...
@@ -480,16 +479,22 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
events
|=
ZMQ_POLLERR
;
if
(
events
)
{
events_
[
i
].
socket
=
NULL
;
events_
[
i
].
user_data
=
it
->
user_data
;
events_
[
i
].
fd
=
it
->
fd
;
events_
[
i
].
events
=
events
;
found
=
true
;
events_
[
found
].
socket
=
NULL
;
events_
[
found
].
user_data
=
it
->
user_data
;
events_
[
found
].
fd
=
it
->
fd
;
events_
[
found
].
events
=
events
;
++
found
;
}
}
}
if
(
found
)
{
return
0
;
for
(
int
i
=
found
;
i
<
n_events_
;
++
i
)
{
events_
[
i
].
socket
=
NULL
;
events_
[
i
].
fd
=
0
;
events_
[
i
].
user_data
=
NULL
;
events_
[
i
].
events
=
0
;
}
return
found
;
}
// If timeout is zero, exit immediately whether there are events or not.
...
...
@@ -548,7 +553,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
uint64_t
end
=
0
;
bool
first_pass
=
true
;
bool
found
=
false
;
fd_set
inset
,
outset
,
errset
;
while
(
true
)
{
...
...
@@ -596,8 +600,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
signaler
.
recv
();
// Check for the events.
int
i
=
0
;
for
(
items_t
::
iterator
it
=
items
.
begin
();
it
!=
items
.
end
()
&&
i
<
n_events_
;
++
i
,
++
it
)
{
int
found
=
0
;
for
(
items_t
::
iterator
it
=
items
.
begin
();
it
!=
items
.
end
()
&&
found
<
n_events_
;
++
it
)
{
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
...
...
@@ -608,10 +612,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
return
-
1
;
if
(
it
->
events
&
events
)
{
events_
[
i
].
socket
=
it
->
socket
;
events_
[
i
].
user_data
=
it
->
user_data
;
events_
[
i
].
events
=
it
->
events
&
events
;
found
=
true
;
events_
[
found
].
socket
=
it
->
socket
;
events_
[
found
].
user_data
=
it
->
user_data
;
events_
[
found
].
events
=
it
->
events
&
events
;
++
found
;
}
}
// Else, the poll item is a raw file descriptor, simply convert
...
...
@@ -627,16 +631,23 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
events
|=
ZMQ_POLLERR
;
if
(
events
)
{
events_
[
i
].
socket
=
NULL
;
events_
[
i
].
user_data
=
it
->
user_data
;
events_
[
i
].
fd
=
it
->
fd
;
events_
[
i
].
events
=
events
;
found
=
true
;
events_
[
found
].
socket
=
NULL
;
events_
[
found
].
user_data
=
it
->
user_data
;
events_
[
found
].
fd
=
it
->
fd
;
events_
[
found
].
events
=
events
;
++
found
;
}
}
}
if
(
found
)
{
return
0
;
// zero-out remaining events
for
(
int
i
=
found
;
i
<
n_events_
;
++
i
)
{
events_
[
i
].
socket
=
NULL
;
events_
[
i
].
fd
=
0
;
events_
[
i
].
user_data
=
NULL
;
events_
[
i
].
events
=
0
;
}
return
found
;
}
// If timeout is zero, exit immediately whether there are events or not.
...
...
src/zmq.cpp
View file @
ae0676e8
...
...
@@ -755,6 +755,7 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Register sockets with poller
for
(
int
i
=
0
;
i
<
nitems_
;
i
++
)
{
items_
[
i
].
revents
=
0
;
if
(
items_
[
i
].
socket
)
{
// Poll item is a 0MQ socket.
rc
=
zmq_poller_add
(
poller
,
items_
[
i
].
socket
,
NULL
,
items_
[
i
].
events
);
...
...
@@ -779,12 +780,25 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if
(
rc
<
0
)
{
zmq_poller_destroy
(
&
poller
);
delete
[]
events
;
if
(
zmq_errno
()
==
ETIMEDOUT
)
{
return
0
;
}
return
rc
;
}
// Put the event information where zmq_poll expects it to go.
for
(
int
i
=
0
;
i
<
nitems_
;
i
++
)
{
items_
[
i
].
revents
=
events
[
i
].
events
;
// Transform poller events into zmq_pollitem events
// items_ contains all items, while events only contains fired events.
// The two are still co-ordered, so the step through items
// Checking for matches only on the first event
int
found_events
=
rc
;
for
(
int
i
=
0
,
j
=
0
;
i
<
nitems_
&&
j
<
found_events
;
i
++
)
{
if
(
(
items_
[
i
].
socket
&&
items_
[
i
].
socket
==
events
[
j
].
socket
)
||
(
items_
[
i
].
fd
&&
items_
[
i
].
fd
==
events
[
j
].
fd
)
)
{
items_
[
i
].
revents
=
events
[
j
].
events
;
j
++
;
}
}
// Cleanup
...
...
@@ -1284,8 +1298,8 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
if
(
rc
<
0
)
{
memset
(
event
,
0
,
sizeof
(
zmq_poller_event_t
));
}
return
rc
;
// wait_all returns number of events, but we return 0 for any success
return
rc
>=
0
?
0
:
rc
;
}
int
zmq_poller_wait_all
(
void
*
poller_
,
zmq_poller_event_t
*
events
,
int
n_events
,
long
timeout_
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment