Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
3e97c0fe
Commit
3e97c0fe
authored
Aug 07, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
REQ socket implementation is layered on top of XREQ
parent
f77edfce
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
47 additions
and
263 deletions
+47
-263
fq.cpp
src/fq.cpp
+7
-0
pipe.cpp
src/pipe.cpp
+3
-3
req.cpp
src/req.cpp
+32
-211
req.hpp
src/req.hpp
+5
-49
No files found.
src/fq.cpp
View file @
3e97c0fe
...
...
@@ -46,6 +46,9 @@ void zmq::fq_t::attach (reader_t *pipe_)
void
zmq
::
fq_t
::
terminated
(
reader_t
*
pipe_
)
{
// TODO: This is a problem with session-initiated termination. It breaks
// message atomicity. However, for socket initiated termination it's
// just fine.
zmq_assert
(
!
more
||
pipes
[
current
]
!=
pipe_
);
// Remove the pipe from the list; adjust number of active pipes
...
...
@@ -87,6 +90,10 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available.
bool
fetched
=
pipes
[
current
]
->
read
(
msg_
);
// Check the atomicity of the message. If we've already received the
// first part of the message we should get the remaining parts
// without blocking.
zmq_assert
(
!
(
more
&&
!
fetched
));
// Note that when message is not fetched, current pipe is killed and
...
...
src/pipe.cpp
View file @
3e97c0fe
...
...
@@ -269,9 +269,6 @@ void zmq::writer_t::terminate ()
if
(
terminating
)
return
;
// Rollback any unfinished messages.
rollback
();
if
(
msg_store
==
NULL
||
(
msg_store
->
empty
()
&&
!
extra_msg_flag
))
write_delimiter
();
else
...
...
@@ -280,6 +277,9 @@ void zmq::writer_t::terminate ()
void
zmq
::
writer_t
::
write_delimiter
()
{
// Rollback any unfinished messages.
rollback
();
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t
msg
;
...
...
src/req.cpp
View file @
3e97c0fe
...
...
@@ -21,129 +21,16 @@
#include "req.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq
::
req_t
::
req_t
(
class
ctx_t
*
parent_
,
uint32_t
slot_
)
:
socket_base_t
(
parent_
,
slot_
),
active
(
0
),
current
(
0
),
xreq_t
(
parent_
,
slot_
),
receiving_reply
(
false
),
reply_pipe_active
(
false
),
more
(
false
),
reply_pipe
(
NULL
)
message_begins
(
true
)
{
options
.
requires_in
=
true
;
options
.
requires_out
=
true
;
}
zmq
::
req_t
::~
req_t
()
{
zmq_assert
(
in_pipes
.
empty
());
zmq_assert
(
out_pipes
.
empty
());
}
void
zmq
::
req_t
::
xattach_pipes
(
reader_t
*
inpipe_
,
writer_t
*
outpipe_
,
const
blob_t
&
peer_identity_
)
{
zmq_assert
(
inpipe_
&&
outpipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
inpipe_
->
set_event_sink
(
this
);
in_pipes
.
push_back
(
inpipe_
);
in_pipes
.
swap
(
active
,
in_pipes
.
size
()
-
1
);
outpipe_
->
set_event_sink
(
this
);
out_pipes
.
push_back
(
outpipe_
);
out_pipes
.
swap
(
active
,
out_pipes
.
size
()
-
1
);
active
++
;
}
void
zmq
::
req_t
::
xterm_pipes
()
{
for
(
in_pipes_t
::
size_type
i
=
0
;
i
!=
in_pipes
.
size
();
i
++
)
in_pipes
[
i
]
->
terminate
();
for
(
out_pipes_t
::
size_type
i
=
0
;
i
!=
out_pipes
.
size
();
i
++
)
out_pipes
[
i
]
->
terminate
();
}
void
zmq
::
req_t
::
terminated
(
reader_t
*
pipe_
)
{
zmq_assert
(
!
receiving_reply
||
!
more
||
reply_pipe
!=
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
// TODO: The pipe we are awaiting the reply from is detached. What now?
if
(
receiving_reply
&&
pipe_
==
reply_pipe
)
{
zmq_assert
(
false
);
}
in_pipes_t
::
size_type
index
=
in_pipes
.
index
(
pipe_
);
// ???
if
(
!
zombie
)
{
if
(
out_pipes
[
index
])
out_pipes
[
index
]
->
terminate
();
out_pipes
.
erase
(
index
);
}
in_pipes
.
erase
(
index
);
if
(
index
<
active
)
{
active
--
;
if
(
current
==
active
)
current
=
0
;
}
}
void
zmq
::
req_t
::
terminated
(
writer_t
*
pipe_
)
{
zmq_assert
(
receiving_reply
||
!
more
||
out_pipes
[
current
]
!=
pipe_
);
zmq_assert
(
pipe_
);
zmq_assert
(
in_pipes
.
size
()
==
out_pipes
.
size
());
out_pipes_t
::
size_type
index
=
out_pipes
.
index
(
pipe_
);
// ???
if
(
!
zombie
)
{
if
(
in_pipes
[
index
])
in_pipes
[
index
]
->
terminate
();
in_pipes
.
erase
(
index
);
}
out_pipes
.
erase
(
index
);
if
(
index
<
active
)
{
active
--
;
if
(
current
==
active
)
current
=
0
;
}
}
bool
zmq
::
req_t
::
xhas_pipes
()
{
return
!
in_pipes
.
empty
()
||
!
out_pipes
.
empty
();
}
void
zmq
::
req_t
::
activated
(
reader_t
*
pipe_
)
{
// TODO: Actually, misbehaving peer can cause this kind of thing.
// Handle it decently, presumably kill the offending connection.
zmq_assert
(
pipe_
==
reply_pipe
);
reply_pipe_active
=
true
;
}
void
zmq
::
req_t
::
activated
(
writer_t
*
pipe_
)
{
out_pipes_t
::
size_type
index
=
out_pipes
.
index
(
pipe_
);
zmq_assert
(
index
>=
active
);
if
(
in_pipes
[
index
]
!=
NULL
)
{
in_pipes
.
swap
(
index
,
active
);
out_pipes
.
swap
(
index
,
active
);
active
++
;
}
}
int
zmq
::
req_t
::
xsend
(
zmq_msg_t
*
msg_
,
int
flags_
)
...
...
@@ -155,99 +42,58 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return
-
1
;
}
while
(
active
>
0
)
{
if
(
out_pipes
[
current
]
->
check_write
())
break
;
zmq_assert
(
!
more
);
active
--
;
if
(
current
<
active
)
{
in_pipes
.
swap
(
current
,
active
);
out_pipes
.
swap
(
current
,
active
);
}
else
current
=
0
;
}
if
(
active
==
0
)
{
errno
=
EAGAIN
;
return
-
1
;
}
// If we are starting to send the request, generate a prefix.
if
(
!
more
)
{
// First part of the request is empty message part (stack bottom).
if
(
message_begins
)
{
zmq_msg_t
prefix
;
int
rc
=
zmq_msg_init
(
&
prefix
);
zmq_assert
(
rc
==
0
);
prefix
.
flags
|=
ZMQ_MSG_MORE
;
bool
written
=
out_pipes
[
current
]
->
write
(
&
prefix
);
zmq_assert
(
written
);
prefix
.
flags
=
ZMQ_MSG_MORE
;
rc
=
xreq_t
::
xsend
(
&
prefix
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
message_begins
=
false
;
}
// Push the message to the selected pipe.
bool
written
=
out_pipes
[
current
]
->
write
(
msg_
);
zmq_assert
(
written
);
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
if
(
!
more
)
{
out_pipes
[
current
]
->
flush
();
receiving_reply
=
true
;
reply_pipe
=
in_pipes
[
current
];
bool
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
// We can safely assume that the reply pipe is active as the last time
// we've used it we've read the reply and haven't tried to read from it
// anymore.
reply_pipe_active
=
true
;
int
rc
=
xreq_t
::
xsend
(
msg_
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
// Move to the next pipe (load-balancing).
current
=
(
current
+
1
)
%
active
;
// If the request was fully sent, flip the FSM into reply-receiving state.
if
(
!
more
)
{
receiving_reply
=
true
;
message_begins
=
true
;
}
// Detach the message from the data buffer.
int
rc
=
zmq_msg_init
(
msg_
);
zmq_assert
(
rc
==
0
);
return
0
;
}
int
zmq
::
req_t
::
xrecv
(
zmq_msg_t
*
msg_
,
int
flags_
)
{
// Deallocate old content of the message.
int
rc
=
zmq_msg_close
(
msg_
);
zmq_assert
(
rc
==
0
);
// If request wasn't send, we can't wait for reply.
if
(
!
receiving_reply
)
{
zmq_msg_init
(
msg_
);
errno
=
EFSM
;
return
-
1
;
}
// Get the reply from the reply pipe.
if
(
!
reply_pipe_active
||
!
reply_pipe
->
read
(
msg_
))
{
reply_pipe_active
=
false
;
zmq_msg_init
(
msg_
);
errno
=
EAGAIN
;
return
-
1
;
}
// If we are starting to receive new reply, check whether prefix
// is well-formed and drop it.
if
(
!
more
)
{
// First part of the reply should be empty message part (stack bottom).
if
(
message_begins
)
{
int
rc
=
xreq_t
::
xrecv
(
msg_
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
zmq_assert
(
msg_
->
flags
&
ZMQ_MSG_MORE
);
zmq_assert
(
zmq_msg_size
(
msg_
)
==
0
);
rc
=
zmq_msg_close
(
msg_
);
zmq_assert
(
rc
==
0
);
// Get the actual reply.
bool
recvd
=
reply_pipe
->
read
(
msg_
);
zmq_assert
(
recvd
);
}
// If this was last part of the reply, switch to request phase.
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
if
(
!
more
)
{
int
rc
=
xreq_t
::
xrecv
(
msg_
,
flags_
);
if
(
rc
!=
0
)
return
rc
;
// If the reply is fully received, flip the FSM into request-sending state.
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
{
receiving_reply
=
false
;
reply_pipe
=
NULL
;
message_begins
=
true
;
}
return
0
;
...
...
@@ -255,43 +101,18 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
bool
zmq
::
req_t
::
xhas_in
()
{
if
(
receiving_reply
&&
more
)
return
true
;
if
(
!
receiving_reply
||
!
reply_pipe_active
)
return
false
;
zmq_assert
(
reply_pipe
);
if
(
!
reply_pipe
->
check_read
())
{
reply_pipe_active
=
false
;
if
(
!
receiving_reply
)
return
false
;
}
return
true
;
return
xreq_t
::
xhas_in
()
;
}
bool
zmq
::
req_t
::
xhas_out
()
{
if
(
!
receiving_reply
&&
more
)
return
true
;
if
(
receiving_reply
)
return
false
;
while
(
active
>
0
)
{
if
(
out_pipes
[
current
]
->
check_write
())
return
true
;;
active
--
;
if
(
current
<
active
)
{
in_pipes
.
swap
(
current
,
active
);
out_pipes
.
swap
(
current
,
active
);
}
else
current
=
0
;
}
return
false
;
return
xreq_t
::
xhas_out
();
}
src/req.hpp
View file @
3e97c0fe
...
...
@@ -20,17 +20,12 @@
#ifndef __ZMQ_REQ_HPP_INCLUDED__
#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
#include "pipe.hpp"
#include "xreq.hpp"
namespace
zmq
{
class
req_t
:
public
socket_base_t
,
public
i_reader_events
,
public
i_writer_events
class
req_t
:
public
xreq_t
{
public
:
...
...
@@ -38,59 +33,20 @@ namespace zmq
~
req_t
();
// Overloads of functions from socket_base_t.
void
xattach_pipes
(
reader_t
*
inpipe_
,
writer_t
*
outpipe_
,
const
blob_t
&
peer_identity_
);
void
xterm_pipes
();
bool
xhas_pipes
();
int
xsend
(
zmq_msg_t
*
msg_
,
int
flags_
);
int
xrecv
(
zmq_msg_t
*
msg_
,
int
flags_
);
bool
xhas_in
();
bool
xhas_out
();
// i_reader_events interface implementation.
void
activated
(
reader_t
*
pipe_
);
void
terminated
(
reader_t
*
pipe_
);
// i_writer_events interface implementation.
void
activated
(
writer_t
*
pipe_
);
void
terminated
(
writer_t
*
pipe_
);
private
:
// List in outbound and inbound pipes. Note that the two lists are
// always in sync. I.e. outpipe with index N communicates with the
// same session as inpipe with index N.
//
// TODO: Once we have queue limits in place, list of active outpipes
// is to be held (presumably by stacking active outpipes at
// the beginning of the array). We don't have to do the same thing for
// inpipes, because we know which pipe we want to read the
// reply from.
typedef
yarray_t
<
writer_t
>
out_pipes_t
;
out_pipes_t
out_pipes
;
typedef
yarray_t
<
reader_t
>
in_pipes_t
;
in_pipes_t
in_pipes
;
// Number of active pipes.
size_t
active
;
// Req_t load-balances the requests - 'current' points to the session
// that's processing the request at the moment.
out_pipes_t
::
size_type
current
;
// If true, request was already sent and reply wasn't received yet or
// was raceived partially.
bool
receiving_reply
;
// True, if read can be attempted from the reply pipe.
bool
reply_pipe_active
;
// True, if message processed at the moment (either sent or received)
// is processed only partially.
bool
more
;
// Pipe we are awaiting the reply from.
reader_t
*
reply_pipe
;
// If true, we are starting to send/recv a message. The first part
// of the message must be empty message part (backtrace stack bottom).
bool
message_begins
;
req_t
(
const
req_t
&
);
void
operator
=
(
const
req_t
&
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment