Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
9321dfb8
Commit
9321dfb8
authored
Feb 16, 2012
by
Chuck Remes
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #248 from pieterh/scattered
Renamed scatter/gather methods, cleaned up source
parents
5d9432b2
4697634c
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
58 additions
and
45 deletions
+58
-45
zmq.h
include/zmq.h
+2
-2
zmq.cpp
src/zmq.cpp
+56
-43
No files found.
include/zmq.h
View file @
9321dfb8
...
...
@@ -252,8 +252,8 @@ ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags);
ZMQ_EXPORT
int
zmq_recvmsg
(
zmq_socket_t
s
,
zmq_msg_t
*
msg
,
int
flags
);
/* Experimental */
ZMQ_EXPORT
int
zmq_sendv
(
zmq_socket_t
s
,
struct
iovec
*
iov
,
size_t
count
,
int
flags
);
ZMQ_EXPORT
int
zmq_recv
mmsg
(
zmq_socket_t
s
,
struct
iovec
*
iov
,
size_t
*
count
,
int
flags
);
ZMQ_EXPORT
int
zmq_send
io
v
(
zmq_socket_t
s
,
struct
iovec
*
iov
,
size_t
count
,
int
flags
);
ZMQ_EXPORT
int
zmq_recv
iov
(
zmq_socket_t
s
,
struct
iovec
*
iov
,
size_t
*
count
,
int
flags
);
/******************************************************************************/
/* I/O multiplexing. */
...
...
src/zmq.cpp
View file @
9321dfb8
...
...
@@ -57,8 +57,7 @@
#if ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
struct
iovec
{
struct
iovec
{
void
*
iov_base
;
size_t
iov_len
;
};
...
...
@@ -116,7 +115,7 @@ int zmq_errno ()
// Contexts.
static
zmq
::
ctx_t
*
inner
_init
(
int
io_threads_
)
static
zmq
::
ctx_t
*
s
_init
(
int
io_threads_
)
{
if
(
io_threads_
<
0
)
{
errno
=
EINVAL
;
...
...
@@ -170,14 +169,14 @@ static zmq::ctx_t *inner_init (int io_threads_)
void
*
zmq_init
(
int
io_threads_
)
{
return
(
void
*
)
inner
_init
(
io_threads_
);
return
(
void
*
)
s
_init
(
io_threads_
);
}
void
*
zmq_init_thread_safe
(
int
io_threads_
)
{
zmq
::
ctx_t
*
ctx
=
inner
_init
(
io_threads_
);
zmq
::
ctx_t
*
ctx
=
s
_init
(
io_threads_
);
ctx
->
set_thread_safe
();
return
(
void
*
)
ctx
;
return
(
void
*
)
ctx
;
}
int
zmq_term
(
void
*
ctx_
)
...
...
@@ -186,7 +185,6 @@ int zmq_term (void *ctx_)
errno
=
EFAULT
;
return
-
1
;
}
int
rc
=
((
zmq
::
ctx_t
*
)
ctx_
)
->
terminate
();
int
en
=
errno
;
...
...
@@ -217,7 +215,7 @@ void *zmq_socket (void *ctx_, int type_)
zmq
::
ctx_t
*
ctx
=
(
zmq
::
ctx_t
*
)
ctx_
;
zmq
::
socket_base_t
*
s
=
ctx
->
create_socket
(
type_
);
if
(
ctx
->
get_thread_safe
())
s
->
set_thread_safe
();
return
(
void
*
)
s
;
return
(
void
*
)
s
;
}
int
zmq_close
(
void
*
s_
)
...
...
@@ -238,9 +236,11 @@ int zmq_setsockopt (void *s_, int option_, const void *optval_,
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
if
(
s
->
thread_safe
())
s
->
lock
();
int
result
=
s
->
setsockopt
(
option_
,
optval_
,
optvallen_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
unlock
();
return
result
;
}
...
...
@@ -251,9 +251,11 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
if
(
s
->
thread_safe
())
s
->
lock
();
int
result
=
s
->
getsockopt
(
option_
,
optval_
,
optvallen_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
unlock
();
return
result
;
}
...
...
@@ -264,9 +266,11 @@ int zmq_bind (void *s_, const char *addr_)
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
if
(
s
->
thread_safe
())
s
->
lock
();
int
result
=
s
->
bind
(
addr_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
unlock
();
return
result
;
}
...
...
@@ -277,15 +281,18 @@ int zmq_connect (void *s_, const char *addr_)
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
if
(
s
->
thread_safe
())
s
->
lock
();
int
result
=
s
->
connect
(
addr_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
unlock
();
return
result
;
}
// Sending functions.
static
int
inner_sendmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
static
int
s_sendmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
int
sz
=
(
int
)
zmq_msg_size
(
msg_
);
int
rc
=
s_
->
send
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
...
...
@@ -313,9 +320,11 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
rc
=
inner_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
lock
();
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
...
...
@@ -335,7 +344,7 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
// a single multi-part message, i.e. the last message has
// ZMQ_SNDMORE bit switched off.
//
int
zmq_sendv
(
void
*
s_
,
iovec
*
a_
,
size_t
count_
,
int
flags_
)
int
zmq_send
io
v
(
void
*
s_
,
iovec
*
a_
,
size_t
count_
,
int
flags_
)
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
...
...
@@ -344,19 +353,19 @@ int zmq_sendv (void *s_, iovec *a_, size_t count_, int flags_)
int
rc
=
0
;
zmq_msg_t
msg
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
for
(
size_t
i
=
0
;
i
<
count_
;
++
i
)
{
if
(
s
->
thread_safe
())
s
->
lock
();
for
(
size_t
i
=
0
;
i
<
count_
;
++
i
)
{
rc
=
zmq_msg_init_size
(
&
msg
,
a_
[
i
].
iov_len
);
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
rc
=
-
1
;
break
;
}
memcpy
(
zmq_msg_data
(
&
msg
),
a_
[
i
].
iov_base
,
a_
[
i
].
iov_len
);
if
(
i
==
count_
-
1
)
flags_
=
flags_
&
~
ZMQ_SNDMORE
;
rc
=
inner
_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
i
==
count_
-
1
)
flags_
=
flags_
&
~
ZMQ_SNDMORE
;
rc
=
s
_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
rc2
=
zmq_msg_close
(
&
msg
);
...
...
@@ -366,13 +375,15 @@ int zmq_sendv (void *s_, iovec *a_, size_t count_, int flags_)
break
;
}
}
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
unlock
();
return
rc
;
}
// Receiving functions.
static
int
inner_recvmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
static
int
s_recvmsg
(
zmq
::
socket_base_t
*
s_
,
zmq_msg_t
*
msg_
,
int
flags_
)
{
int
rc
=
s_
->
recv
((
zmq
::
msg_t
*
)
msg_
,
flags_
);
if
(
unlikely
(
rc
<
0
))
...
...
@@ -398,9 +409,11 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
errno_assert
(
rc
==
0
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
int
nbytes
=
inner_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
lock
();
int
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
rc
=
zmq_msg_close
(
&
msg
);
...
...
@@ -440,28 +453,28 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
// We assume it is safe to steal these buffers by simply
// not closing the zmq::msg_t.
//
int
zmq_recv
mmsg
(
void
*
s_
,
iovec
*
a_
,
size_t
*
count_
,
int
flags_
)
int
zmq_recv
iov
(
void
*
s_
,
iovec
*
a_
,
size_t
*
count_
,
int
flags_
)
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
errno
=
ENOTSOCK
;
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
if
(
s
->
thread_safe
())
s
->
lock
();
if
(
s
->
thread_safe
())
s
->
lock
();
size_t
count
=
(
int
)
*
count_
;
size_t
count
=
(
int
)
*
count_
;
int
nread
=
0
;
bool
recvmore
=
true
;
for
(
size_t
i
=
0
;
recvmore
&&
i
<
count
;
++
i
)
{
for
(
size_t
i
=
0
;
recvmore
&&
i
<
count
;
++
i
)
{
// Cheat! We never close any msg
// because we want to steal the buffer.
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init
(
&
msg
);
errno_assert
(
rc
==
0
);
int
nbytes
=
inner
_recvmsg
(
s
,
&
msg
,
flags_
);
int
nbytes
=
s
_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
rc
=
zmq_msg_close
(
&
msg
);
...
...
@@ -478,9 +491,10 @@ int zmq_recvmmsg (void *s_, iovec *a_, size_t *count_, int flags_)
a_
[
i
].
iov_len
=
zmq_msg_size
(
&
msg
);
// Assume zmq_socket ZMQ_RVCMORE is properly set.
recvmore
=
((
zmq
::
msg_t
*
)
(
void
*
)
&
msg
)
->
flags
()
&
zmq
::
msg_t
::
more
;
recvmore
=
((
zmq
::
msg_t
*
)
(
void
*
)
&
msg
)
->
flags
()
&
zmq
::
msg_t
::
more
;
}
if
(
s
->
thread_safe
())
s
->
unlock
();
if
(
s
->
thread_safe
())
s
->
unlock
();
return
nread
;
}
...
...
@@ -957,4 +971,3 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment