Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
0650b59b
Commit
0650b59b
authored
Oct 21, 2015
by
Richard Newton
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1616 from somdoron/ZMQ_POLLER
problem: ZMQ_POLLER doesn't support polling on out
parents
a7ea57fb
24fc0d4e
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
123 additions
and
28 deletions
+123
-28
zmq.h
include/zmq.h
+10
-4
socket_poller.cpp
src/socket_poller.cpp
+56
-9
socket_poller.hpp
src/socket_poller.hpp
+8
-4
zmq.cpp
src/zmq.cpp
+35
-7
test_poller.cpp
tests/test_poller.cpp
+14
-4
No files found.
include/zmq.h
View file @
0650b59b
...
...
@@ -431,6 +431,8 @@ ZMQ_EXPORT int zmq_pollfd_fd (void *p);
/* Poller polling on sockets,fd and threaf safe sockets */
/******************************************************************************/
#define ZMQ_HAVE_POLLER
typedef
struct
zmq_poller_event_t
{
void
*
socket
;
...
...
@@ -440,19 +442,23 @@ typedef struct zmq_poller_event_t
int
fd
;
#endif
void
*
user_data
;
short
events
;
}
zmq_poller_event_t
;
ZMQ_EXPORT
void
*
zmq_poller_new
();
ZMQ_EXPORT
int
zmq_poller_close
(
void
*
poller
);
ZMQ_EXPORT
int
zmq_poller_add_socket
(
void
*
poller
,
void
*
socket
,
void
*
user_data
);
ZMQ_EXPORT
int
zmq_poller_remove_socket
(
void
*
poller
,
void
*
socket
);
ZMQ_EXPORT
int
zmq_poller_add
(
void
*
poller
,
void
*
socket
,
void
*
user_data
,
short
events
);
ZMQ_EXPORT
int
zmq_poller_modify
(
void
*
poller
,
void
*
socket
,
short
events
);
ZMQ_EXPORT
int
zmq_poller_remove
(
void
*
poller
,
void
*
socket
);
ZMQ_EXPORT
int
zmq_poller_wait
(
void
*
poller
,
zmq_poller_event_t
*
event
,
long
timeout
);
#if defined _WIN32
ZMQ_EXPORT
int
zmq_poller_add_fd
(
void
*
poller
,
SOCKET
fd
,
void
*
user_data
);
ZMQ_EXPORT
int
zmq_poller_add_fd
(
void
*
poller
,
SOCKET
fd
,
void
*
user_data
,
short
events
);
ZMQ_EXPORT
int
zmq_poller_modify_fd
(
void
*
poller
,
SOCKET
fd
,
short
events
);
ZMQ_EXPORT
int
zmq_poller_remove_fd
(
void
*
poller
,
SOCKET
fd
);
#else
ZMQ_EXPORT
int
zmq_poller_add_fd
(
void
*
poller
,
int
fd
,
void
*
user_data
);
ZMQ_EXPORT
int
zmq_poller_add_fd
(
void
*
poller
,
int
fd
,
void
*
user_data
,
short
events
);
ZMQ_EXPORT
int
zmq_poller_modify_fd
(
void
*
poller
,
int
fd
,
short
events
);
ZMQ_EXPORT
int
zmq_poller_remove_fd
(
void
*
poller
,
int
fd
);
#endif
...
...
src/socket_poller.cpp
View file @
0650b59b
...
...
@@ -71,7 +71,7 @@ bool zmq::socket_poller_t::check_tag ()
return
tag
==
0xCAFEBABE
;
}
int
zmq
::
socket_poller_t
::
add
_socket
(
void
*
socket_
,
void
*
user_data
_
)
int
zmq
::
socket_poller_t
::
add
(
void
*
socket_
,
void
*
user_data_
,
short
events
_
)
{
for
(
events_t
::
iterator
it
=
events
.
begin
();
it
!=
events
.
end
();
++
it
)
{
if
(
it
->
socket
==
socket_
)
{
...
...
@@ -91,7 +91,7 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
return
-
1
;
}
event_t
event
=
{
socket_
,
0
,
user_data_
};
event_t
event
=
{
socket_
,
0
,
user_data_
,
events_
};
events
.
push_back
(
event
);
need_rebuild
=
true
;
...
...
@@ -99,9 +99,9 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
}
#if defined _WIN32
int
zmq
::
socket_poller_t
::
add_fd
(
SOCKET
fd_
,
void
*
user_data_
)
int
zmq
::
socket_poller_t
::
add_fd
(
SOCKET
fd_
,
void
*
user_data_
,
short
events_
)
#else
int
zmq
::
socket_poller_t
::
add_fd
(
int
fd_
,
void
*
user_data_
)
int
zmq
::
socket_poller_t
::
add_fd
(
int
fd_
,
void
*
user_data_
,
short
events_
)
#endif
{
for
(
events_t
::
iterator
it
=
events
.
begin
();
it
!=
events
.
end
();
++
it
)
{
...
...
@@ -111,14 +111,60 @@ int zmq::socket_poller_t::add_fd (int fd_, void *user_data_)
}
}
event_t
event
=
{
NULL
,
fd_
,
user_data_
};
event_t
event
=
{
NULL
,
fd_
,
user_data_
,
events_
};
events
.
push_back
(
event
);
need_rebuild
=
true
;
return
0
;
}
int
zmq
::
socket_poller_t
::
remove_socket
(
void
*
socket_
)
int
zmq
::
socket_poller_t
::
modify
(
void
*
socket_
,
short
events_
)
{
events_t
::
iterator
it
;
for
(
it
=
events
.
begin
();
it
!=
events
.
end
();
++
it
)
{
if
(
it
->
socket
==
socket_
)
break
;
}
if
(
it
==
events
.
end
())
{
errno
=
EINVAL
;
return
-
1
;
}
it
->
events
=
events_
;
need_rebuild
=
true
;
return
0
;
}
#if defined _WIN32
int
zmq
::
socket_poller_t
::
modify_fd
(
SOCKET
fd_
,
short
events_
)
#else
int
zmq
::
socket_poller_t
::
modify_fd
(
int
fd_
,
short
events_
)
#endif
{
events_t
::
iterator
it
;
for
(
it
=
events
.
begin
();
it
!=
events
.
end
();
++
it
)
{
if
(
!
it
->
socket
&&
it
->
fd
==
fd_
)
break
;
}
if
(
it
==
events
.
end
())
{
errno
=
EINVAL
;
return
-
1
;
}
it
->
events
=
events_
;
need_rebuild
=
true
;
return
0
;
}
int
zmq
::
socket_poller_t
::
remove
(
void
*
socket_
)
{
events_t
::
iterator
it
;
...
...
@@ -185,13 +231,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
}
if
(
rc
==
0
)
{
errno
=
E
AGAIN
;
errno
=
E
TIMEDOUT
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
poll_size
;
i
++
)
{
if
(
poll_set
[
i
].
revents
&
ZMQ_POLLIN
)
{
if
(
(
poll_set
[
i
].
revents
&
poll_events
[
i
].
events
)
!=
0
)
{
*
event_
=
poll_events
[
i
];
event_
->
events
=
poll_set
[
i
].
revents
&
poll_events
[
i
].
events
;
break
;
}
...
...
@@ -226,7 +273,7 @@ void zmq::socket_poller_t::rebuild ()
if
(
!
it
->
socket
)
poll_set
[
event_nbr
].
fd
=
it
->
fd
;
poll_set
[
event_nbr
].
events
=
ZMQ_POLLIN
;
poll_set
[
event_nbr
].
events
=
it
->
events
;
poll_events
[
event_nbr
]
=
*
it
;
}
...
...
src/socket_poller.hpp
View file @
0650b59b
...
...
@@ -53,15 +53,19 @@ namespace zmq
int
fd
;
#endif
void
*
user_data
;
short
events
;
}
event_t
;
int
add_socket
(
void
*
socket
,
void
*
user_data
);
int
remove_socket
(
void
*
socket
);
int
add
(
void
*
socket
,
void
*
user_data
,
short
events
);
int
modify
(
void
*
socket
,
short
events
);
int
remove
(
void
*
socket
);
#if defined _WIN32
int
add_fd
(
SOCKET
fd
,
void
*
user_data
);
int
add_fd
(
SOCKET
fd
,
void
*
user_data
,
short
events
);
int
modify_fd
(
SOCKET
fd
,
short
events
);
int
remove_fd
(
SOCKET
fd
);
#else
int
add_fd
(
int
fd
,
void
*
user_data
);
int
add_fd
(
int
fd
,
void
*
user_data
,
short
events
);
int
modify_fd
(
int
fd
,
short
events
);
int
remove_fd
(
int
fd
);
#endif
...
...
src/zmq.cpp
View file @
0650b59b
...
...
@@ -1579,20 +1579,20 @@ int zmq_poller_close (void *poller_)
return
0
;
}
int
zmq_poller_add
_socket
(
void
*
poller_
,
void
*
socket_
,
void
*
user_data
_
)
int
zmq_poller_add
(
void
*
poller_
,
void
*
socket_
,
void
*
user_data_
,
short
events
_
)
{
if
(
!
poller_
||
!
((
zmq
::
socket_poller_t
*
)
poller_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
add
_socket
(
socket_
,
user_data
_
);
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
add
(
socket_
,
user_data_
,
events
_
);
}
#if defined _WIN32
int
zmq_poller_add_fd
(
void
*
poller_
,
SOCKET
fd_
,
void
*
user_data_
)
int
zmq_poller_add_fd
(
void
*
poller_
,
SOCKET
fd_
,
void
*
user_data_
,
short
events_
)
#else
int
zmq_poller_add_fd
(
void
*
poller_
,
int
fd_
,
void
*
user_data_
)
int
zmq_poller_add_fd
(
void
*
poller_
,
int
fd_
,
void
*
user_data_
,
short
events_
)
#endif
{
if
(
!
poller_
||
!
((
zmq
::
socket_poller_t
*
)
poller_
)
->
check_tag
())
{
...
...
@@ -1600,17 +1600,44 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_)
return
-
1
;
}
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
add_fd
(
fd_
,
user_data_
);
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
add_fd
(
fd_
,
user_data_
,
events_
);
}
int
zmq_poller_remove_socket
(
void
*
poller_
,
void
*
socket
)
int
zmq_poller_modify
(
void
*
poller_
,
void
*
socket_
,
short
events_
)
{
if
(
!
poller_
||
!
((
zmq
::
socket_poller_t
*
)
poller_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
modify
(
socket_
,
events_
);
}
#if defined _WIN32
int
zmq_poller_modify_fd
(
void
*
poller_
,
SOCKET
fd_
,
short
events_
)
#else
int
zmq_poller_modify_fd
(
void
*
poller_
,
int
fd_
,
short
events_
)
#endif
{
if
(
!
poller_
||
!
((
zmq
::
socket_poller_t
*
)
poller_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
modify_fd
(
fd_
,
events_
);
}
int
zmq_poller_remove
(
void
*
poller_
,
void
*
socket
)
{
if
(
!
poller_
||
!
((
zmq
::
socket_poller_t
*
)
poller_
)
->
check_tag
())
{
errno
=
EFAULT
;
return
-
1
;
}
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
remove
_socket
(
socket
);
return
((
zmq
::
socket_poller_t
*
)
poller_
)
->
remove
(
socket
);
}
#if defined _WIN32
...
...
@@ -1642,6 +1669,7 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
event
->
socket
=
e
.
socket
;
event
->
fd
=
e
.
fd
;
event
->
user_data
=
e
.
user_data
;
event
->
events
=
e
.
events
;
return
rc
;
}
...
...
tests/test_poller.cpp
View file @
0650b59b
...
...
@@ -61,7 +61,7 @@ int main (void)
// Set up poller
void
*
poller
=
zmq_poller_new
();
rc
=
zmq_poller_add
_socket
(
poller
,
sink
,
sink
);
rc
=
zmq_poller_add
(
poller
,
sink
,
sink
,
ZMQ_POLLIN
);
assert
(
rc
==
0
);
// Send a message
...
...
@@ -79,7 +79,7 @@ int main (void)
assert
(
rc
==
1
);
// Stop polling sink
rc
=
zmq_poller_remove
_socket
(
poller
,
sink
);
rc
=
zmq_poller_remove
(
poller
,
sink
);
assert
(
rc
==
0
);
// Check we can poll an FD
...
...
@@ -96,7 +96,7 @@ int main (void)
rc
=
zmq_getsockopt
(
bowl
,
ZMQ_FD
,
&
fd
,
&
fd_size
);
assert
(
rc
==
0
);
rc
=
zmq_poller_add_fd
(
poller
,
fd
,
bowl
);
rc
=
zmq_poller_add_fd
(
poller
,
fd
,
bowl
,
ZMQ_POLLIN
);
assert
(
rc
==
0
);
rc
=
zmq_poller_wait
(
poller
,
&
event
,
500
);
assert
(
rc
==
0
);
...
...
@@ -106,7 +106,8 @@ int main (void)
zmq_poller_remove_fd
(
poller
,
fd
);
// Polling on thread safe sockets
zmq_poller_add_socket
(
poller
,
server
,
NULL
);
rc
=
zmq_poller_add
(
poller
,
server
,
NULL
,
ZMQ_POLLIN
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
client
,
"tcp://127.0.0.1:55557"
);
assert
(
rc
==
0
);
rc
=
zmq_send_const
(
client
,
data
,
1
,
0
);
...
...
@@ -118,6 +119,15 @@ int main (void)
rc
=
zmq_recv
(
server
,
data
,
1
,
0
);
assert
(
rc
==
1
);
// Polling on pollout
rc
=
zmq_poller_modify
(
poller
,
server
,
ZMQ_POLLOUT
|
ZMQ_POLLIN
);
assert
(
rc
==
0
);
rc
=
zmq_poller_wait
(
poller
,
&
event
,
0
);
assert
(
rc
==
0
);
assert
(
event
.
socket
==
server
);
assert
(
event
.
user_data
==
NULL
);
assert
(
event
.
events
==
ZMQ_POLLOUT
);
// Destory poller, sockets and ctx
rc
=
zmq_poller_close
(
poller
);
assert
(
rc
==
0
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment