Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ad6fa9d0
Commit
ad6fa9d0
authored
Apr 27, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
initial version of multi-hop REQ/REP
parent
1ad6ade0
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
280 additions
and
75 deletions
+280
-75
rep.cpp
src/rep.cpp
+65
-32
req.cpp
src/req.cpp
+26
-2
xrep.cpp
src/xrep.cpp
+158
-37
xrep.hpp
src/xrep.hpp
+31
-4
No files found.
src/rep.cpp
View file @
ad6fa9d0
...
...
@@ -167,15 +167,15 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
if
(
reply_pipe
)
{
//
Push message to the reply pipe.
// Push message to the reply pipe.
bool
written
=
reply_pipe
->
write
(
msg_
);
zmq_assert
(
!
more
||
written
);
//
The pipe is full...
//
When this happens, we simply return an error.
//
This makes REP sockets vulnerable to DoS attack when
//
misbehaving requesters stop collecting replies.
//
TODO: Tear down the underlying connection (?)
// The pipe is full...
// When this happens, we simply return an error.
// This makes REP sockets vulnerable to DoS attack when
// misbehaving requesters stop collecting replies.
// TODO: Tear down the underlying connection (?)
if
(
!
written
)
{
errno
=
EAGAIN
;
return
-
1
;
...
...
@@ -185,12 +185,12 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
}
else
{
//
If the requester have disconnected in the meantime, drop the reply.
// If the requester have disconnected in the meantime, drop the reply.
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
zmq_msg_close
(
msg_
);
}
//
Flush the reply to the requester.
// Flush the reply to the requester.
if
(
!
more
)
{
if
(
reply_pipe
)
reply_pipe
->
flush
();
...
...
@@ -198,7 +198,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
reply_pipe
=
NULL
;
}
//
Detach the message from the data buffer.
// Detach the message from the data buffer.
int
rc
=
zmq_msg_init
(
msg_
);
zmq_assert
(
rc
==
0
);
...
...
@@ -207,37 +207,70 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
int
zmq
::
rep_t
::
xrecv
(
zmq_msg_t
*
msg_
,
int
flags_
)
{
// Deallocate old content of the message.
zmq_msg_close
(
msg_
);
// If we are in middle of sending a reply, we cannot receive next request.
if
(
sending_reply
)
{
errno
=
EFSM
;
return
-
1
;
}
// Round-robin over the pipes to get next message.
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
bool
fetched
=
in_pipes
[
current
]
->
read
(
msg_
);
zmq_assert
(
!
(
more
&&
!
fetched
));
if
(
fetched
)
{
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
if
(
!
more
)
{
reply_pipe
=
out_pipes
[
current
];
sending_reply
=
true
;
current
++
;
if
(
current
>=
active
)
current
=
0
;
}
return
0
;
// Deallocate old content of the message.
zmq_msg_close
(
msg_
);
// We haven't started reading a request yet...
if
(
!
more
)
{
// Round-robin over the pipes to get next message.
int
count
;
for
(
count
=
active
;
count
!=
0
;
count
--
)
{
if
(
in_pipes
[
current
]
->
read
(
msg_
))
break
;
current
++
;
if
(
current
>=
active
)
current
=
0
;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
if
(
count
==
0
)
{
zmq_msg_init
(
msg_
);
errno
=
EAGAIN
;
return
-
1
;
}
// We are aware of a new message now. Setup the reply pipe.
reply_pipe
=
out_pipes
[
current
];
// Copy the routing info to the reply pipe.
while
(
true
)
{
// Push message to the reply pipe.
// TODO: What if the pipe is full?
// Tear down the underlying connection?
bool
written
=
reply_pipe
->
write
(
msg_
);
zmq_assert
(
written
);
// Message part of zero size delimits the traceback stack.
if
(
zmq_msg_size
(
msg_
)
==
0
)
break
;
// Get next part of the message.
bool
fetched
=
in_pipes
[
current
]
->
read
(
msg_
);
zmq_assert
(
fetched
);
}
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init
(
msg_
);
errno
=
EAGAIN
;
return
-
1
;
// Now the routing info is processed. Get the first part
// of the message payload and exit.
bool
fetched
=
in_pipes
[
current
]
->
read
(
msg_
);
zmq_assert
(
fetched
);
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
if
(
!
more
)
{
current
++
;
if
(
current
>=
active
)
current
=
0
;
sending_reply
=
true
;
}
return
0
;
}
bool
zmq
::
rep_t
::
xhas_in
()
...
...
src/req.cpp
View file @
ad6fa9d0
...
...
@@ -190,7 +190,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return
-
1
;
}
// Push message to the selected pipe.
// If we are starting to send the request, generate a prefix.
if
(
!
more
)
{
zmq_msg_t
prefix
;
int
rc
=
zmq_msg_init
(
&
prefix
);
zmq_assert
(
rc
==
0
);
prefix
.
flags
|=
ZMQ_MSG_MORE
;
bool
written
=
out_pipes
[
current
]
->
write
(
&
prefix
);
zmq_assert
(
written
);
}
// Push the message to the selected pipe.
bool
written
=
out_pipes
[
current
]
->
write
(
msg_
);
zmq_assert
(
written
);
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
...
...
@@ -218,7 +228,8 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
int
zmq
::
req_t
::
xrecv
(
zmq_msg_t
*
msg_
,
int
flags_
)
{
// Deallocate old content of the message.
zmq_msg_close
(
msg_
);
int
rc
=
zmq_msg_close
(
msg_
);
zmq_assert
(
rc
==
0
);
// If request wasn't send, we can't wait for reply.
if
(
!
receiving_reply
)
{
...
...
@@ -234,6 +245,19 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return
-
1
;
}
// If we are starting to receive new reply, check whether prefix
// is well-formed and drop it.
if
(
!
more
)
{
zmq_assert
(
msg_
->
flags
&
ZMQ_MSG_MORE
);
zmq_assert
(
zmq_msg_size
(
msg_
)
==
0
);
rc
=
zmq_msg_close
(
msg_
);
zmq_assert
(
rc
==
0
);
}
// Get the actual reply.
bool
recvd
=
reply_pipe
->
read
(
msg_
);
zmq_assert
(
recvd
);
// If this was last part of the reply, switch to request phase.
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
if
(
!
more
)
{
...
...
src/xrep.cpp
View file @
ad6fa9d0
...
...
@@ -24,7 +24,11 @@
#include "pipe.hpp"
zmq
::
xrep_t
::
xrep_t
(
class
app_thread_t
*
parent_
)
:
socket_base_t
(
parent_
)
socket_base_t
(
parent_
),
current_in
(
0
),
more_in
(
false
),
current_out
(
NULL
),
more_out
(
false
)
{
options
.
requires_in
=
true
;
options
.
requires_out
=
true
;
...
...
@@ -32,56 +36,96 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
// On connect, pipes are created only after initial handshaking.
// That way we are aware of the peer's identity when binding to the pipes.
options
.
immediate_connect
=
false
;
// XREP is unfunctional at the moment. Crash here!
zmq_assert
(
false
);
}
zmq
::
xrep_t
::~
xrep_t
()
{
for
(
inpipes_t
::
iterator
it
=
inpipes
.
begin
();
it
!=
inpipes
.
end
();
it
++
)
it
->
reader
->
term
();
for
(
outpipes_t
::
iterator
it
=
outpipes
.
begin
();
it
!=
outpipes
.
end
();
it
++
)
it
->
second
.
writer
->
term
();
}
void
zmq
::
xrep_t
::
xattach_pipes
(
class
reader_t
*
inpipe_
,
class
writer_t
*
outpipe_
,
const
blob_t
&
peer_identity_
)
{
zmq_assert
(
inpipe_
&&
outpipe_
);
fq
.
attach
(
inpipe_
);
// TODO: What if new connection has same peer identity as the old one?
outpipe_t
outpipe
=
{
outpipe_
,
true
};
bool
ok
=
outpipes
.
insert
(
std
::
make_pair
(
peer_identity_
,
outpipe
_
)).
second
;
peer_identity_
,
outpipe
)).
second
;
zmq_assert
(
ok
);
inpipe_t
inpipe
=
{
inpipe_
,
peer_identity_
,
true
};
inpipes
.
push_back
(
inpipe
);
}
void
zmq
::
xrep_t
::
xdetach_inpipe
(
class
reader_t
*
pipe_
)
{
zmq_assert
(
pipe_
);
fq
.
detach
(
pipe_
);
// TODO:!
for
(
inpipes_t
::
iterator
it
=
inpipes
.
begin
();
it
!=
inpipes
.
end
();
it
++
)
{
if
(
it
->
reader
==
pipe_
)
{
inpipes
.
erase
(
it
);
return
;
}
}
zmq_assert
(
false
);
}
void
zmq
::
xrep_t
::
xdetach_outpipe
(
class
writer_t
*
pipe_
)
{
for
(
outpipes_t
::
iterator
it
=
outpipes
.
begin
();
it
!=
outpipes
.
end
();
++
it
)
if
(
it
->
second
==
pipe_
)
{
it
!=
outpipes
.
end
();
++
it
)
{
if
(
it
->
second
.
writer
==
pipe_
)
{
outpipes
.
erase
(
it
);
if
(
pipe_
==
current_out
)
current_out
=
NULL
;
return
;
}
}
zmq_assert
(
false
);
}
void
zmq
::
xrep_t
::
xkill
(
class
reader_t
*
pipe_
)
{
fq
.
kill
(
pipe_
);
for
(
inpipes_t
::
iterator
it
=
inpipes
.
begin
();
it
!=
inpipes
.
end
();
it
++
)
{
if
(
it
->
reader
==
pipe_
)
{
zmq_assert
(
it
->
active
);
it
->
active
=
false
;
return
;
}
}
zmq_assert
(
false
);
}
void
zmq
::
xrep_t
::
xrevive
(
class
reader_t
*
pipe_
)
{
fq
.
revive
(
pipe_
);
for
(
inpipes_t
::
iterator
it
=
inpipes
.
begin
();
it
!=
inpipes
.
end
();
it
++
)
{
if
(
it
->
reader
==
pipe_
)
{
zmq_assert
(
!
it
->
active
);
it
->
active
=
true
;
return
;
}
}
zmq_assert
(
false
);
}
void
zmq
::
xrep_t
::
xrevive
(
class
writer_t
*
pipe_
)
{
for
(
outpipes_t
::
iterator
it
=
outpipes
.
begin
();
it
!=
outpipes
.
end
();
++
it
)
{
if
(
it
->
second
.
writer
==
pipe_
)
{
zmq_assert
(
!
it
->
second
.
active
);
it
->
second
.
active
=
true
;
return
;
}
}
zmq_assert
(
false
);
}
int
zmq
::
xrep_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
...
...
@@ -93,33 +137,45 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int
zmq
::
xrep_t
::
xsend
(
zmq_msg_t
*
msg_
,
int
flags_
)
{
unsigned
char
*
data
=
(
unsigned
char
*
)
zmq_msg_data
(
msg_
);
size_t
size
=
zmq_msg_size
(
msg_
);
// Check whether the message is well-formed.
zmq_assert
(
size
>=
1
);
zmq_assert
(
size_t
(
*
data
+
1
)
<=
size
);
// Find the corresponding outbound pipe. If there's none, just drop the
// message.
// TODO: There's an allocation here! It's the critical path! Get rid of it!
blob_t
identity
(
data
+
1
,
*
data
);
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
if
(
it
==
outpipes
.
end
())
{
int
rc
=
zmq_msg_close
(
msg_
);
zmq_assert
(
rc
==
0
);
rc
=
zmq_msg_init
(
msg_
);
zmq_assert
(
rc
==
0
);
// If this is the first part of the message it's the identity of the
// peer to send the message to.
if
(
!
more_out
)
{
zmq_assert
(
!
current_out
);
// There's no such thing as prefix with no subsequent message.
zmq_assert
(
msg_
->
flags
&
ZMQ_MSG_MORE
);
more_out
=
true
;
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently drop the message.
blob_t
identity
((
unsigned
char
*
)
zmq_msg_data
(
msg_
),
zmq_msg_size
(
msg_
));
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
if
(
it
==
outpipes
.
end
())
return
0
;
// Remember the outgoing pipe.
current_out
=
it
->
second
.
writer
;
return
0
;
}
// Push message to the selected pipe.
if
(
!
it
->
second
->
write
(
msg_
))
{
errno
=
EAGAIN
;
return
-
1
;
}
// Check whether this is the last part of the message.
more_out
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
it
->
second
->
flush
();
// Push the message into the pipe. If there's no out pipe, just drop it.
if
(
current_out
)
{
bool
ok
=
current_out
->
write
(
msg_
);
zmq_assert
(
ok
);
if
(
!
more_out
)
{
current_out
->
flush
();
current_out
=
NULL
;
}
}
else
{
int
rc
=
zmq_msg_close
(
msg_
);
zmq_assert
(
rc
==
0
);
}
// Detach the message from the data buffer.
int
rc
=
zmq_msg_init
(
msg_
);
...
...
@@ -130,12 +186,77 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
int
zmq
::
xrep_t
::
xrecv
(
zmq_msg_t
*
msg_
,
int
flags_
)
{
return
fq
.
recv
(
msg_
,
flags_
);
// Deallocate old content of the message.
zmq_msg_close
(
msg_
);
// If we are in the middle of reading a message, just grab next part of it.
if
(
more_in
)
{
zmq_assert
(
inpipes
[
current_in
].
active
);
bool
fetched
=
inpipes
[
current_in
].
reader
->
read
(
msg_
);
zmq_assert
(
fetched
);
more_in
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
if
(
!
more_in
)
{
current_in
++
;
if
(
current_in
>=
inpipes
.
size
())
current_in
=
0
;
}
return
0
;
}
// Round-robin over the pipes to get the next message.
for
(
int
count
=
inpipes
.
size
();
count
!=
0
;
count
--
)
{
// Try to fetch new message.
bool
fetched
;
if
(
!
inpipes
[
current_in
].
active
)
fetched
=
false
;
else
fetched
=
inpipes
[
current_in
].
reader
->
check_read
();
// If we have a message, create a prefix and return it to the caller.
if
(
fetched
)
{
int
rc
=
zmq_msg_init_size
(
msg_
,
inpipes
[
current_in
].
identity
.
size
());
zmq_assert
(
rc
==
0
);
memcpy
(
zmq_msg_data
(
msg_
),
inpipes
[
current_in
].
identity
.
data
(),
zmq_msg_size
(
msg_
));
more_in
=
true
;
return
0
;
}
// If me don't have a message, move to next pipe.
current_in
++
;
if
(
current_in
>=
inpipes
.
size
())
current_in
=
0
;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init
(
msg_
);
errno
=
EAGAIN
;
return
-
1
;
}
bool
zmq
::
xrep_t
::
xhas_in
()
{
return
fq
.
has_in
();
// There are subsequent parts of the partly-read message available.
if
(
more_in
)
return
true
;
// Note that messing with current doesn't break the fairness of fair
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for
(
int
count
=
inpipes
.
size
();
count
!=
0
;
count
--
)
{
if
(
inpipes
[
current_in
].
active
&&
inpipes
[
current_in
].
reader
->
check_read
())
return
true
;
current_in
++
;
if
(
current_in
>=
inpipes
.
size
())
current_in
=
0
;
}
return
false
;
}
bool
zmq
::
xrep_t
::
xhas_out
()
...
...
src/xrep.hpp
View file @
ad6fa9d0
...
...
@@ -21,14 +21,15 @@
#define __ZMQ_XREP_HPP_INCLUDED__
#include <map>
#include <vector>
#include "socket_base.hpp"
#include "blob.hpp"
#include "fq.hpp"
namespace
zmq
{
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class
xrep_t
:
public
socket_base_t
{
public
:
...
...
@@ -52,13 +53,39 @@ namespace zmq
private
:
// Inbound messages are fair-queued.
fq_t
fq
;
struct
inpipe_t
{
class
reader_t
*
reader
;
blob_t
identity
;
bool
active
;
};
// Inbound pipes with the names of corresponging peers.
typedef
std
::
vector
<
inpipe_t
>
inpipes_t
;
inpipes_t
inpipes
;
// The pipe we are currently reading from.
inpipes_t
::
size_type
current_in
;
// If true, more incoming message parts are expected.
bool
more_in
;
struct
outpipe_t
{
class
writer_t
*
writer
;
bool
active
;
};
// Outbound pipes indexed by the peer names.
typedef
std
::
map
<
blob_t
,
class
writer_t
*
>
outpipes_t
;
typedef
std
::
map
<
blob_t
,
outpipe_t
>
outpipes_t
;
outpipes_t
outpipes
;
// The pipe we are currently writing to.
class
writer_t
*
current_out
;
// If true, more outgoing message parts are expected.
bool
more_out
;
xrep_t
(
const
xrep_t
&
);
void
operator
=
(
const
xrep_t
&
);
};
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment