Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
a96f16e3
Commit
a96f16e3
authored
Aug 17, 2015
by
Constantin Rack
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1534 from somdoron/master
problem: new poller field in zmq_pollitem_t is changing the api
parents
7d42aac0
d83b0453
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
35 additions
and
33 deletions
+35
-33
zmq.h
include/zmq.h
+10
-5
zmq.cpp
src/zmq.cpp
+16
-13
test_thread_safe_polling.cpp
tests/test_thread_safe_polling.cpp
+9
-15
No files found.
include/zmq.h
View file @
a96f16e3
...
@@ -383,8 +383,8 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
...
@@ -383,8 +383,8 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT
int
zmq_send_const
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_send_const
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
ZMQ_EXPORT
int
zmq_add_poll
er
(
void
*
s
,
void
*
p
);
ZMQ_EXPORT
int
zmq_add_poll
fd
(
void
*
s
,
void
*
p
);
ZMQ_EXPORT
int
zmq_remove_poll
er
(
void
*
s
,
void
*
p
);
ZMQ_EXPORT
int
zmq_remove_poll
fd
(
void
*
s
,
void
*
p
);
/******************************************************************************/
/******************************************************************************/
/* I/O multiplexing. */
/* I/O multiplexing. */
...
@@ -405,15 +405,20 @@ typedef struct zmq_pollitem_t
...
@@ -405,15 +405,20 @@ typedef struct zmq_pollitem_t
#endif
#endif
short
events
;
short
events
;
short
revents
;
short
revents
;
void
*
poller
;
}
zmq_pollitem_t
;
}
zmq_pollitem_t
;
#define ZMQ_POLLITEMS_DFLT 16
#define ZMQ_POLLITEMS_DFLT 16
ZMQ_EXPORT
int
zmq_poll
(
zmq_pollitem_t
*
items
,
int
nitems
,
long
timeout
);
ZMQ_EXPORT
int
zmq_poll
(
zmq_pollitem_t
*
items
,
int
nitems
,
long
timeout
);
ZMQ_EXPORT
void
*
zmq_poll
er
_new
();
ZMQ_EXPORT
void
*
zmq_poll
fd
_new
();
ZMQ_EXPORT
int
zmq_poll
er
_close
(
void
*
p
);
ZMQ_EXPORT
int
zmq_poll
fd
_close
(
void
*
p
);
#if defined _WIN32
ZMQ_EXPORT
SOCKET
zmq_pollfd_fd
(
void
*
p
);
#else
ZMQ_EXPORT
int
zmq_pollfd_fd
(
void
*
p
);
#endif
/******************************************************************************/
/******************************************************************************/
/* Message proxying */
/* Message proxying */
/******************************************************************************/
/******************************************************************************/
...
...
src/zmq.cpp
View file @
a96f16e3
...
@@ -562,9 +562,9 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
...
@@ -562,9 +562,9 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
return
nread
;
return
nread
;
}
}
// Add/remove poll
er
from a socket
// Add/remove poll
fd
from a socket
int
zmq_add_poll
er
(
void
*
s_
,
void
*
p_
)
int
zmq_add_poll
fd
(
void
*
s_
,
void
*
p_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
...
@@ -576,7 +576,7 @@ int zmq_add_poller (void *s_, void *p_)
...
@@ -576,7 +576,7 @@ int zmq_add_poller (void *s_, void *p_)
return
s
->
add_signaler
(
p
);
return
s
->
add_signaler
(
p
);
}
}
int
zmq_remove_poll
er
(
void
*
s_
,
void
*
p_
)
int
zmq_remove_poll
fd
(
void
*
s_
,
void
*
p_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
...
@@ -709,16 +709,16 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
...
@@ -709,16 +709,16 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
}
}
}
}
// Create poll
er
// Create poll
fd
void
*
zmq_poll
er
_new
()
void
*
zmq_poll
fd
_new
()
{
{
return
new
zmq
::
signaler_t
();
return
new
zmq
::
signaler_t
();
}
}
// Close poll
er
// Close poll
fd
int
zmq_poll
er
_close
(
void
*
p
)
int
zmq_poll
fd
_close
(
void
*
p
)
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
;
LIBZMQ_DELETE
(
s
);
LIBZMQ_DELETE
(
s
);
...
@@ -727,8 +727,11 @@ int zmq_poller_close (void* p)
...
@@ -727,8 +727,11 @@ int zmq_poller_close (void* p)
}
}
// Get poller fd
// Get poller fd
#if defined _WIN32
zmq
::
fd_t
zmq_poller_get_fd
(
void
*
p
)
SOCKET
zmq_pollfd_fd
(
void
*
p
)
#else
int
zmq_pollfd_fd
(
void
*
p
)
#endif
{
{
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
;
zmq
::
signaler_t
*
s
=
(
zmq
::
signaler_t
*
)
p
;
return
s
->
get_fd
();
return
s
->
get_fd
();
...
@@ -790,14 +793,14 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -790,14 +793,14 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
}
if
(
thread_safe
)
{
if
(
thread_safe
)
{
if
(
!
items_
[
i
].
poller
)
{
if
(
!
items_
[
i
].
fd
)
{
if
(
pollfds
!=
spollfds
)
if
(
pollfds
!=
spollfds
)
free
(
pollfds
);
free
(
pollfds
);
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
pollfds
[
i
].
fd
=
zmq_poller_get_fd
(
items_
[
i
].
poller
)
;
pollfds
[
i
].
fd
=
items_
[
i
].
fd
;
}
}
else
{
else
{
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
...
@@ -974,12 +977,12 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
...
@@ -974,12 +977,12 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
zmq
::
fd_t
notify_fd
;
zmq
::
fd_t
notify_fd
;
if
(
thread_safe
)
{
if
(
thread_safe
)
{
if
(
!
items_
[
i
].
poller
)
{
if
(
!
items_
[
i
].
fd
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
notify_fd
=
zmq_poller_get_fd
(
items_
[
i
].
poller
)
;
notify_fd
=
items_
[
i
].
fd
;
}
}
else
{
else
{
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
size_t
zmq_fd_size
=
sizeof
(
zmq
::
fd_t
);
...
...
tests/test_thread_safe_polling.cpp
View file @
a96f16e3
...
@@ -39,25 +39,19 @@ int main (void)
...
@@ -39,25 +39,19 @@ int main (void)
void
*
server
=
zmq_socket
(
ctx
,
ZMQ_SERVER
);
void
*
server
=
zmq_socket
(
ctx
,
ZMQ_SERVER
);
void
*
server2
=
zmq_socket
(
ctx
,
ZMQ_SERVER
);
void
*
server2
=
zmq_socket
(
ctx
,
ZMQ_SERVER
);
void
*
poll
er
=
zmq_poller
_new
();
void
*
poll
fd
=
zmq_pollfd
_new
();
int
rc
;
int
rc
;
rc
=
zmq_add_poll
er
(
server
,
poller
);
rc
=
zmq_add_poll
fd
(
server
,
pollfd
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_add_poll
er
(
server2
,
poller
);
rc
=
zmq_add_poll
fd
(
server2
,
pollfd
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[
2
];
zmq_pollitem_t
items
[]
=
{
{
server
,
zmq_pollfd_fd
(
pollfd
),
ZMQ_POLLIN
,
0
},
items
[
0
].
socket
=
server
;
{
server2
,
zmq_pollfd_fd
(
pollfd
),
ZMQ_POLLIN
,
0
}};
items
[
0
].
poller
=
poller
;
items
[
0
].
events
=
ZMQ_POLLIN
;
items
[
1
].
socket
=
server2
;
items
[
1
].
poller
=
poller
;
items
[
1
].
events
=
ZMQ_POLLIN
;
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:5560"
);
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:5560"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -100,13 +94,13 @@ int main (void)
...
@@ -100,13 +94,13 @@ int main (void)
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_remove_poll
er
(
server
,
poller
);
rc
=
zmq_remove_poll
fd
(
server
,
pollfd
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_remove_poll
er
(
server2
,
poller
);
rc
=
zmq_remove_poll
fd
(
server2
,
pollfd
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_poll
er_close
(
poller
);
rc
=
zmq_poll
fd_close
(
pollfd
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
server
);
rc
=
zmq_close
(
server
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment