Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
0d459318
Commit
0d459318
authored
Oct 20, 2014
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "ZMQ API diverges from POSIX"
parent
527eddc9
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
38 additions
and
39 deletions
+38
-39
zmq.h
include/zmq.h
+5
-6
zmq.cpp
src/zmq.cpp
+33
-33
No files found.
include/zmq.h
View file @
0d459318
...
@@ -87,7 +87,6 @@ typedef unsigned __int8 uint8_t;
...
@@ -87,7 +87,6 @@ typedef unsigned __int8 uint8_t;
# include <stdint.h>
# include <stdint.h>
#endif
#endif
#include <sys/types.h>
/******************************************************************************/
/******************************************************************************/
/* 0MQ errors. */
/* 0MQ errors. */
...
@@ -213,8 +212,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
...
@@ -213,8 +212,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
ZMQ_EXPORT
int
zmq_msg_init_size
(
zmq_msg_t
*
msg
,
size_t
size
);
ZMQ_EXPORT
int
zmq_msg_init_size
(
zmq_msg_t
*
msg
,
size_t
size
);
ZMQ_EXPORT
int
zmq_msg_init_data
(
zmq_msg_t
*
msg
,
void
*
data
,
ZMQ_EXPORT
int
zmq_msg_init_data
(
zmq_msg_t
*
msg
,
void
*
data
,
size_t
size
,
zmq_free_fn
*
ffn
,
void
*
hint
);
size_t
size
,
zmq_free_fn
*
ffn
,
void
*
hint
);
ZMQ_EXPORT
ssize_
t
zmq_msg_send
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_msg_send
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_msg_recv
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_msg_recv
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
int
zmq_msg_close
(
zmq_msg_t
*
msg
);
ZMQ_EXPORT
int
zmq_msg_close
(
zmq_msg_t
*
msg
);
ZMQ_EXPORT
int
zmq_msg_move
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
ZMQ_EXPORT
int
zmq_msg_move
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
ZMQ_EXPORT
int
zmq_msg_copy
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
ZMQ_EXPORT
int
zmq_msg_copy
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
...
@@ -361,9 +360,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
...
@@ -361,9 +360,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT
int
zmq_connect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_connect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_unbind
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_unbind
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_disconnect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_disconnect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
ssize_
t
zmq_send
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_send
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_send_const
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_send_const
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
...
...
src/zmq.cpp
View file @
0d459318
...
@@ -329,38 +329,37 @@ int zmq_disconnect (void *s_, const char *addr_)
...
@@ -329,38 +329,37 @@ int zmq_disconnect (void *s_, const char *addr_)
// Sending functions.
// Sending functions.
static
ssize_
t
static
in
t
s_sendmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
s_sendmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
size_t
nbytes
=
zmq_msg_size
(
msg_
);
int
sz
=
(
int
)
zmq_msg_size
(
msg_
);
int
rc
=
s_
->
send
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
int
rc
=
s_
->
send
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
return
nbytes
;
return
sz
;
}
}
/* To be deprecated once zmq_msg_send() is stable */
/* To be deprecated once zmq_msg_send() is stable */
int
zmq_sendmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
int
zmq_sendmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
// reproduce behaviour of truncated int in deprecated API
return
zmq_msg_send
(
msg_
,
s_
,
flags_
);
return
(
int
)
zmq_msg_send
(
msg_
,
s_
,
flags_
);
}
}
ssize_
t
zmq_send
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
in
t
zmq_send
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
1
=
zmq_msg_init_size
(
&
msg
,
len_
);
int
rc
=
zmq_msg_init_size
(
&
msg
,
len_
);
if
(
rc
1
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
ssize_t
nbytes
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
int
rc2
=
zmq_msg_close
(
&
msg
);
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
...
@@ -370,23 +369,23 @@ ssize_t zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -370,23 +369,23 @@ ssize_t zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
// Note the optimisation here. We don't close the msg object as it is
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
// empty anyway. This may change when implementation of zmq_msg_t changes.
return
nbytes
;
return
rc
;
}
}
ssize_
t
zmq_send_const
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
in
t
zmq_send_const
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
1
=
zmq_msg_init_data
(
&
msg
,
(
void
*
)
buf_
,
len_
,
NULL
,
NULL
);
int
rc
=
zmq_msg_init_data
(
&
msg
,
(
void
*
)
buf_
,
len_
,
NULL
,
NULL
);
if
(
rc
1
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
ssize_t
nbytes
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
int
rc2
=
zmq_msg_close
(
&
msg
);
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
...
@@ -396,7 +395,7 @@ ssize_t zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -396,7 +395,7 @@ ssize_t zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
// Note the optimisation here. We don't close the msg object as it is
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
// empty anyway. This may change when implementation of zmq_msg_t changes.
return
nbytes
;
return
rc
;
}
}
...
@@ -426,8 +425,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
...
@@ -426,8 +425,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
memcpy
(
zmq_msg_data
(
&
msg
),
a_
[
i
].
iov_base
,
a_
[
i
].
iov_len
);
memcpy
(
zmq_msg_data
(
&
msg
),
a_
[
i
].
iov_base
,
a_
[
i
].
iov_len
);
if
(
i
==
count_
-
1
)
if
(
i
==
count_
-
1
)
flags_
=
flags_
&
~
ZMQ_SNDMORE
;
flags_
=
flags_
&
~
ZMQ_SNDMORE
;
ssize_t
nbytes
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
int
rc2
=
zmq_msg_close
(
&
msg
);
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
...
@@ -441,24 +440,23 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
...
@@ -441,24 +440,23 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
// Receiving functions.
// Receiving functions.
static
ssize_
t
static
in
t
s_recvmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
s_recvmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
int
rc
=
s_
->
recv
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
int
rc
=
s_
->
recv
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
return
zmq_msg_size
(
msg_
);
return
(
int
)
zmq_msg_size
(
msg_
);
}
}
/* To be deprecated once zmq_msg_recv() is stable */
/* To be deprecated once zmq_msg_recv() is stable */
int
zmq_recvmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
int
zmq_recvmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
// reproduce behaviour of truncated int in deprecated API
return
zmq_msg_recv
(
msg_
,
s_
,
flags_
);
return
(
int
)
zmq_msg_recv
(
msg_
,
s_
,
flags_
);
}
}
ssize_
t
zmq_recv
(
void
*
s_
,
void
*
buf_
,
size_t
len_
,
int
flags_
)
in
t
zmq_recv
(
void
*
s_
,
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
...
@@ -469,7 +467,7 @@ ssize_t zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
...
@@ -469,7 +467,7 @@ ssize_t zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
ssize_
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
in
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -478,7 +476,9 @@ ssize_t zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
...
@@ -478,7 +476,9 @@ ssize_t zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
return
-
1
;
return
-
1
;
}
}
ssize_t
to_copy
=
(
size_t
)
nbytes
<
len_
?
nbytes
:
len_
;
// At the moment an oversized message is silently truncated.
// TODO: Build in a notification mechanism to report the overflows.
size_t
to_copy
=
size_t
(
nbytes
)
<
len_
?
size_t
(
nbytes
)
:
len_
;
memcpy
(
buf_
,
zmq_msg_data
(
&
msg
),
to_copy
);
memcpy
(
buf_
,
zmq_msg_data
(
&
msg
),
to_copy
);
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
...
@@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
int
rc
=
zmq_msg_init
(
&
msg
);
int
rc
=
zmq_msg_init
(
&
msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
ssize_
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
in
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
...
@@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
return
((
zmq
::
msg_t
*
)
msg_
)
->
init_data
(
data_
,
size_
,
ffn_
,
hint_
);
return
((
zmq
::
msg_t
*
)
msg_
)
->
init_data
(
data_
,
size_
,
ffn_
,
hint_
);
}
}
ssize_
t
zmq_msg_send
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
in
t
zmq_msg_send
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
ssize_t
nbytes
=
s_sendmsg
(
s
,
msg_
,
flags_
);
int
result
=
s_sendmsg
(
s
,
msg_
,
flags_
);
return
nbytes
;
return
result
;
}
}
ssize_
t
zmq_msg_recv
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
in
t
zmq_msg_recv
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
ssize_t
nbytes
=
s_recvmsg
(
s
,
msg_
,
flags_
);
int
result
=
s_recvmsg
(
s
,
msg_
,
flags_
);
return
nbytes
;
return
result
;
}
}
int
zmq_msg_close
(
zmq_msg_t
*
msg_
)
int
zmq_msg_close
(
zmq_msg_t
*
msg_
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment