Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
527eddc9
Commit
527eddc9
authored
Oct 20, 2014
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1183 from mschneider/size_fix
ZMQ API diverges from POSIX
parents
b1d766a3
b55288fd
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
39 additions
and
38 deletions
+39
-38
zmq.h
include/zmq.h
+6
-5
zmq.cpp
src/zmq.cpp
+33
-33
No files found.
include/zmq.h
View file @
527eddc9
...
@@ -87,6 +87,7 @@ typedef unsigned __int8 uint8_t;
...
@@ -87,6 +87,7 @@ typedef unsigned __int8 uint8_t;
# include <stdint.h>
# include <stdint.h>
#endif
#endif
#include <sys/types.h>
/******************************************************************************/
/******************************************************************************/
/* 0MQ errors. */
/* 0MQ errors. */
...
@@ -212,8 +213,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
...
@@ -212,8 +213,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
ZMQ_EXPORT
int
zmq_msg_init_size
(
zmq_msg_t
*
msg
,
size_t
size
);
ZMQ_EXPORT
int
zmq_msg_init_size
(
zmq_msg_t
*
msg
,
size_t
size
);
ZMQ_EXPORT
int
zmq_msg_init_data
(
zmq_msg_t
*
msg
,
void
*
data
,
ZMQ_EXPORT
int
zmq_msg_init_data
(
zmq_msg_t
*
msg
,
void
*
data
,
size_t
size
,
zmq_free_fn
*
ffn
,
void
*
hint
);
size_t
size
,
zmq_free_fn
*
ffn
,
void
*
hint
);
ZMQ_EXPORT
in
t
zmq_msg_send
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_msg_send
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_msg_recv
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_msg_recv
(
zmq_msg_t
*
msg
,
void
*
s
,
int
flags
);
ZMQ_EXPORT
int
zmq_msg_close
(
zmq_msg_t
*
msg
);
ZMQ_EXPORT
int
zmq_msg_close
(
zmq_msg_t
*
msg
);
ZMQ_EXPORT
int
zmq_msg_move
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
ZMQ_EXPORT
int
zmq_msg_move
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
ZMQ_EXPORT
int
zmq_msg_copy
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
ZMQ_EXPORT
int
zmq_msg_copy
(
zmq_msg_t
*
dest
,
zmq_msg_t
*
src
);
...
@@ -360,9 +361,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
...
@@ -360,9 +361,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT
int
zmq_connect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_connect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_unbind
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_unbind
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_disconnect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
int
zmq_disconnect
(
void
*
s
,
const
char
*
addr
);
ZMQ_EXPORT
in
t
zmq_send
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_send
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_send_const
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_send_const
(
void
*
s
,
const
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
in
t
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
ssize_
t
zmq_recv
(
void
*
s
,
void
*
buf
,
size_t
len
,
int
flags
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
ZMQ_EXPORT
int
zmq_socket_monitor
(
void
*
s
,
const
char
*
addr
,
int
events
);
...
...
src/zmq.cpp
View file @
527eddc9
...
@@ -329,37 +329,38 @@ int zmq_disconnect (void *s_, const char *addr_)
...
@@ -329,37 +329,38 @@ int zmq_disconnect (void *s_, const char *addr_)
// Sending functions.
// Sending functions.
static
in
t
static
ssize_
t
s_sendmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
s_sendmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
int
sz
=
(
int
)
zmq_msg_size
(
msg_
);
size_t
nbytes
=
zmq_msg_size
(
msg_
);
int
rc
=
s_
->
send
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
int
rc
=
s_
->
send
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
return
sz
;
return
nbytes
;
}
}
/* To be deprecated once zmq_msg_send() is stable */
/* To be deprecated once zmq_msg_send() is stable */
int
zmq_sendmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
int
zmq_sendmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
return
zmq_msg_send
(
msg_
,
s_
,
flags_
);
// reproduce behaviour of truncated int in deprecated API
return
(
int
)
zmq_msg_send
(
msg_
,
s_
,
flags_
);
}
}
in
t
zmq_send
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
ssize_
t
zmq_send
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init_size
(
&
msg
,
len_
);
int
rc
1
=
zmq_msg_init_size
(
&
msg
,
len_
);
if
(
rc
!=
0
)
if
(
rc
1
!=
0
)
return
-
1
;
return
-
1
;
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
ssize_t
nbytes
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
rc
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
int
rc2
=
zmq_msg_close
(
&
msg
);
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
...
@@ -369,23 +370,23 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -369,23 +370,23 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
// Note the optimisation here. We don't close the msg object as it is
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
// empty anyway. This may change when implementation of zmq_msg_t changes.
return
rc
;
return
nbytes
;
}
}
in
t
zmq_send_const
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
ssize_
t
zmq_send_const
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init_data
(
&
msg
,
(
void
*
)
buf_
,
len_
,
NULL
,
NULL
);
int
rc
1
=
zmq_msg_init_data
(
&
msg
,
(
void
*
)
buf_
,
len_
,
NULL
,
NULL
);
if
(
rc
!=
0
)
if
(
rc
1
!=
0
)
return
-
1
;
return
-
1
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
ssize_t
nbytes
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
rc
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
int
rc2
=
zmq_msg_close
(
&
msg
);
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
...
@@ -395,7 +396,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -395,7 +396,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
// Note the optimisation here. We don't close the msg object as it is
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
// empty anyway. This may change when implementation of zmq_msg_t changes.
return
rc
;
return
nbytes
;
}
}
...
@@ -425,8 +426,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
...
@@ -425,8 +426,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
memcpy
(
zmq_msg_data
(
&
msg
),
a_
[
i
].
iov_base
,
a_
[
i
].
iov_len
);
memcpy
(
zmq_msg_data
(
&
msg
),
a_
[
i
].
iov_base
,
a_
[
i
].
iov_len
);
if
(
i
==
count_
-
1
)
if
(
i
==
count_
-
1
)
flags_
=
flags_
&
~
ZMQ_SNDMORE
;
flags_
=
flags_
&
~
ZMQ_SNDMORE
;
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
ssize_t
nbytes
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
rc
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
int
rc2
=
zmq_msg_close
(
&
msg
);
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
...
@@ -440,23 +441,24 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
...
@@ -440,23 +441,24 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
// Receiving functions.
// Receiving functions.
static
in
t
static
ssize_
t
s_recvmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
s_recvmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
int
rc
=
s_
->
recv
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
int
rc
=
s_
->
recv
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
if
(
unlikely
(
rc
<
0
))
if
(
unlikely
(
rc
<
0
))
return
-
1
;
return
-
1
;
return
(
int
)
zmq_msg_size
(
msg_
);
return
zmq_msg_size
(
msg_
);
}
}
/* To be deprecated once zmq_msg_recv() is stable */
/* To be deprecated once zmq_msg_recv() is stable */
int
zmq_recvmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
int
zmq_recvmsg
(
void
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
return
zmq_msg_recv
(
msg_
,
s_
,
flags_
);
// reproduce behaviour of truncated int in deprecated API
return
(
int
)
zmq_msg_recv
(
msg_
,
s_
,
flags_
);
}
}
in
t
zmq_recv
(
void
*
s_
,
void
*
buf_
,
size_t
len_
,
int
flags_
)
ssize_
t
zmq_recv
(
void
*
s_
,
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
...
@@ -467,7 +469,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
...
@@ -467,7 +469,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
in
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
ssize_
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -476,9 +478,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
...
@@ -476,9 +478,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
return
-
1
;
return
-
1
;
}
}
// At the moment an oversized message is silently truncated.
ssize_t
to_copy
=
(
size_t
)
nbytes
<
len_
?
nbytes
:
len_
;
// TODO: Build in a notification mechanism to report the overflows.
size_t
to_copy
=
size_t
(
nbytes
)
<
len_
?
size_t
(
nbytes
)
:
len_
;
memcpy
(
buf_
,
zmq_msg_data
(
&
msg
),
to_copy
);
memcpy
(
buf_
,
zmq_msg_data
(
&
msg
),
to_copy
);
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
...
@@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
int
rc
=
zmq_msg_init
(
&
msg
);
int
rc
=
zmq_msg_init
(
&
msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
in
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
ssize_
t
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
...
@@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
return
((
zmq
::
msg_t
*
)
msg_
)
->
init_data
(
data_
,
size_
,
ffn_
,
hint_
);
return
((
zmq
::
msg_t
*
)
msg_
)
->
init_data
(
data_
,
size_
,
ffn_
,
hint_
);
}
}
in
t
zmq_msg_send
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
ssize_
t
zmq_msg_send
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s_sendmsg
(
s
,
msg_
,
flags_
);
ssize_t
nbytes
=
s_sendmsg
(
s
,
msg_
,
flags_
);
return
result
;
return
nbytes
;
}
}
in
t
zmq_msg_recv
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
ssize_
t
zmq_msg_recv
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
errno
=
ENOTSOCK
;
return
-
1
;
return
-
1
;
}
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s_recvmsg
(
s
,
msg_
,
flags_
);
ssize_t
nbytes
=
s_recvmsg
(
s
,
msg_
,
flags_
);
return
result
;
return
nbytes
;
}
}
int
zmq_msg_close
(
zmq_msg_t
*
msg_
)
int
zmq_msg_close
(
zmq_msg_t
*
msg_
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment