Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
7842c710
Commit
7842c710
authored
Nov 01, 2011
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
LABELS and COMMANDs removed
Signed-off-by:
Martin Sustrik
<
sustrik@250bpm.com
>
parent
626099aa
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
75 additions
and
295 deletions
+75
-295
zmq.h
include/zmq.h
+1
-4
dist.cpp
src/dist.cpp
+3
-3
encoder.cpp
src/encoder.cpp
+3
-2
fq.cpp
src/fq.cpp
+2
-1
lb.cpp
src/lb.cpp
+3
-3
msg.hpp
src/msg.hpp
+3
-4
pipe.cpp
src/pipe.cpp
+4
-3
rep.cpp
src/rep.cpp
+10
-9
req.cpp
src/req.cpp
+15
-31
req.hpp
src/req.hpp
+2
-5
session_base.cpp
src/session_base.cpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+1
-32
socket_base.hpp
src/socket_base.hpp
+1
-6
xpub.cpp
src/xpub.cpp
+2
-2
xrep.cpp
src/xrep.cpp
+6
-5
xsub.cpp
src/xsub.cpp
+5
-5
Makefile.am
tests/Makefile.am
+0
-2
test_invalid_rep.cpp
tests/test_invalid_rep.cpp
+7
-6
test_reqrep_device.cpp
tests/test_reqrep_device.cpp
+5
-26
test_reqrep_drop.cpp
tests/test_reqrep_drop.cpp
+0
-144
No files found.
include/zmq.h
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -189,15 +190,11 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_MULTICAST_HOPS 25
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
#define ZMQ_RCVLABEL 29
#define ZMQ_RCVCMD 30
#define ZMQ_IPV4ONLY 31
/* Send/recv options. */
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
#define ZMQ_SNDLABEL 4
#define ZMQ_SNDCMD 8
ZMQ_EXPORT
void
*
zmq_socket
(
void
*
context
,
int
type
);
ZMQ_EXPORT
int
zmq_close
(
void
*
s
);
...
...
src/dist.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -112,8 +113,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int
zmq
::
dist_t
::
send_to_matching
(
msg_t
*
msg_
,
int
flags_
)
{
// Is this end of a multipart message?
bool
msg_more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
bool
msg_more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// Push the message to matching pipes.
distribute
(
msg_
,
flags_
);
...
...
@@ -182,7 +182,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible
--
;
return
false
;
}
if
(
!
(
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
))
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
pipe_
->
flush
();
return
true
;
}
...
...
src/encoder.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -90,14 +91,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf
[
0
]
=
(
unsigned
char
)
size
;
tmpbuf
[
1
]
=
(
in_progress
.
flags
()
&
~
msg_t
::
shared
);
next_step
(
tmpbuf
,
2
,
&
encoder_t
::
size_ready
,
!
(
in_progress
.
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
));
!
(
in_progress
.
flags
()
&
msg_t
::
more
));
}
else
{
tmpbuf
[
0
]
=
0xff
;
put_uint64
(
tmpbuf
+
1
,
size
);
tmpbuf
[
9
]
=
(
in_progress
.
flags
()
&
~
msg_t
::
shared
);
next_step
(
tmpbuf
,
10
,
&
encoder_t
::
size_ready
,
!
(
in_progress
.
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
));
!
(
in_progress
.
flags
()
&
msg_t
::
more
));
}
return
true
;
}
src/fq.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -92,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if
(
pipe_
)
*
pipe_
=
pipes
[
current
];
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
if
(
!
more
)
{
current
++
;
if
(
current
>=
active
)
...
...
src/lb.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -76,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if
(
dropping
)
{
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
if
(
!
more
)
dropping
=
false
;
...
...
@@ -89,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while
(
active
>
0
)
{
if
(
pipes
[
current
]
->
write
(
msg_
))
{
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
break
;
}
...
...
src/msg.hpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -48,10 +49,8 @@ namespace zmq
// Mesage flags.
enum
{
label
=
1
,
command
=
2
,
shared
=
64
,
more
=
128
more
=
1
,
shared
=
128
};
bool
check
();
...
...
src/pipe.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -137,7 +138,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return
false
;
}
if
(
!
(
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
))
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
msgs_read
++
;
if
(
lwm
>
0
&&
msgs_read
%
lwm
==
0
)
...
...
@@ -166,7 +167,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
if
(
unlikely
(
!
check_write
(
msg_
)))
return
false
;
bool
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
outpipe
->
write
(
*
msg_
,
more
);
if
(
!
more
)
msgs_written
++
;
...
...
@@ -180,7 +181,7 @@ void zmq::pipe_t::rollback ()
msg_t
msg
;
if
(
outpipe
)
{
while
(
outpipe
->
unwrite
(
&
msg
))
{
zmq_assert
(
msg
.
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
);
zmq_assert
(
msg
.
flags
()
&
msg_t
::
more
);
int
rc
=
msg
.
close
();
errno_assert
(
rc
==
0
);
}
...
...
src/rep.cpp
View file @
7842c710
...
...
@@ -43,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return
-
1
;
}
bool
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// Push message to the reply pipe.
int
rc
=
xrep_t
::
xsend
(
msg_
,
flags_
);
...
...
@@ -72,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
int
rc
=
xrep_t
::
xrecv
(
msg_
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
if
(
!
(
msg_
->
flags
()
&
msg_t
::
label
))
break
;
zmq_assert
(
msg_
->
flags
()
&
msg_t
::
more
);
bool
bottom
=
(
msg_
->
size
()
==
0
)
;
rc
=
xrep_t
::
xsend
(
msg_
,
flags_
);
errno_assert
(
rc
==
0
);
if
(
bottom
)
break
;
}
request_begins
=
false
;
}
else
{
int
rc
=
xrep_t
::
xrecv
(
msg_
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
}
zmq_assert
(
!
(
msg_
->
flags
()
&
msg_t
::
label
));
// Get next message part to return to the user.
int
rc
=
xrep_t
::
xrecv
(
msg_
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
// If whole request is read, flip the FSM to reply-sending state.
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
{
...
...
src/req.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -29,8 +30,7 @@
zmq
::
req_t
::
req_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
)
:
xreq_t
(
parent_
,
tid_
),
receiving_reply
(
false
),
message_begins
(
true
),
request_id
(
generate_random
())
message_begins
(
true
)
{
options
.
type
=
ZMQ_REQ
;
}
...
...
@@ -50,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
// First part of the request is the request identity.
if
(
message_begins
)
{
msg_t
prefix
;
int
rc
=
prefix
.
init_size
(
4
);
msg_t
bottom
;
int
rc
=
bottom
.
init
(
);
errno_assert
(
rc
==
0
);
prefix
.
set_flags
(
msg_t
::
label
);
unsigned
char
*
data
=
(
unsigned
char
*
)
prefix
.
data
();
put_uint32
(
data
,
request_id
);
rc
=
xreq_t
::
xsend
(
&
prefix
,
flags_
);
bottom
.
set_flags
(
msg_t
::
more
);
rc
=
xreq_t
::
xsend
(
&
bottom
,
0
);
if
(
rc
!=
0
)
return
rc
;
return
-
1
;
message_begins
=
false
;
}
bool
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
int
rc
=
xreq_t
::
xsend
(
msg_
,
flags_
);
if
(
rc
!=
0
)
...
...
@@ -92,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return
rc
;
// TODO: This should also close the connection with the peer!
if
(
unlikely
(
!
(
msg_
->
flags
()
&
msg_t
::
label
)
||
msg_
->
size
()
!=
4
))
{
if
(
unlikely
(
!
(
msg_
->
flags
()
&
msg_t
::
more
)
||
msg_
->
size
()
!=
0
))
{
while
(
true
)
{
int
rc
=
xreq_t
::
xrecv
(
msg_
,
flags_
);
errno_assert
(
rc
==
0
);
if
(
!
(
msg_
->
flags
()
&
(
msg_t
::
label
|
msg_t
::
more
)))
break
;
}
msg_
->
close
();
msg_
->
init
();
errno
=
EAGAIN
;
return
-
1
;
}
unsigned
char
*
data
=
(
unsigned
char
*
)
msg_
->
data
();
if
(
unlikely
(
get_uint32
(
data
)
!=
request_id
))
{
while
(
true
)
{
int
rc
=
xreq_t
::
xrecv
(
msg_
,
flags_
);
errno_assert
(
rc
==
0
);
if
(
!
(
msg_
->
flags
()
&
(
msg_t
::
label
|
msg_t
::
more
)))
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
break
;
}
msg_
->
close
();
...
...
@@ -118,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
errno
=
EAGAIN
;
return
-
1
;
}
message_begins
=
false
;
}
...
...
@@ -126,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return
rc
;
// If the reply is fully received, flip the FSM into request-sending state.
if
(
!
(
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)))
{
request_id
++
;
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
))
{
receiving_reply
=
false
;
message_begins
=
true
;
}
...
...
@@ -167,8 +151,8 @@ zmq::req_session_t::~req_session_t ()
int
zmq
::
req_session_t
::
write
(
msg_t
*
msg_
)
{
if
(
state
==
request_id
)
{
if
(
msg_
->
flags
()
==
msg_t
::
label
&&
msg_
->
size
()
==
4
)
{
if
(
state
==
bottom
)
{
if
(
msg_
->
flags
()
==
msg_t
::
more
&&
msg_
->
size
()
==
0
)
{
state
=
body
;
return
xreq_session_t
::
write
(
msg_
);
}
...
...
@@ -177,7 +161,7 @@ int zmq::req_session_t::write (msg_t *msg_)
if
(
msg_
->
flags
()
==
msg_t
::
more
)
return
xreq_session_t
::
write
(
msg_
);
if
(
msg_
->
flags
()
==
0
)
{
state
=
request_id
;
state
=
bottom
;
return
xreq_session_t
::
write
(
msg_
);
}
}
...
...
src/req.hpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -51,10 +52,6 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool
message_begins
;
// Request ID. Request numbers gradually increase (and wrap over)
// so that we don't have to generate random ID for each request.
uint32_t
request_id
;
req_t
(
const
req_t
&
);
const
req_t
&
operator
=
(
const
req_t
&
);
};
...
...
@@ -74,7 +71,7 @@ namespace zmq
private
:
enum
{
request_id
,
bottom
,
body
}
state
;
...
...
src/session_base.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -149,9 +150,8 @@ int zmq::session_base_t::read (msg_t *msg_)
errno
=
EAGAIN
;
return
-
1
;
}
incomplete_in
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
incomplete_in
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
return
0
;
}
...
...
src/socket_base.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -120,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed
(
false
),
last_tsc
(
0
),
ticks
(
0
),
rcvlabel
(
false
),
rcvcmd
(
false
),
rcvmore
(
false
)
{
}
...
...
@@ -252,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return
-
1
;
}
if
(
option_
==
ZMQ_RCVLABEL
)
{
if
(
*
optvallen_
<
sizeof
(
int
))
{
errno
=
EINVAL
;
return
-
1
;
}
*
((
int
*
)
optval_
)
=
rcvlabel
?
1
:
0
;
*
optvallen_
=
sizeof
(
int
);
return
0
;
}
if
(
option_
==
ZMQ_RCVCMD
)
{
if
(
*
optvallen_
<
sizeof
(
int
))
{
errno
=
EINVAL
;
return
-
1
;
}
*
((
int
*
)
optval_
)
=
rcvcmd
?
1
:
0
;
*
optvallen_
=
sizeof
(
int
);
return
0
;
}
if
(
option_
==
ZMQ_RCVMORE
)
{
if
(
*
optvallen_
<
sizeof
(
int
))
{
errno
=
EINVAL
;
...
...
@@ -496,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return
-
1
;
// At this point we impose the flags on the message.
if
(
flags_
&
ZMQ_SNDLABEL
)
msg_
->
set_flags
(
msg_t
::
label
);
if
(
flags_
&
ZMQ_SNDMORE
)
msg_
->
set_flags
(
msg_t
::
more
);
if
(
flags_
&
ZMQ_SNDCMD
)
msg_
->
set_flags
(
msg_t
::
command
);
// Try to send the message.
rc
=
xsend
(
msg_
,
flags_
);
...
...
@@ -870,13 +845,7 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void
zmq
::
socket_base_t
::
extract_flags
(
msg_t
*
msg_
)
{
rcvlabel
=
msg_
->
flags
()
&
msg_t
::
label
;
if
(
rcvlabel
)
msg_
->
reset_flags
(
msg_t
::
label
);
rcvmore
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
if
(
rcvmore
)
msg_
->
reset_flags
(
msg_t
::
more
);
rcvcmd
=
msg_
->
flags
()
&
msg_t
::
command
?
true
:
false
;
if
(
rcvcmd
)
msg_
->
reset_flags
(
msg_t
::
command
);
}
src/socket_base.hpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -184,12 +185,6 @@ namespace zmq
// Number of messages received since last command processing.
int
ticks
;
// True if the last message received had LABEL flag set.
bool
rcvlabel
;
// True if the last message received had COMMAND flag set.
bool
rcvcmd
;
// True if the last message received had MORE flag set.
bool
rcvmore
;
...
...
src/xpub.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -101,8 +102,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int
zmq
::
xpub_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
{
bool
msg_more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
bool
msg_more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// For the first part of multi-part message, find the matching pipes.
if
(
!
more
)
...
...
src/xrep.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -128,7 +129,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
if
(
msg_
->
flags
()
&
msg_t
::
label
)
{
if
(
msg_
->
flags
()
&
msg_t
::
more
)
{
more_out
=
true
;
...
...
@@ -162,7 +163,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
more_out
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more_out
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// Push the message into the pipe. If there's no out pipe, just drop it.
if
(
current_out
)
{
...
...
@@ -192,7 +193,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if
(
prefetched
)
{
int
rc
=
msg_
->
move
(
prefetched_msg
);
errno_assert
(
rc
==
0
);
more_in
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more_in
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
prefetched
=
false
;
return
0
;
}
...
...
@@ -205,7 +206,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we are in the middle of reading a message, just return the next part.
if
(
more_in
)
{
more_in
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more_in
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
return
0
;
}
...
...
@@ -219,7 +220,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
rc
=
msg_
->
init_size
(
4
);
errno_assert
(
rc
==
0
);
put_uint32
((
unsigned
char
*
)
msg_
->
data
(),
pipe
->
get_pipe_id
());
msg_
->
set_flags
(
msg_t
::
label
);
msg_
->
set_flags
(
msg_t
::
more
);
return
0
;
}
...
...
src/xsub.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -117,7 +118,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int
rc
=
msg_
->
move
(
message
);
errno_assert
(
rc
==
0
);
has_message
=
false
;
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
return
0
;
}
...
...
@@ -137,14 +138,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if
(
more
||
!
options
.
filter
||
match
(
msg_
))
{
more
=
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
?
true
:
false
;
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
return
0
;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while
(
msg_
->
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
)
{
while
(
msg_
->
flags
()
&
msg_t
::
more
)
{
rc
=
fq
.
recv
(
msg_
,
ZMQ_DONTWAIT
);
zmq_assert
(
rc
==
0
);
}
...
...
@@ -184,7 +184,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while
(
message
.
flags
()
&
(
msg_t
::
more
|
msg_t
::
label
)
)
{
while
(
message
.
flags
()
&
msg_t
::
more
)
{
rc
=
fq
.
recv
(
&
message
,
ZMQ_DONTWAIT
);
zmq_assert
(
rc
==
0
);
}
...
...
tests/Makefile.am
View file @
7842c710
...
...
@@ -7,7 +7,6 @@ noinst_PROGRAMS = test_pair_inproc \
test_reqrep_tcp
\
test_hwm
\
test_reqrep_device
\
test_reqrep_drop
\
test_sub_forward
\
test_invalid_rep
...
...
@@ -24,7 +23,6 @@ test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp
test_reqrep_tcp_SOURCES
=
test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES
=
test_hwm.cpp
test_reqrep_device_SOURCES
=
test_reqrep_device.cpp
test_reqrep_drop_SOURCES
=
test_reqrep_drop.cpp
test_sub_forward_SOURCES
=
test_sub_forward.cpp
test_invalid_rep_SOURCES
=
test_invalid_rep.cpp
...
...
tests/test_invalid_rep.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -47,12 +48,12 @@ int main (int argc, char *argv [])
// Receive the request.
char
addr
[
4
];
char
seqn
[
4
];
char
bottom
[
1
];
char
body
[
1
];
rc
=
zmq_recv
(
xrep_socket
,
addr
,
sizeof
(
addr
),
0
);
assert
(
rc
==
4
);
rc
=
zmq_recv
(
xrep_socket
,
seqn
,
sizeof
(
seqn
),
0
);
assert
(
rc
==
4
);
rc
=
zmq_recv
(
xrep_socket
,
bottom
,
sizeof
(
bottom
),
0
);
assert
(
rc
==
0
);
rc
=
zmq_recv
(
xrep_socket
,
body
,
sizeof
(
body
),
0
);
assert
(
rc
==
1
);
...
...
@@ -61,10 +62,10 @@ int main (int argc, char *argv [])
assert
(
rc
==
4
);
// Send valid reply.
rc
=
zmq_send
(
xrep_socket
,
addr
,
4
,
ZMQ_SNDLABEL
);
assert
(
rc
==
4
);
rc
=
zmq_send
(
xrep_socket
,
seqn
,
4
,
ZMQ_SNDLABEL
);
rc
=
zmq_send
(
xrep_socket
,
addr
,
4
,
ZMQ_SNDMORE
);
assert
(
rc
==
4
);
rc
=
zmq_send
(
xrep_socket
,
bottom
,
0
,
ZMQ_SNDMORE
);
assert
(
rc
==
0
);
rc
=
zmq_send
(
xrep_socket
,
"b"
,
1
,
0
);
assert
(
rc
==
1
);
...
...
tests/test_reqrep_device.cpp
View file @
7842c710
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
...
...
@@ -64,15 +65,11 @@ int main (int argc, char *argv [])
assert
(
rc
==
0
);
rc
=
zmq_recvmsg
(
xrep
,
&
msg
,
0
);
assert
(
rc
>=
0
);
int
rcvlabel
;
size_t
sz
=
sizeof
(
rcvlabel
);
rc
=
zmq_getsockopt
(
xrep
,
ZMQ_RCVLABEL
,
&
rcvlabel
,
&
sz
);
assert
(
rc
==
0
);
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
rc
=
zmq_getsockopt
(
xrep
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
rc
=
zmq_sendmsg
(
xreq
,
&
msg
,
(
rcvlabel
?
ZMQ_SNDLABEL
:
0
)
|
(
rcvmore
?
ZMQ_SNDMORE
:
0
));
rc
=
zmq_sendmsg
(
xreq
,
&
msg
,
rcvmore
?
ZMQ_SNDMORE
:
0
);
assert
(
rc
>=
0
);
}
...
...
@@ -81,21 +78,14 @@ int main (int argc, char *argv [])
rc
=
zmq_recv
(
rep
,
buff
,
3
,
0
);
assert
(
rc
==
3
);
assert
(
memcmp
(
buff
,
"ABC"
,
3
)
==
0
);
int
rcvlabel
;
size_t
sz
=
sizeof
(
rcvlabel
);
rc
=
zmq_getsockopt
(
rep
,
ZMQ_RCVLABEL
,
&
rcvlabel
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvlabel
);
int
rcvmore
;
size_t
sz
=
sizeof
(
rcvmore
);
rc
=
zmq_getsockopt
(
rep
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
rcvmore
);
rc
=
zmq_recv
(
rep
,
buff
,
3
,
0
);
assert
(
rc
==
3
);
assert
(
memcmp
(
buff
,
"DEF"
,
3
)
==
0
);
rc
=
zmq_getsockopt
(
rep
,
ZMQ_RCVLABEL
,
&
rcvlabel
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvlabel
);
rc
=
zmq_getsockopt
(
rep
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvmore
);
...
...
@@ -113,15 +103,10 @@ int main (int argc, char *argv [])
assert
(
rc
==
0
);
rc
=
zmq_recvmsg
(
xreq
,
&
msg
,
0
);
assert
(
rc
>=
0
);
int
rcvlabel
;
size_t
sz
=
sizeof
(
rcvlabel
);
rc
=
zmq_getsockopt
(
xreq
,
ZMQ_RCVLABEL
,
&
rcvlabel
,
&
sz
);
assert
(
rc
==
0
);
int
rcvmore
;
rc
=
zmq_getsockopt
(
xreq
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
rc
=
zmq_sendmsg
(
xrep
,
&
msg
,
(
rcvlabel
?
ZMQ_SNDLABEL
:
0
)
|
(
rcvmore
?
ZMQ_SNDMORE
:
0
));
rc
=
zmq_sendmsg
(
xrep
,
&
msg
,
rcvmore
?
ZMQ_SNDMORE
:
0
);
assert
(
rc
>=
0
);
}
...
...
@@ -129,18 +114,12 @@ int main (int argc, char *argv [])
rc
=
zmq_recv
(
req
,
buff
,
3
,
0
);
assert
(
rc
==
3
);
assert
(
memcmp
(
buff
,
"GHI"
,
3
)
==
0
);
rc
=
zmq_getsockopt
(
req
,
ZMQ_RCVLABEL
,
&
rcvlabel
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvlabel
);
rc
=
zmq_getsockopt
(
req
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
rcvmore
);
rc
=
zmq_recv
(
req
,
buff
,
3
,
0
);
assert
(
rc
==
3
);
assert
(
memcmp
(
buff
,
"JKL"
,
3
)
==
0
);
rc
=
zmq_getsockopt
(
req
,
ZMQ_RCVLABEL
,
&
rcvlabel
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvlabel
);
rc
=
zmq_getsockopt
(
req
,
ZMQ_RCVMORE
,
&
rcvmore
,
&
sz
);
assert
(
rc
==
0
);
assert
(
!
rcvmore
);
...
...
tests/test_reqrep_drop.cpp
deleted
100644 → 0
View file @
626099aa
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
int
main
(
int
argc
,
char
*
argv
[])
{
void
*
ctx
=
zmq_init
(
1
);
assert
(
ctx
);
// Check whether requests are discarded because of disconnected requester.
// Create a server.
void
*
xrep
=
zmq_socket
(
ctx
,
ZMQ_XREP
);
assert
(
xrep
);
int
rc
=
zmq_bind
(
xrep
,
"tcp://127.0.0.1:5560"
);
assert
(
rc
==
0
);
// Create a client.
void
*
xreq
=
zmq_socket
(
ctx
,
ZMQ_XREQ
);
assert
(
xreq
);
rc
=
zmq_connect
(
xreq
,
"tcp://127.0.0.1:5560"
);
assert
(
rc
==
0
);
// Send requests.
rc
=
zmq_send
(
xreq
,
"ABC"
,
3
,
0
);
assert
(
rc
==
3
);
rc
=
zmq_send
(
xreq
,
"DEF"
,
3
,
0
);
assert
(
rc
==
3
);
// Disconnect client.
rc
=
zmq_close
(
xreq
);
assert
(
rc
==
0
);
// Wait a while for disconnect to happen.
zmq_sleep
(
1
);
// Try to receive a request -- it should have been discarded.
char
buff
[
3
];
rc
=
zmq_recv
(
xrep
,
buff
,
3
,
ZMQ_DONTWAIT
);
assert
(
rc
<
0
);
assert
(
errno
==
EAGAIN
);
// Clean up.
rc
=
zmq_close
(
xrep
);
assert
(
rc
==
0
);
// New test. Check whether reply is dropped because of HWM overflow.
int
one
=
1
;
xreq
=
zmq_socket
(
ctx
,
ZMQ_XREQ
);
assert
(
xreq
);
rc
=
zmq_setsockopt
(
xreq
,
ZMQ_RCVHWM
,
&
one
,
sizeof
(
one
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
xreq
,
"inproc://a"
);
assert
(
rc
==
0
);
void
*
rep
=
zmq_socket
(
ctx
,
ZMQ_REP
);
assert
(
rep
);
rc
=
zmq_setsockopt
(
rep
,
ZMQ_SNDHWM
,
&
one
,
sizeof
(
one
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
rep
,
"inproc://a"
);
assert
(
rc
==
0
);
// Send request 1
rc
=
zmq_send
(
xreq
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Send request 2
rc
=
zmq_send
(
xreq
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Receive request 1
rc
=
zmq_recv
(
rep
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Send request 3
rc
=
zmq_send
(
xreq
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Send reply 1
rc
=
zmq_send
(
rep
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Receive request 2
rc
=
zmq_recv
(
rep
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Send reply 2
rc
=
zmq_send
(
rep
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Receive request 3
rc
=
zmq_recv
(
rep
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Send reply 3
rc
=
zmq_send
(
rep
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Receive reply 1
rc
=
zmq_recv
(
xreq
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Receive reply 2
rc
=
zmq_recv
(
xreq
,
buff
,
1
,
0
);
assert
(
rc
==
1
);
// Try to receive reply 3, it should have been dropped.
rc
=
zmq_recv
(
xreq
,
buff
,
1
,
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
&&
errno
==
EAGAIN
);
// Clean up.
rc
=
zmq_close
(
xreq
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
rep
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment