Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
6c166937
Commit
6c166937
authored
Sep 29, 2016
by
Luca Boccassi
Committed by
GitHub
Sep 29, 2016
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2145 from minrk/zmq-poll-repeat
allow duplicate entries in zmq_poller_poll
parents
ae0676e8
fb5a04e2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
54 additions
and
23 deletions
+54
-23
proxy.cpp
src/proxy.cpp
+1
-10
zmq.cpp
src/zmq.cpp
+53
-13
No files found.
src/proxy.cpp
View file @
6c166937
...
...
@@ -130,15 +130,6 @@ int zmq::proxy (
{
backend_
,
0
,
ZMQ_POLLOUT
,
0
}
};
int
control_idx
=
2
;
if
(
frontend_
==
backend_
)
{
// when frontend & backend are the same,
// avoid duplicate poll entries
qt_poll_items
-=
1
;
items
[
1
]
=
items
[
2
];
control_idx
=
1
;
}
// Proxy can be in these three states
enum
{
active
,
...
...
@@ -163,7 +154,7 @@ int zmq::proxy (
}
// Process a control command if any
if
(
control_
&&
items
[
control_idx
].
revents
&
ZMQ_POLLIN
)
{
if
(
control_
&&
items
[
2
].
revents
&
ZMQ_POLLIN
)
{
rc
=
control_
->
recv
(
&
msg
,
0
);
if
(
unlikely
(
rc
<
0
))
return
-
1
;
...
...
src/zmq.cpp
View file @
6c166937
...
...
@@ -753,12 +753,28 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
void
*
poller
=
zmq_poller_new
();
alloc_assert
(
poller
);
bool
repeat_items
=
false
;
// Register sockets with poller
for
(
int
i
=
0
;
i
<
nitems_
;
i
++
)
{
items_
[
i
].
revents
=
0
;
bool
modify
=
false
;
short
e
=
items_
[
i
].
events
;
if
(
items_
[
i
].
socket
)
{
// Poll item is a 0MQ socket.
rc
=
zmq_poller_add
(
poller
,
items_
[
i
].
socket
,
NULL
,
items_
[
i
].
events
);
for
(
int
j
=
0
;
j
<
i
;
++
j
)
{
// Check for repeat entries
if
(
items_
[
j
].
socket
==
items_
[
i
].
socket
)
{
repeat_items
=
true
;
modify
=
true
;
e
|=
items_
[
j
].
events
;
}
}
if
(
modify
)
{
rc
=
zmq_poller_modify
(
poller
,
items_
[
i
].
socket
,
e
);
}
else
{
rc
=
zmq_poller_add
(
poller
,
items_
[
i
].
socket
,
NULL
,
e
);
}
if
(
rc
<
0
)
{
zmq_poller_destroy
(
&
poller
);
delete
[]
events
;
...
...
@@ -766,7 +782,19 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
}
else
{
// Poll item is a raw file descriptor.
rc
=
zmq_poller_add_fd
(
poller
,
items_
[
i
].
fd
,
NULL
,
items_
[
i
].
events
);
for
(
int
j
=
0
;
j
<
i
;
++
j
)
{
// Check for repeat entries
if
(
items_
[
j
].
fd
==
items_
[
i
].
fd
)
{
repeat_items
=
true
;
modify
=
true
;
e
|=
items_
[
j
].
events
;
}
}
if
(
modify
)
{
rc
=
zmq_poller_modify_fd
(
poller
,
items_
[
i
].
fd
,
e
);
}
else
{
rc
=
zmq_poller_add_fd
(
poller
,
items_
[
i
].
fd
,
NULL
,
e
);
}
if
(
rc
<
0
)
{
zmq_poller_destroy
(
&
poller
);
delete
[]
events
;
...
...
@@ -786,18 +814,30 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
return
rc
;
}
// Transform poller events into zmq_pollitem events
// Transform poller events into zmq_pollitem events
.
// items_ contains all items, while events only contains fired events.
// The two are still co-ordered, so the step through items
// Checking for matches only on the first event
int
found_events
=
rc
;
for
(
int
i
=
0
,
j
=
0
;
i
<
nitems_
&&
j
<
found_events
;
i
++
)
{
if
(
(
items_
[
i
].
socket
&&
items_
[
i
].
socket
==
events
[
j
].
socket
)
||
(
items_
[
i
].
fd
&&
items_
[
i
].
fd
==
events
[
j
].
fd
)
)
{
items_
[
i
].
revents
=
events
[
j
].
events
;
j
++
;
// If no sockets are repeated (likely), the two are still co-ordered, so the step through items
// Checking for matches only on the first event.
// If there are repeat items, they cannot be assumed to be co-ordered,
// so each pollitem must check fired events from the beginning.
int
j_start
=
0
,
found_events
=
rc
;
for
(
int
i
=
0
;
i
<
nitems_
;
i
++
)
{
for
(
int
j
=
j_start
;
j
<
found_events
;
++
j
)
{
if
(
(
items_
[
i
].
socket
&&
items_
[
i
].
socket
==
events
[
j
].
socket
)
||
(
items_
[
i
].
fd
&&
items_
[
i
].
fd
==
events
[
j
].
fd
)
)
{
items_
[
i
].
revents
=
events
[
j
].
events
&
items_
[
i
].
events
;
if
(
!
repeat_items
)
{
// no repeats, we can ignore events we've already seen
j_start
++
;
}
break
;
}
if
(
!
repeat_items
)
{
// no repeats, never have to look at j > j_start
break
;
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment