Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
4405250d
Commit
4405250d
authored
Feb 13, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Multi-hop REQ/REP, part IX., pass the peer identity as far as socket_base_t
parent
f5ce81f2
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
46 additions
and
16 deletions
+46
-16
command.cpp
src/command.cpp
+4
-0
command.hpp
src/command.hpp
+2
-0
object.cpp
src/object.cpp
+24
-4
object.hpp
src/object.hpp
+2
-2
options.cpp
src/options.cpp
+1
-1
options.hpp
src/options.hpp
+2
-3
pgm_socket.cpp
src/pgm_socket.cpp
+5
-2
session.cpp
src/session.cpp
+1
-1
socket_base.cpp
src/socket_base.cpp
+3
-2
socket_base.hpp
src/socket_base.hpp
+2
-1
No files found.
src/command.cpp
View file @
4405250d
...
...
@@ -28,6 +28,10 @@ void zmq::deallocate_command (command_t *cmd_)
if
(
cmd_
->
args
.
attach
.
peer_identity
)
free
(
cmd_
->
args
.
attach
.
peer_identity
);
break
;
case
command_t
:
:
bind
:
if
(
cmd_
->
args
.
bind
.
peer_identity
)
free
(
cmd_
->
args
.
bind
.
peer_identity
);
break
;
default
:
/* noop */
;
}
...
...
src/command.hpp
View file @
4405250d
...
...
@@ -75,6 +75,8 @@ namespace zmq
struct
{
class
reader_t
*
in_pipe
;
class
writer_t
*
out_pipe
;
unsigned
char
peer_identity_size
;
unsigned
char
*
peer_identity
;
}
bind
;
// Sent by pipe writer to inform dormant pipe reader that there
...
...
src/object.cpp
View file @
4405250d
...
...
@@ -89,7 +89,9 @@ void zmq::object_t::process_command (command_t &cmd_)
break
;
case
command_t
:
:
bind
:
process_bind
(
cmd_
.
args
.
bind
.
in_pipe
,
cmd_
.
args
.
bind
.
out_pipe
);
process_bind
(
cmd_
.
args
.
bind
.
in_pipe
,
cmd_
.
args
.
bind
.
out_pipe
,
blob_t
(
cmd_
.
args
.
bind
.
peer_identity
,
cmd_
.
args
.
bind
.
peer_identity_size
));
process_seqnum
();
break
;
...
...
@@ -198,7 +200,9 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd
.
args
.
attach
.
peer_identity
=
NULL
;
}
else
{
cmd
.
args
.
attach
.
peer_identity_size
=
peer_identity_
.
size
();
zmq_assert
(
peer_identity_
.
size
()
<=
0xff
);
cmd
.
args
.
attach
.
peer_identity_size
=
(
unsigned
char
)
peer_identity_
.
size
();
cmd
.
args
.
attach
.
peer_identity
=
(
unsigned
char
*
)
malloc
(
peer_identity_
.
size
());
zmq_assert
(
cmd
.
args
.
attach
.
peer_identity_size
);
...
...
@@ -209,7 +213,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
}
void
zmq
::
object_t
::
send_bind
(
socket_base_t
*
destination_
,
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
bool
inc_seqnum_
)
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
,
bool
inc_seqnum_
)
{
if
(
inc_seqnum_
)
destination_
->
inc_seqnum
();
...
...
@@ -219,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd
.
type
=
command_t
::
bind
;
cmd
.
args
.
bind
.
in_pipe
=
in_pipe_
;
cmd
.
args
.
bind
.
out_pipe
=
out_pipe_
;
if
(
peer_identity_
.
empty
())
{
cmd
.
args
.
bind
.
peer_identity_size
=
0
;
cmd
.
args
.
bind
.
peer_identity
=
NULL
;
}
else
{
zmq_assert
(
peer_identity_
.
size
()
<=
0xff
);
cmd
.
args
.
bind
.
peer_identity_size
=
(
unsigned
char
)
peer_identity_
.
size
();
cmd
.
args
.
bind
.
peer_identity
=
(
unsigned
char
*
)
malloc
(
peer_identity_
.
size
());
zmq_assert
(
cmd
.
args
.
bind
.
peer_identity_size
);
memcpy
(
cmd
.
args
.
bind
.
peer_identity
,
peer_identity_
.
data
(),
peer_identity_
.
size
());
}
send_command
(
cmd
);
}
...
...
@@ -293,7 +312,8 @@ void zmq::object_t::process_attach (i_engine *engine_,
zmq_assert
(
false
);
}
void
zmq
::
object_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
)
void
zmq
::
object_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
)
{
zmq_assert
(
false
);
}
...
...
src/object.hpp
View file @
4405250d
...
...
@@ -69,7 +69,7 @@ namespace zmq
bool
inc_seqnum_
=
true
);
void
send_bind
(
class
socket_base_t
*
destination_
,
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
,
bool
inc_seqnum_
=
true
);
const
blob_t
&
peer_identity_
,
bool
inc_seqnum_
=
true
);
void
send_revive
(
class
object_t
*
destination_
);
void
send_pipe_term
(
class
writer_t
*
destination_
);
void
send_pipe_term_ack
(
class
reader_t
*
destination_
);
...
...
@@ -86,7 +86,7 @@ namespace zmq
virtual
void
process_attach
(
struct
i_engine
*
engine_
,
const
blob_t
&
peer_identity_
);
virtual
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
);
class
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
);
virtual
void
process_revive
();
virtual
void
process_pipe_term
();
virtual
void
process_pipe_term_ack
();
...
...
src/options.cpp
View file @
4405250d
...
...
@@ -76,7 +76,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return
0
;
case
ZMQ_IDENTITY
:
identity
.
assign
((
const
char
*
)
optval_
,
optvallen_
);
identity
.
assign
((
const
unsigned
char
*
)
optval_
,
optvallen_
);
return
0
;
case
ZMQ_RATE
:
...
...
src/options.hpp
View file @
4405250d
...
...
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
#include <string>
#include "stddef.h"
#include "stdint.hpp"
#include "blob.hpp"
namespace
zmq
{
...
...
@@ -38,7 +37,7 @@ namespace zmq
int64_t
lwm
;
int64_t
swap
;
uint64_t
affinity
;
std
::
string
identity
;
blob_t
identity
;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t
rate
;
...
...
src/pgm_socket.cpp
View file @
4405250d
...
...
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if
(
options
.
identity
.
size
()
>
0
)
{
// Create gsi from identity string.
gsi_base
=
options
.
identity
;
// Create gsi from identity.
// TODO: We assume that identity is standard C string here.
// What if it contains binary zeroes?
gsi_base
.
assign
((
const
char
*
)
options
.
identity
.
data
(),
options
.
identity
.
size
());
}
else
{
// Generate random gsi.
...
...
src/session.cpp
View file @
4405250d
...
...
@@ -245,7 +245,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
send_bind
(
owner
,
outbound
?
&
outbound
->
reader
:
NULL
,
inbound
?
&
inbound
->
writer
:
NULL
);
inbound
?
&
inbound
->
writer
:
NULL
,
peer_identity
);
}
// Plug in the engine.
...
...
src/socket_base.cpp
View file @
4405250d
...
...
@@ -171,7 +171,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind
(
peer
,
out_pipe
?
&
out_pipe
->
reader
:
NULL
,
in_pipe
?
&
in_pipe
->
writer
:
NULL
,
false
);
in_pipe
?
&
in_pipe
->
writer
:
NULL
,
options
.
identity
,
false
);
return
0
;
}
...
...
@@ -564,7 +564,8 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects
.
insert
(
object_
);
}
void
zmq
::
socket_base_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
)
void
zmq
::
socket_base_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
)
{
attach_pipes
(
in_pipe_
,
out_pipe_
);
}
...
...
src/socket_base.hpp
View file @
4405250d
...
...
@@ -122,7 +122,8 @@ namespace zmq
// Handlers for incoming commands.
void
process_own
(
class
owned_t
*
object_
);
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
);
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
);
void
process_term_req
(
class
owned_t
*
object_
);
void
process_term_ack
();
void
process_seqnum
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment