Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
5a343fc2
Commit
5a343fc2
authored
May 29, 2018
by
Simon Giesecke
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: stream_t/router_t access data member of base class
Solution: pull up functionality to base class
parent
ab3895a4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
99 additions
and
59 deletions
+99
-59
router.cpp
src/router.cpp
+27
-41
socket_base.cpp
src/socket_base.cpp
+44
-2
socket_base.hpp
src/socket_base.hpp
+19
-3
stream.cpp
src/stream.cpp
+9
-13
No files found.
src/router.cpp
View file @
5a343fc2
...
...
@@ -63,8 +63,6 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq
::
router_t
::~
router_t
()
{
zmq_assert
(
_anonymous_pipes
.
empty
());
;
zmq_assert
(
_out_pipes
.
empty
());
_prefetched_id
.
close
();
_prefetched_msg
.
close
();
}
...
...
@@ -189,19 +187,19 @@ int zmq::router_t::xsend (msg_t *msg_)
// Find the pipe associated with the routing id stored in the prefix.
// If there's no such pipe just silently ignore the message, unless
// router_mandatory is set.
blob_t
routing_id
(
static_cast
<
unsigned
char
*>
(
msg_
->
data
()),
msg_
->
size
(),
zmq
::
reference_tag_t
());
out_pipes_t
::
iterator
it
=
_out_pipes
.
find
(
routing_id
);
// router_mandatory is set.
;
out_pipe_t
*
out_pipe
=
lookup_out_pipe
(
blob_t
(
static_cast
<
unsigned
char
*>
(
msg_
->
data
()),
msg_
->
size
(),
zmq
::
reference_tag_t
())
);
if
(
it
!=
_out_pipes
.
end
()
)
{
_current_out
=
it
->
second
.
pipe
;
if
(
out_pipe
)
{
_current_out
=
out_pipe
->
pipe
;
// Check whether pipe is closed or not
if
(
!
_current_out
->
check_write
())
{
// Check whether pipe is full or not
bool
pipe_full
=
!
_current_out
->
check_hwm
();
it
->
second
.
active
=
false
;
out_pipe
->
active
=
false
;
_current_out
=
NULL
;
if
(
_mandatory
)
{
...
...
@@ -398,6 +396,11 @@ bool zmq::router_t::xhas_in ()
return
true
;
}
static
bool
check_pipe_hwm
(
const
zmq
::
pipe_t
&
pipe
)
{
return
pipe
.
check_hwm
();
}
bool
zmq
::
router_t
::
xhas_out
()
{
// In theory, ROUTER socket is always ready for writing (except when
...
...
@@ -407,12 +410,7 @@ bool zmq::router_t::xhas_out ()
if
(
!
_mandatory
)
return
true
;
bool
has_out
=
false
;
out_pipes_t
::
iterator
it
;
for
(
it
=
_out_pipes
.
begin
();
it
!=
_out_pipes
.
end
();
++
it
)
has_out
|=
it
->
second
.
pipe
->
check_hwm
();
return
has_out
;
return
any_of_out_pipes
(
check_pipe_hwm
);
}
const
zmq
::
blob_t
&
zmq
::
router_t
::
get_credential
()
const
...
...
@@ -426,14 +424,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
int
res
=
0
;
blob_t
routing_id_blob
((
unsigned
char
*
)
routing_id_
,
routing_id_size_
);
out_pipes_t
::
const_iterator
it
=
_out_pipes
.
find
(
routing_id_blob
);
if
(
it
==
_out_pipes
.
end
()
)
{
const
out_pipe_t
*
out_pipe
=
lookup_out_pipe
(
routing_id_blob
);
if
(
!
out_pipe
)
{
errno
=
EHOSTUNREACH
;
return
-
1
;
}
const
out_pipe_t
&
outpipe
=
it
->
second
;
if
(
outpipe
.
pipe
->
check_hwm
())
if
(
out_pipe
->
pipe
->
check_hwm
())
res
|=
ZMQ_POLLOUT
;
/** \todo does it make any sense to check the inpipe as well? */
...
...
@@ -444,7 +441,6 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
bool
zmq
::
router_t
::
identify_peer
(
pipe_t
*
pipe_
)
{
msg_t
msg
;
bool
ok
;
blob_t
routing_id
;
const
std
::
string
connect_routing_id
=
extract_connect_routing_id
();
...
...
@@ -453,7 +449,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
reinterpret_cast
<
const
unsigned
char
*>
(
connect_routing_id
.
c_str
()),
connect_routing_id
.
length
());
// Not allowed to duplicate an existing rid
zmq_assert
(
0
==
_out_pipes
.
count
(
routing_id
));
zmq_assert
(
!
has_out_pipe
(
routing_id
));
}
else
if
(
options
.
raw_socket
)
{
// Always assign an integral routing id for raw-socket
...
...
@@ -464,7 +460,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
}
else
if
(
!
options
.
raw_socket
)
{
// Pick up handshake cases and also case where next integral routing id is set
msg
.
init
();
ok
=
pipe_
->
read
(
&
msg
);
bool
ok
=
pipe_
->
read
(
&
msg
);
if
(
!
ok
)
return
false
;
...
...
@@ -478,10 +474,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
}
else
{
routing_id
.
set
(
static_cast
<
unsigned
char
*>
(
msg
.
data
()),
msg
.
size
());
out_pipes_t
::
iterator
it
=
_out_pipes
.
find
(
routing_id
);
msg
.
close
();
if
(
it
!=
_out_pipes
.
end
())
{
// Try to remove an existing routing id entry to allow the new
// connection to take the routing id.
out_pipe_t
existing_outpipe
=
try_erase_out_pipe
(
routing_id
);
if
(
existing_outpipe
.
pipe
)
{
if
(
!
_handover
)
// Ignore peers with duplicate ID
return
false
;
...
...
@@ -494,19 +493,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
put_uint32
(
buf
+
1
,
_next_integral_routing_id
++
);
blob_t
new_routing_id
(
buf
,
sizeof
buf
);
it
->
second
.
pipe
->
set_router_socket_routing_id
(
new_routing_id
);
out_pipe_t
existing_outpipe
=
{
it
->
second
.
pipe
,
it
->
second
.
active
};
existing_outpipe
.
pipe
->
set_router_socket_routing_id
(
new_routing_id
);
ok
=
_out_pipes
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
ZMQ_MOVE
(
new_routing_id
),
existing_outpipe
)
.
second
;
zmq_assert
(
ok
);
// Remove the existing routing id entry to allow the new
// connection to take the routing id.
_out_pipes
.
erase
(
it
);
add_out_pipe
(
ZMQ_MOVE
(
new_routing_id
),
existing_outpipe
.
pipe
);
if
(
existing_outpipe
.
pipe
==
_current_in
)
_terminate_current_in
=
true
;
...
...
@@ -517,11 +507,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
}
pipe_
->
set_router_socket_routing_id
(
routing_id
);
// Add the record into output pipes lookup table
out_pipe_t
outpipe
=
{
pipe_
,
true
};
ok
=
_out_pipes
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
ZMQ_MOVE
(
routing_id
),
outpipe
)
.
second
;
zmq_assert
(
ok
);
add_out_pipe
(
ZMQ_MOVE
(
routing_id
),
pipe_
);
return
true
;
}
src/socket_base.cpp
View file @
5a343fc2
...
...
@@ -1813,9 +1813,51 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
return
res
;
}
void
zmq
::
routing_socket_base_t
::
add_out_pipe
(
blob_t
routing_id
,
pipe_t
*
pipe_
)
{
// Add the record into output pipes lookup table
const
out_pipe_t
outpipe
=
{
pipe_
,
true
};
const
bool
ok
=
_out_pipes
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
ZMQ_MOVE
(
routing_id
),
outpipe
)
.
second
;
zmq_assert
(
ok
);
}
bool
zmq
::
routing_socket_base_t
::
has_out_pipe
(
const
blob_t
&
routing_id
)
const
{
return
0
!=
_out_pipes
.
count
(
routing_id
);
}
zmq
::
routing_socket_base_t
::
out_pipe_t
*
zmq
::
routing_socket_base_t
::
lookup_out_pipe
(
const
blob_t
&
routing_id
)
{
// TODO we could probably avoid constructor a temporary blob_t to call this function
out_pipes_t
::
iterator
it
=
_out_pipes
.
find
(
routing_id
);
return
it
==
_out_pipes
.
end
()
?
NULL
:
&
it
->
second
;
}
const
zmq
::
routing_socket_base_t
::
out_pipe_t
*
zmq
::
routing_socket_base_t
::
lookup_out_pipe
(
const
blob_t
&
routing_id
)
const
{
// TODO we could probably avoid constructor a temporary blob_t to call this function
out_pipes_t
::
const_iterator
it
=
_out_pipes
.
find
(
routing_id
);
return
it
==
_out_pipes
.
end
()
?
NULL
:
&
it
->
second
;
}
void
zmq
::
routing_socket_base_t
::
erase_out_pipe
(
pipe_t
*
pipe_
)
{
out_pipes_t
::
iterator
it
=
_out_pipes
.
find
(
pipe_
->
get_routing_id
());
zmq_assert
(
it
!=
_out_pipes
.
end
());
const
size_t
erased
=
_out_pipes
.
erase
(
pipe_
->
get_routing_id
());
zmq_assert
(
erased
);
}
zmq
::
routing_socket_base_t
::
out_pipe_t
zmq
::
routing_socket_base_t
::
try_erase_out_pipe
(
const
blob_t
&
routing_id
)
{
const
out_pipes_t
::
iterator
it
=
_out_pipes
.
find
(
routing_id
);
out_pipe_t
res
=
{
NULL
,
false
};
if
(
it
!=
_out_pipes
.
end
())
{
res
=
it
->
second
;
_out_pipes
.
erase
(
it
);
}
return
res
;
}
src/socket_base.hpp
View file @
5a343fc2
...
...
@@ -312,19 +312,35 @@ class routing_socket_base_t : public socket_base_t
// own methods
std
::
string
extract_connect_routing_id
();
void
erase_out_pipe
(
pipe_t
*
pipe_
);
struct
out_pipe_t
{
pipe_t
*
pipe
;
bool
active
;
};
void
add_out_pipe
(
blob_t
routing_id
,
pipe_t
*
pipe_
);
bool
has_out_pipe
(
const
blob_t
&
routing_id
)
const
;
out_pipe_t
*
lookup_out_pipe
(
const
blob_t
&
routing_id
);
const
out_pipe_t
*
lookup_out_pipe
(
const
blob_t
&
routing_id
)
const
;
void
erase_out_pipe
(
pipe_t
*
pipe_
);
out_pipe_t
try_erase_out_pipe
(
const
blob_t
&
routing_id
);
template
<
typename
Func
>
bool
any_of_out_pipes
(
Func
func
)
{
bool
res
=
false
;
for
(
out_pipes_t
::
iterator
it
=
_out_pipes
.
begin
();
it
!=
_out_pipes
.
end
();
++
it
)
{
if
(
res
|=
func
(
*
it
->
second
.
pipe
))
break
;
}
return
res
;
}
private
:
// Outbound pipes indexed by the peer IDs.
typedef
std
::
map
<
blob_t
,
out_pipe_t
>
out_pipes_t
;
out_pipes_t
_out_pipes
;
private
:
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std
::
string
_connect_routing_id
;
};
...
...
src/stream.cpp
View file @
5a343fc2
...
...
@@ -95,14 +95,15 @@ int zmq::stream_t::xsend (msg_t *msg_)
if
(
msg_
->
flags
()
&
msg_t
::
more
)
{
// Find the pipe associated with the routing id stored in the prefix.
// If there's no such pipe return an error
blob_t
routing_id
(
static_cast
<
unsigned
char
*>
(
msg_
->
data
()),
msg_
->
size
());
out_pipes_t
::
iterator
it
=
_out_pipes
.
find
(
routing_id
);
if
(
it
!=
_out_pipes
.
end
())
{
_current_out
=
it
->
second
.
pipe
;
out_pipe_t
*
out_pipe
=
lookup_out_pipe
(
blob_t
(
static_cast
<
unsigned
char
*>
(
msg_
->
data
()),
msg_
->
size
(),
reference_tag_t
()));
if
(
out_pipe
)
{
_current_out
=
out_pipe
->
pipe
;
if
(
!
_current_out
->
check_write
())
{
it
->
second
.
active
=
false
;
out_pipe
->
active
=
false
;
_current_out
=
NULL
;
errno
=
EAGAIN
;
return
-
1
;
...
...
@@ -275,7 +276,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
reinterpret_cast
<
const
unsigned
char
*>
(
connect_routing_id
.
c_str
()),
connect_routing_id
.
length
());
// Not allowed to duplicate an existing rid
zmq_assert
(
0
==
_out_pipes
.
count
(
routing_id
));
zmq_assert
(
!
has_out_pipe
(
routing_id
));
}
else
{
put_uint32
(
buffer
+
1
,
_next_integral_routing_id
++
);
routing_id
.
set
(
buffer
,
sizeof
buffer
);
...
...
@@ -284,10 +285,5 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
static_cast
<
unsigned
char
>
(
routing_id
.
size
());
}
pipe_
->
set_router_socket_routing_id
(
routing_id
);
// Add the record into output pipes lookup table
out_pipe_t
outpipe
=
{
pipe_
,
true
};
const
bool
ok
=
_out_pipes
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
ZMQ_MOVE
(
routing_id
),
outpipe
)
.
second
;
zmq_assert
(
ok
);
add_out_pipe
(
ZMQ_MOVE
(
routing_id
),
pipe_
);
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment