Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
99f71444
Commit
99f71444
authored
Nov 09, 2012
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #467 from hurtonm/code_cleanup
Code cleanup
parents
793895c4
66dc4d8b
Hide whitespace changes
Inline
Side-by-side
Showing
28 changed files
with
71 additions
and
98 deletions
+71
-98
dealer.cpp
src/dealer.cpp
+3
-6
dealer.hpp
src/dealer.hpp
+2
-2
dist.cpp
src/dist.cpp
+5
-8
dist.hpp
src/dist.hpp
+3
-3
lb.cpp
src/lb.cpp
+1
-4
lb.hpp
src/lb.hpp
+1
-1
pair.cpp
src/pair.cpp
+3
-6
pair.hpp
src/pair.hpp
+2
-2
pub.cpp
src/pub.cpp
+1
-1
pub.hpp
src/pub.hpp
+1
-1
pull.cpp
src/pull.cpp
+1
-4
pull.hpp
src/pull.hpp
+1
-1
push.cpp
src/push.cpp
+2
-2
push.hpp
src/push.hpp
+1
-1
rep.cpp
src/rep.cpp
+6
-6
rep.hpp
src/rep.hpp
+2
-2
req.cpp
src/req.cpp
+7
-7
req.hpp
src/req.hpp
+2
-2
router.cpp
src/router.cpp
+2
-8
router.hpp
src/router.hpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+7
-7
socket_base.hpp
src/socket_base.hpp
+2
-2
sub.cpp
src/sub.cpp
+2
-2
sub.hpp
src/sub.hpp
+1
-1
xpub.cpp
src/xpub.cpp
+3
-6
xpub.hpp
src/xpub.hpp
+2
-2
xsub.cpp
src/xsub.cpp
+4
-7
xsub.hpp
src/xsub.hpp
+2
-2
No files found.
src/dealer.cpp
View file @
99f71444
...
...
@@ -49,16 +49,13 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
lb
.
attach
(
pipe_
);
}
int
zmq
::
dealer_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
dealer_t
::
xsend
(
msg_t
*
msg_
)
{
return
lb
.
send
(
msg_
,
flags_
);
return
lb
.
send
(
msg_
);
}
int
zmq
::
dealer_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
dealer_t
::
xrecv
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
return
fq
.
recv
(
msg_
);
}
...
...
src/dealer.hpp
View file @
99f71444
...
...
@@ -47,8 +47,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
);
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
bool
xhas_out
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
...
...
src/dist.cpp
View file @
99f71444
...
...
@@ -103,19 +103,19 @@ void zmq::dist_t::activated (pipe_t *pipe_)
}
}
int
zmq
::
dist_t
::
send_to_all
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
dist_t
::
send_to_all
(
msg_t
*
msg_
)
{
matching
=
active
;
return
send_to_matching
(
msg_
,
flags_
);
return
send_to_matching
(
msg_
);
}
int
zmq
::
dist_t
::
send_to_matching
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
dist_t
::
send_to_matching
(
msg_t
*
msg_
)
{
// Is this end of a multipart message?
bool
msg_more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// Push the message to matching pipes.
distribute
(
msg_
,
flags_
);
distribute
(
msg_
);
// If mutlipart message is fully sent, activate all the eligible pipes.
if
(
!
msg_more
)
...
...
@@ -126,11 +126,8 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
return
0
;
}
void
zmq
::
dist_t
::
distribute
(
msg_t
*
msg_
,
int
flags_
)
void
zmq
::
dist_t
::
distribute
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
// If there are no matching pipes available, simply drop the message.
if
(
matching
==
0
)
{
int
rc
=
msg_
->
close
();
...
...
src/dist.hpp
View file @
99f71444
...
...
@@ -58,10 +58,10 @@ namespace zmq
void
terminated
(
zmq
::
pipe_t
*
pipe_
);
// Send the message to the matching outbound pipes.
int
send_to_matching
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
send_to_matching
(
zmq
::
msg_t
*
msg_
);
// Send the message to all the outbound pipes.
int
send_to_all
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
send_to_all
(
zmq
::
msg_t
*
msg_
);
bool
has_out
();
...
...
@@ -72,7 +72,7 @@ namespace zmq
bool
write
(
zmq
::
pipe_t
*
pipe_
,
zmq
::
msg_t
*
msg_
);
// Put the message to all active pipes.
void
distribute
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
void
distribute
(
zmq
::
msg_t
*
msg_
);
// List of outbound pipes.
typedef
array_t
<
zmq
::
pipe_t
,
2
>
pipes_t
;
...
...
src/lb.cpp
View file @
99f71444
...
...
@@ -71,11 +71,8 @@ void zmq::lb_t::activated (pipe_t *pipe_)
active
++
;
}
int
zmq
::
lb_t
::
send
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
lb_t
::
send
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
// Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode.
if
(
dropping
)
{
...
...
src/lb.hpp
View file @
99f71444
...
...
@@ -42,7 +42,7 @@ namespace zmq
void
activated
(
pipe_t
*
pipe_
);
void
terminated
(
pipe_t
*
pipe_
);
int
send
(
msg_t
*
msg_
,
int
flags_
);
int
send
(
msg_t
*
msg_
);
bool
has_out
();
private
:
...
...
src/pair.cpp
View file @
99f71444
...
...
@@ -69,14 +69,14 @@ void zmq::pair_t::xwrite_activated (pipe_t *)
// There's nothing to do here.
}
int
zmq
::
pair_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
pair_t
::
xsend
(
msg_t
*
msg_
)
{
if
(
!
pipe
||
!
pipe
->
write
(
msg_
))
{
errno
=
EAGAIN
;
return
-
1
;
}
if
(
!
(
flags_
&
ZMQ_SNDMORE
))
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
pipe
->
flush
();
// Detach the original message from the data buffer.
...
...
@@ -86,11 +86,8 @@ int zmq::pair_t::xsend (msg_t *msg_, int flags_)
return
0
;
}
int
zmq
::
pair_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
pair_t
::
xrecv
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
// Deallocate old content of the message.
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
...
...
src/pair.hpp
View file @
99f71444
...
...
@@ -43,8 +43,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
);
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
bool
xhas_out
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
...
...
src/pub.cpp
View file @
99f71444
...
...
@@ -32,7 +32,7 @@ zmq::pub_t::~pub_t ()
{
}
int
zmq
::
pub_t
::
xrecv
(
class
msg_t
*
,
int
)
int
zmq
::
pub_t
::
xrecv
(
class
msg_t
*
)
{
// Messages cannot be received from PUB socket.
errno
=
ENOTSUP
;
...
...
src/pub.hpp
View file @
99f71444
...
...
@@ -40,7 +40,7 @@ namespace zmq
~
pub_t
();
// Implementations of virtual functions from socket_base_t.
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
private
:
...
...
src/pull.cpp
View file @
99f71444
...
...
@@ -53,11 +53,8 @@ void zmq::pull_t::xterminated (pipe_t *pipe_)
fq
.
terminated
(
pipe_
);
}
int
zmq
::
pull_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
pull_t
::
xrecv
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
return
fq
.
recv
(
msg_
);
}
...
...
src/pull.hpp
View file @
99f71444
...
...
@@ -46,7 +46,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xterminated
(
zmq
::
pipe_t
*
pipe_
);
...
...
src/push.cpp
View file @
99f71444
...
...
@@ -53,9 +53,9 @@ void zmq::push_t::xterminated (pipe_t *pipe_)
lb
.
terminated
(
pipe_
);
}
int
zmq
::
push_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
push_t
::
xsend
(
msg_t
*
msg_
)
{
return
lb
.
send
(
msg_
,
flags_
);
return
lb
.
send
(
msg_
);
}
bool
zmq
::
push_t
::
xhas_out
()
...
...
src/push.hpp
View file @
99f71444
...
...
@@ -46,7 +46,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
);
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
bool
xhas_out
();
void
xwrite_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xterminated
(
zmq
::
pipe_t
*
pipe_
);
...
...
src/rep.cpp
View file @
99f71444
...
...
@@ -35,7 +35,7 @@ zmq::rep_t::~rep_t ()
{
}
int
zmq
::
rep_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
rep_t
::
xsend
(
msg_t
*
msg_
)
{
// If we are in the middle of receiving a request, we cannot send reply.
if
(
!
sending_reply
)
{
...
...
@@ -46,7 +46,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// Push message to the reply pipe.
int
rc
=
router_t
::
xsend
(
msg_
,
flags_
);
int
rc
=
router_t
::
xsend
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
...
...
@@ -57,7 +57,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return
0
;
}
int
zmq
::
rep_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
rep_t
::
xrecv
(
msg_t
*
msg_
)
{
// If we are in middle of sending a reply, we cannot receive next request.
if
(
sending_reply
)
{
...
...
@@ -69,7 +69,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
// to the reply pipe.
if
(
request_begins
)
{
while
(
true
)
{
int
rc
=
router_t
::
xrecv
(
msg_
,
flags_
);
int
rc
=
router_t
::
xrecv
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
...
...
@@ -78,7 +78,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
bool
bottom
=
(
msg_
->
size
()
==
0
);
// Push it to the reply pipe.
rc
=
router_t
::
xsend
(
msg_
,
flags_
);
rc
=
router_t
::
xsend
(
msg_
);
errno_assert
(
rc
==
0
);
if
(
bottom
)
...
...
@@ -95,7 +95,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
}
// Get next message part to return to the user.
int
rc
=
router_t
::
xrecv
(
msg_
,
flags_
);
int
rc
=
router_t
::
xrecv
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
...
...
src/rep.hpp
View file @
99f71444
...
...
@@ -40,8 +40,8 @@ namespace zmq
~
rep_t
();
// Overloads of functions from socket_base_t.
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
bool
xhas_out
();
...
...
src/req.cpp
View file @
99f71444
...
...
@@ -39,7 +39,7 @@ zmq::req_t::~req_t ()
{
}
int
zmq
::
req_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
req_t
::
xsend
(
msg_t
*
msg_
)
{
// If we've sent a request and we still haven't got the reply,
// we can't send another request.
...
...
@@ -54,7 +54,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
int
rc
=
bottom
.
init
();
errno_assert
(
rc
==
0
);
bottom
.
set_flags
(
msg_t
::
more
);
rc
=
dealer_t
::
xsend
(
&
bottom
,
0
);
rc
=
dealer_t
::
xsend
(
&
bottom
);
if
(
rc
!=
0
)
return
-
1
;
message_begins
=
false
;
...
...
@@ -62,7 +62,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
int
rc
=
dealer_t
::
xsend
(
msg_
,
flags_
);
int
rc
=
dealer_t
::
xsend
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
...
...
@@ -75,7 +75,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
return
0
;
}
int
zmq
::
req_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
req_t
::
xrecv
(
msg_t
*
msg_
)
{
// If request wasn't send, we can't wait for reply.
if
(
!
receiving_reply
)
{
...
...
@@ -85,14 +85,14 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
// First part of the reply should be the original request ID.
if
(
message_begins
)
{
int
rc
=
dealer_t
::
xrecv
(
msg_
,
flags_
);
int
rc
=
dealer_t
::
xrecv
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
// TODO: This should also close the connection with the peer!
if
(
unlikely
(
!
(
msg_
->
flags
()
&
msg_t
::
more
)
||
msg_
->
size
()
!=
0
))
{
while
(
true
)
{
int
rc
=
dealer_t
::
xrecv
(
msg_
,
flags_
);
int
rc
=
dealer_t
::
xrecv
(
msg_
);
errno_assert
(
rc
==
0
);
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
break
;
...
...
@@ -106,7 +106,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
message_begins
=
false
;
}
int
rc
=
dealer_t
::
xrecv
(
msg_
,
flags_
);
int
rc
=
dealer_t
::
xrecv
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
...
...
src/req.hpp
View file @
99f71444
...
...
@@ -42,8 +42,8 @@ namespace zmq
~
req_t
();
// Overloads of functions from socket_base_t.
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
bool
xhas_out
();
...
...
src/router.cpp
View file @
99f71444
...
...
@@ -140,11 +140,8 @@ void zmq::router_t::xwrite_activated (pipe_t *pipe_)
it
->
second
.
active
=
true
;
}
int
zmq
::
router_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
router_t
::
xsend
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
// If this is the first part of the message it's the ID of the
// peer to send the message to.
if
(
!
more_out
)
{
...
...
@@ -227,11 +224,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
return
0
;
}
int
zmq
::
router_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
router_t
::
xrecv
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
if
(
prefetched
)
{
if
(
!
identity_sent
)
{
int
rc
=
msg_
->
move
(
prefetched_id
);
...
...
src/router.hpp
View file @
99f71444
...
...
@@ -50,8 +50,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
);
int
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
);
int
xsend
(
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
msg_t
*
msg_
);
int
xrecv
(
msg_t
*
msg_
);
bool
xhas_in
();
bool
xhas_out
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
...
...
src/socket_base.cpp
View file @
99f71444
...
...
@@ -622,7 +622,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
msg_
->
set_flags
(
msg_t
::
more
);
// Try to send the message.
rc
=
xsend
(
msg_
,
flags_
);
rc
=
xsend
(
msg_
);
if
(
rc
==
0
)
return
0
;
if
(
unlikely
(
errno
!=
EAGAIN
))
...
...
@@ -644,7 +644,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
while
(
true
)
{
if
(
unlikely
(
process_commands
(
timeout
,
false
)
!=
0
))
return
-
1
;
rc
=
xsend
(
msg_
,
flags_
);
rc
=
xsend
(
msg_
);
if
(
rc
==
0
)
break
;
if
(
unlikely
(
errno
!=
EAGAIN
))
...
...
@@ -689,7 +689,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
// Get the message.
int
rc
=
xrecv
(
msg_
,
flags_
);
int
rc
=
xrecv
(
msg_
);
if
(
unlikely
(
rc
!=
0
&&
errno
!=
EAGAIN
))
return
-
1
;
...
...
@@ -708,7 +708,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
return
-
1
;
ticks
=
0
;
rc
=
xrecv
(
msg_
,
flags_
);
rc
=
xrecv
(
msg_
);
if
(
rc
<
0
)
return
rc
;
extract_flags
(
msg_
);
...
...
@@ -726,7 +726,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
while
(
true
)
{
if
(
unlikely
(
process_commands
(
block
?
timeout
:
0
,
false
)
!=
0
))
return
-
1
;
rc
=
xrecv
(
msg_
,
flags_
);
rc
=
xrecv
(
msg_
);
if
(
rc
==
0
)
{
ticks
=
0
;
break
;
...
...
@@ -886,7 +886,7 @@ bool zmq::socket_base_t::xhas_out ()
return
false
;
}
int
zmq
::
socket_base_t
::
xsend
(
msg_t
*
,
int
)
int
zmq
::
socket_base_t
::
xsend
(
msg_t
*
)
{
errno
=
ENOTSUP
;
return
-
1
;
...
...
@@ -897,7 +897,7 @@ bool zmq::socket_base_t::xhas_in ()
return
false
;
}
int
zmq
::
socket_base_t
::
xrecv
(
msg_t
*
,
int
)
int
zmq
::
socket_base_t
::
xrecv
(
msg_t
*
)
{
errno
=
ENOTSUP
;
return
-
1
;
...
...
src/socket_base.hpp
View file @
99f71444
...
...
@@ -133,11 +133,11 @@ namespace zmq
// The default implementation assumes that send is not supported.
virtual
bool
xhas_out
();
virtual
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
virtual
int
xsend
(
zmq
::
msg_t
*
msg_
);
// The default implementation assumes that recv in not supported.
virtual
bool
xhas_in
();
virtual
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
virtual
int
xrecv
(
zmq
::
msg_t
*
msg_
);
// i_pipe_events will be forwarded to these functions.
virtual
void
xread_activated
(
pipe_t
*
pipe_
);
...
...
src/sub.cpp
View file @
99f71444
...
...
@@ -58,7 +58,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
// Pass it further on in the stack.
int
err
=
0
;
rc
=
xsub_t
::
xsend
(
&
msg
,
0
);
rc
=
xsub_t
::
xsend
(
&
msg
);
if
(
rc
!=
0
)
err
=
errno
;
int
rc2
=
msg
.
close
();
...
...
@@ -68,7 +68,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
return
rc
;
}
int
zmq
::
sub_t
::
xsend
(
msg_t
*
,
int
)
int
zmq
::
sub_t
::
xsend
(
msg_t
*
)
{
// Overload the XSUB's send.
errno
=
ENOTSUP
;
...
...
src/sub.hpp
View file @
99f71444
...
...
@@ -42,7 +42,7 @@ namespace zmq
protected
:
int
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
);
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
bool
xhas_out
();
private
:
...
...
src/xpub.cpp
View file @
99f71444
...
...
@@ -115,7 +115,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
self
->
dist
.
match
(
pipe_
);
}
int
zmq
::
xpub_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
xpub_t
::
xsend
(
msg_t
*
msg_
)
{
bool
msg_more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
...
...
@@ -126,7 +126,7 @@ int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
// Send the message to all the pipes that were marked as matching
// in the previous step.
int
rc
=
dist
.
send_to_matching
(
msg_
,
flags_
);
int
rc
=
dist
.
send_to_matching
(
msg_
);
if
(
rc
!=
0
)
return
rc
;
...
...
@@ -145,11 +145,8 @@ bool zmq::xpub_t::xhas_out ()
return
dist
.
has_out
();
}
int
zmq
::
xpub_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
xpub_t
::
xrecv
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
// If there is at least one
if
(
pending
.
empty
())
{
errno
=
EAGAIN
;
...
...
src/xpub.hpp
View file @
99f71444
...
...
@@ -48,9 +48,9 @@ namespace zmq
// Implementations of virtual functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
=
false
);
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
bool
xhas_out
();
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xwrite_activated
(
zmq
::
pipe_t
*
pipe_
);
...
...
src/xsub.cpp
View file @
99f71444
...
...
@@ -82,7 +82,7 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
pipe_
->
flush
();
}
int
zmq
::
xsub_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
xsub_t
::
xsend
(
msg_t
*
msg_
)
{
size_t
size
=
msg_
->
size
();
unsigned
char
*
data
=
(
unsigned
char
*
)
msg_
->
data
();
...
...
@@ -100,11 +100,11 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
// doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved
subscriptions
.
add
(
data
+
1
,
size
-
1
);
return
dist
.
send_to_all
(
msg_
,
flags_
);
return
dist
.
send_to_all
(
msg_
);
}
else
{
if
(
subscriptions
.
rm
(
data
+
1
,
size
-
1
))
return
dist
.
send_to_all
(
msg_
,
flags_
);
return
dist
.
send_to_all
(
msg_
);
}
int
rc
=
msg_
->
close
();
...
...
@@ -121,11 +121,8 @@ bool zmq::xsub_t::xhas_out ()
return
true
;
}
int
zmq
::
xsub_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
int
zmq
::
xsub_t
::
xrecv
(
msg_t
*
msg_
)
{
// flags_ is unused
(
void
)
flags_
;
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if
(
has_message
)
{
...
...
src/xsub.hpp
View file @
99f71444
...
...
@@ -46,9 +46,9 @@ namespace zmq
// Overloads of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
icanhasall_
);
int
xsend
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
bool
xhas_out
();
int
xrecv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xwrite_activated
(
zmq
::
pipe_t
*
pipe_
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment