Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
06538fc1
Commit
06538fc1
authored
Mar 27, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
multi-part messages work with REQ/REP sockets
parent
bbfac783
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
101 additions
and
49 deletions
+101
-49
rep.cpp
src/rep.cpp
+47
-24
rep.hpp
src/rep.hpp
+7
-2
req.cpp
src/req.cpp
+40
-21
req.hpp
src/req.hpp
+7
-2
No files found.
src/rep.cpp
View file @
06538fc1
...
@@ -27,7 +27,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
...
@@ -27,7 +27,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
socket_base_t
(
parent_
),
socket_base_t
(
parent_
),
active
(
0
),
active
(
0
),
current
(
0
),
current
(
0
),
waiting_for_reply
(
false
),
sending_reply
(
false
),
tbc
(
false
),
reply_pipe
(
NULL
)
reply_pipe
(
NULL
)
{
{
options
.
requires_in
=
true
;
options
.
requires_in
=
true
;
...
@@ -58,6 +59,8 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
...
@@ -58,6 +59,8 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
void
zmq
::
rep_t
::
xdetach_inpipe
(
class
reader_t
*
pipe_
)
void
zmq
::
rep_t
::
xdetach_inpipe
(
class
reader_t
*
pipe_
)
{
{
zmq_assert
(
sending_reply
||
!
tbc
||
in_pipes
[
current
]
!=
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
...
@@ -90,6 +93,8 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
...
@@ -90,6 +93,8 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
void
zmq
::
rep_t
::
xdetach_outpipe
(
class
writer_t
*
pipe_
)
void
zmq
::
rep_t
::
xdetach_outpipe
(
class
writer_t
*
pipe_
)
{
{
zmq_assert
(
!
sending_reply
||
!
tbc
||
reply_pipe
!=
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
...
@@ -98,7 +103,7 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
...
@@ -98,7 +103,7 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
// If the connection we've got the request from disconnects,
// If the connection we've got the request from disconnects,
// there's nowhere to send the reply. Forget about the reply pipe.
// there's nowhere to send the reply. Forget about the reply pipe.
// Once the reply is sent it will be dropped.
// Once the reply is sent it will be dropped.
if
(
waiting_for
_reply
&&
pipe_
==
reply_pipe
)
if
(
sending
_reply
&&
pipe_
==
reply_pipe
)
reply_pipe
=
NULL
;
reply_pipe
=
NULL
;
// If corresponding inpipe is still in place simply nullify the pointer
// If corresponding inpipe is still in place simply nullify the pointer
...
@@ -157,29 +162,36 @@ int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
...
@@ -157,29 +162,36 @@ int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
int
zmq
::
rep_t
::
xsend
(
zmq_msg_t
*
msg_
,
int
flags_
)
int
zmq
::
rep_t
::
xsend
(
zmq_msg_t
*
msg_
,
int
flags_
)
{
{
if
(
!
waiting_for
_reply
)
{
if
(
!
sending
_reply
)
{
errno
=
EFSM
;
errno
=
EFSM
;
return
-
1
;
return
-
1
;
}
}
// Push message to the selected pipe. If requester have disconnected
// Check whether it's last part of the reply.
// in the meantime, drop the reply.
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
if
(
reply_pipe
)
{
if
(
reply_pipe
)
{
// Push message to the reply pipe.
bool
written
=
reply_pipe
->
write
(
msg_
);
bool
written
=
reply_pipe
->
write
(
msg_
);
if
(
written
)
zmq_assert
(
!
tbc
||
written
);
reply_pipe
->
flush
();
else
// The pipe is full...
// The pipe is full; just drop the reference to
// TODO: Tear down the underlying connection (?)
// the message content.
zmq_assert
(
written
);
// TODO: Tear down the underlying connection.
zmq_msg_close
(
msg_
);
}
}
else
{
else
{
// If the requester have disconnected in the meantime, drop the reply.
zmq_msg_close
(
msg_
);
zmq_msg_close
(
msg_
);
}
}
waiting_for_reply
=
false
;
// Flush the reply to the requester.
reply_pipe
=
NULL
;
if
(
!
tbc
)
{
reply_pipe
->
flush
();
sending_reply
=
false
;
reply_pipe
=
NULL
;
}
// Detach the message from the data buffer.
// Detach the message from the data buffer.
int
rc
=
zmq_msg_init
(
msg_
);
int
rc
=
zmq_msg_init
(
msg_
);
...
@@ -193,7 +205,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -193,7 +205,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
// Deallocate old content of the message.
zmq_msg_close
(
msg_
);
zmq_msg_close
(
msg_
);
if
(
waiting_for
_reply
)
{
if
(
sending
_reply
)
{
errno
=
EFSM
;
errno
=
EFSM
;
return
-
1
;
return
-
1
;
}
}
...
@@ -201,15 +213,19 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -201,15 +213,19 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Round-robin over the pipes to get next message.
// Round-robin over the pipes to get next message.
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
bool
fetched
=
in_pipes
[
current
]
->
read
(
msg_
);
bool
fetched
=
in_pipes
[
current
]
->
read
(
msg_
);
zmq_assert
(
!
(
tbc
&&
!
fetched
));
if
(
fetched
)
{
if
(
fetched
)
{
reply_pipe
=
out_pipes
[
current
];
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
waiting_for_reply
=
true
;
if
(
!
tbc
)
{
}
reply_pipe
=
out_pipes
[
current
];
current
++
;
sending_reply
=
true
;
if
(
current
>=
active
)
current
++
;
current
=
0
;
if
(
current
>=
active
)
if
(
fetched
)
current
=
0
;
}
return
0
;
return
0
;
}
}
}
// No message is available. Initialise the output parameter
// No message is available. Initialise the output parameter
...
@@ -221,9 +237,12 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -221,9 +237,12 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
bool
zmq
::
rep_t
::
xhas_in
()
bool
zmq
::
rep_t
::
xhas_in
()
{
{
if
(
!
sending_reply
&&
tbc
)
return
true
;
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
if
(
in_pipes
[
current
]
->
check_read
())
if
(
in_pipes
[
current
]
->
check_read
())
return
!
waiting_for
_reply
;
return
!
sending
_reply
;
current
++
;
current
++
;
if
(
current
>=
active
)
if
(
current
>=
active
)
current
=
0
;
current
=
0
;
...
@@ -234,6 +253,10 @@ bool zmq::rep_t::xhas_in ()
...
@@ -234,6 +253,10 @@ bool zmq::rep_t::xhas_in ()
bool
zmq
::
rep_t
::
xhas_out
()
bool
zmq
::
rep_t
::
xhas_out
()
{
{
return
waiting_for_reply
;
if
(
sending_reply
&&
tbc
)
return
true
;
// TODO: No check for write here...
return
sending_reply
;
}
}
src/rep.hpp
View file @
06538fc1
...
@@ -64,8 +64,13 @@ namespace zmq
...
@@ -64,8 +64,13 @@ namespace zmq
// Index of the next inbound pipe to read a request from.
// Index of the next inbound pipe to read a request from.
in_pipes_t
::
size_type
current
;
in_pipes_t
::
size_type
current
;
// If true, request was already received and reply wasn't sent yet.
// If true, request was already received and reply wasn't completely
bool
waiting_for_reply
;
// sent yet.
bool
sending_reply
;
// True, if message processed at the moment (either sent or received)
// is processed only partially.
bool
tbc
;
// Pipe we are going to send reply to.
// Pipe we are going to send reply to.
class
writer_t
*
reply_pipe
;
class
writer_t
*
reply_pipe
;
...
...
src/req.cpp
View file @
06538fc1
...
@@ -27,8 +27,9 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
...
@@ -27,8 +27,9 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
socket_base_t
(
parent_
),
socket_base_t
(
parent_
),
active
(
0
),
active
(
0
),
current
(
0
),
current
(
0
),
waiting_for
_reply
(
false
),
receiving
_reply
(
false
),
reply_pipe_active
(
false
),
reply_pipe_active
(
false
),
tbc
(
false
),
reply_pipe
(
NULL
)
reply_pipe
(
NULL
)
{
{
options
.
requires_in
=
true
;
options
.
requires_in
=
true
;
...
@@ -56,12 +57,14 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
...
@@ -56,12 +57,14 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
void
zmq
::
req_t
::
xdetach_inpipe
(
class
reader_t
*
pipe_
)
void
zmq
::
req_t
::
xdetach_inpipe
(
class
reader_t
*
pipe_
)
{
{
zmq_assert
(
!
receiving_reply
||
!
tbc
||
reply_pipe
!=
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
// TODO: The pipe we are awaiting the reply from is detached. What now?
// TODO: The pipe we are awaiting the reply from is detached. What now?
// Return ECONNRESET from subsequent recv?
// Return ECONNRESET from subsequent recv?
if
(
waiting_for
_reply
&&
pipe_
==
reply_pipe
)
{
if
(
receiving
_reply
&&
pipe_
==
reply_pipe
)
{
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
...
@@ -93,6 +96,8 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
...
@@ -93,6 +96,8 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
void
zmq
::
req_t
::
xdetach_outpipe
(
class
writer_t
*
pipe_
)
void
zmq
::
req_t
::
xdetach_outpipe
(
class
writer_t
*
pipe_
)
{
{
zmq_assert
(
receiving_reply
||
!
tbc
||
out_pipes
[
current
]
!=
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
...
@@ -124,7 +129,7 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
...
@@ -124,7 +129,7 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
void
zmq
::
req_t
::
xkill
(
class
reader_t
*
pipe_
)
void
zmq
::
req_t
::
xkill
(
class
reader_t
*
pipe_
)
{
{
zmq_assert
(
waiting_for
_reply
);
zmq_assert
(
receiving
_reply
);
zmq_assert
(
pipe_
==
reply_pipe
);
zmq_assert
(
pipe_
==
reply_pipe
);
reply_pipe_active
=
false
;
reply_pipe_active
=
false
;
...
@@ -161,7 +166,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
...
@@ -161,7 +166,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
{
{
// If we've sent a request and we still haven't got the reply,
// If we've sent a request and we still haven't got the reply,
// we can't send another request.
// we can't send another request.
if
(
waiting_for
_reply
)
{
if
(
receiving
_reply
)
{
errno
=
EFSM
;
errno
=
EFSM
;
return
-
1
;
return
-
1
;
}
}
...
@@ -170,6 +175,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
...
@@ -170,6 +175,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
if
(
out_pipes
[
current
]
->
check_write
())
if
(
out_pipes
[
current
]
->
check_write
())
break
;
break
;
zmq_assert
(
!
tbc
);
active
--
;
active
--
;
if
(
current
<
active
)
{
if
(
current
<
active
)
{
in_pipes
.
swap
(
current
,
active
);
in_pipes
.
swap
(
current
,
active
);
...
@@ -187,23 +193,25 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
...
@@ -187,23 +193,25 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
// Push message to the selected pipe.
// Push message to the selected pipe.
bool
written
=
out_pipes
[
current
]
->
write
(
msg_
);
bool
written
=
out_pipes
[
current
]
->
write
(
msg_
);
zmq_assert
(
written
);
zmq_assert
(
written
);
out_pipes
[
current
]
->
flush
();
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
if
(
!
tbc
)
{
waiting_for_reply
=
true
;
out_pipes
[
current
]
->
flush
();
reply_pipe
=
in_pipes
[
current
];
receiving_reply
=
true
;
reply_pipe
=
in_pipes
[
current
];
// We can safely assume that the reply pipe is active as the last time
// we've used it we've read the reply and haven't tried to read from it
// We can safely assume that the reply pipe is active as the last time
// anymore.
// we've used it we've read the reply and haven't tried to read from it
reply_pipe_active
=
true
;
// anymore.
reply_pipe_active
=
true
;
// Move to the next pipe (load-balancing).
current
=
(
current
+
1
)
%
active
;
}
// Detach the message from the data buffer.
// Detach the message from the data buffer.
int
rc
=
zmq_msg_init
(
msg_
);
int
rc
=
zmq_msg_init
(
msg_
);
zmq_assert
(
rc
==
0
);
zmq_assert
(
rc
==
0
);
// Move to the next pipe (load-balancing).
current
=
(
current
+
1
)
%
active
;
return
0
;
return
0
;
}
}
...
@@ -213,7 +221,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -213,7 +221,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
zmq_msg_close
(
msg_
);
zmq_msg_close
(
msg_
);
// If request wasn't send, we can't wait for reply.
// If request wasn't send, we can't wait for reply.
if
(
!
waiting_for
_reply
)
{
if
(
!
receiving
_reply
)
{
zmq_msg_init
(
msg_
);
zmq_msg_init
(
msg_
);
errno
=
EFSM
;
errno
=
EFSM
;
return
-
1
;
return
-
1
;
...
@@ -226,14 +234,22 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -226,14 +234,22 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return
-
1
;
return
-
1
;
}
}
waiting_for_reply
=
false
;
// If this was last part of the reply, switch to request phase.
reply_pipe
=
NULL
;
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
if
(
!
tbc
)
{
receiving_reply
=
false
;
reply_pipe
=
NULL
;
}
return
0
;
return
0
;
}
}
bool
zmq
::
req_t
::
xhas_in
()
bool
zmq
::
req_t
::
xhas_in
()
{
{
if
(
!
waiting_for_reply
||
!
reply_pipe_active
)
if
(
receiving_reply
&&
tbc
)
return
true
;
if
(
!
receiving_reply
||
!
reply_pipe_active
)
return
false
;
return
false
;
zmq_assert
(
reply_pipe
);
zmq_assert
(
reply_pipe
);
...
@@ -247,7 +263,10 @@ bool zmq::req_t::xhas_in ()
...
@@ -247,7 +263,10 @@ bool zmq::req_t::xhas_in ()
bool
zmq
::
req_t
::
xhas_out
()
bool
zmq
::
req_t
::
xhas_out
()
{
{
if
(
waiting_for_reply
)
if
(
!
receiving_reply
&&
tbc
)
return
true
;
if
(
receiving_reply
)
return
false
;
return
false
;
while
(
active
>
0
)
{
while
(
active
>
0
)
{
...
...
src/req.hpp
View file @
06538fc1
...
@@ -70,12 +70,17 @@ namespace zmq
...
@@ -70,12 +70,17 @@ namespace zmq
// that's processing the request at the moment.
// that's processing the request at the moment.
out_pipes_t
::
size_type
current
;
out_pipes_t
::
size_type
current
;
// If true, request was already sent and reply wasn't received yet.
// If true, request was already sent and reply wasn't received yet or
bool
waiting_for_reply
;
// was raceived partially.
bool
receiving_reply
;
// True, if read can be attempted from the reply pipe.
// True, if read can be attempted from the reply pipe.
bool
reply_pipe_active
;
bool
reply_pipe_active
;
// True, if message processed at the moment (either sent or received)
// is processed only partially.
bool
tbc
;
// Pipe we are awaiting the reply from.
// Pipe we are awaiting the reply from.
class
reader_t
*
reply_pipe
;
class
reader_t
*
reply_pipe
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment