Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
2cef05d8
Commit
2cef05d8
authored
Dec 15, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
reconnection mechanism added to tcp transport
parent
14b11230
Hide whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
130 additions
and
37 deletions
+130
-37
i_inout.hpp
src/i_inout.hpp
+13
-2
io_object.hpp
src/io_object.hpp
+1
-0
io_thread.hpp
src/io_thread.hpp
+1
-0
object.hpp
src/object.hpp
+3
-2
session.cpp
src/session.cpp
+21
-2
session.hpp
src/session.hpp
+4
-1
zmq_connecter.cpp
src/zmq_connecter.cpp
+7
-3
zmq_connecter.hpp
src/zmq_connecter.hpp
+4
-1
zmq_connecter_init.cpp
src/zmq_connecter_init.cpp
+21
-15
zmq_connecter_init.hpp
src/zmq_connecter_init.hpp
+7
-4
zmq_engine.cpp
src/zmq_engine.cpp
+20
-3
zmq_engine.hpp
src/zmq_engine.hpp
+4
-1
zmq_listener_init.cpp
src/zmq_listener_init.cpp
+20
-2
zmq_listener_init.hpp
src/zmq_listener_init.hpp
+4
-1
No files found.
src/i_inout.hpp
View file @
2cef05d8
...
...
@@ -36,8 +36,19 @@ namespace zmq
// Flush all the previously written messages downstream.
virtual
void
flush
()
=
0
;
// Drop all the references to the engine.
virtual
void
detach
()
=
0
;
// Drop all the references to the engine. The parameter is the object
// to use to reconnect. If reconnection is not required, the argument
// is set to NULL.
virtual
void
detach
(
class
owned_t
*
reconnecter_
)
=
0
;
// Returns least loaded I/O thread.
virtual
class
io_thread_t
*
get_io_thread
()
=
0
;
// Return pointer to the owning socket.
virtual
class
socket_base_t
*
get_owner
()
=
0
;
// Returns the name of associated session.
virtual
const
char
*
get_session_name
()
=
0
;
};
}
...
...
src/io_object.hpp
View file @
2cef05d8
...
...
@@ -22,6 +22,7 @@
#include <stddef.h>
#include "stdint.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
...
...
src/io_thread.hpp
View file @
2cef05d8
...
...
@@ -22,6 +22,7 @@
#include <vector>
#include "stdint.hpp"
#include "object.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
...
...
src/object.hpp
View file @
2cef05d8
...
...
@@ -55,9 +55,10 @@ namespace zmq
void
unregister_endpoints
(
class
socket_base_t
*
socket_
);
class
socket_base_t
*
find_endpoint
(
const
char
*
addr_
);
// Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details.
// Returns number of thead slots in the dispatcher.
int
thread_slot_count
();
// Chooses least loaded I/O thread.
class
io_thread_t
*
choose_io_thread
(
uint64_t
taskset_
);
// Derived object can use these functions to send commands
...
...
src/session.cpp
View file @
2cef05d8
...
...
@@ -65,9 +65,13 @@ void zmq::session_t::flush ()
out_pipe
->
flush
();
}
void
zmq
::
session_t
::
detach
()
void
zmq
::
session_t
::
detach
(
owned_t
*
reconnecter_
)
{
// TODO: Start reconnection process here.
// Plug in the reconnecter object if any.
if
(
reconnecter_
)
{
send_plug
(
reconnecter_
);
send_own
(
owner
,
reconnecter_
);
}
// Engine is terminating itself. No need to deallocate it from here.
engine
=
NULL
;
...
...
@@ -77,6 +81,21 @@ void zmq::session_t::detach ()
term
();
}
zmq
::
io_thread_t
*
zmq
::
session_t
::
get_io_thread
()
{
return
choose_io_thread
(
options
.
affinity
);
}
class
zmq
::
socket_base_t
*
zmq
::
session_t
::
get_owner
()
{
return
owner
;
}
const
char
*
zmq
::
session_t
::
get_session_name
()
{
return
name
.
c_str
();
}
void
zmq
::
session_t
::
attach_pipes
(
class
reader_t
*
inpipe_
,
class
writer_t
*
outpipe_
)
{
...
...
src/session.hpp
View file @
2cef05d8
...
...
@@ -41,7 +41,10 @@ namespace zmq
bool
read
(
::
zmq_msg_t
*
msg_
);
bool
write
(
::
zmq_msg_t
*
msg_
);
void
flush
();
void
detach
();
void
detach
(
owned_t
*
reconnecter_
);
class
io_thread_t
*
get_io_thread
();
class
socket_base_t
*
get_owner
();
const
char
*
get_session_name
();
// i_endpoint interface implementation.
void
attach_pipes
(
class
reader_t
*
inpipe_
,
class
writer_t
*
outpipe_
);
...
...
src/zmq_connecter.cpp
View file @
2cef05d8
...
...
@@ -38,9 +38,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
{
}
int
zmq
::
zmq_connecter_t
::
set_address
(
const
char
*
addr_
)
int
zmq
::
zmq_connecter_t
::
set_address
(
const
char
*
addr
ess
_
)
{
return
tcp_connecter
.
set_address
(
addr_
);
int
rc
=
tcp_connecter
.
set_address
(
address_
);
if
(
rc
!=
0
)
return
rc
;
address
=
address_
;
return
0
;
}
void
zmq
::
zmq_connecter_t
::
process_plug
()
...
...
@@ -84,7 +88,7 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
zmq_connecter_init_t
*
init
=
new
zmq_connecter_init_t
(
io_thread
,
owner
,
fd
,
options
,
session_name
.
c_str
());
fd
,
options
,
session_name
.
c_str
()
,
address
.
c_str
()
);
zmq_assert
(
init
);
send_plug
(
init
);
send_own
(
owner
,
init
);
...
...
src/zmq_connecter.hpp
View file @
2cef05d8
...
...
@@ -40,7 +40,7 @@ namespace zmq
~
zmq_connecter_t
();
// Set IP address to connect to.
int
set_address
(
const
char
*
addr_
);
int
set_address
(
const
char
*
addr
ess
_
);
private
:
...
...
@@ -75,6 +75,9 @@ namespace zmq
// Name of the session associated with the connecter.
std
::
string
session_name
;
// Address to connect to.
std
::
string
address
;
zmq_connecter_t
(
const
zmq_connecter_t
&
);
void
operator
=
(
const
zmq_connecter_t
&
);
};
...
...
src/zmq_connecter_init.cpp
View file @
2cef05d8
...
...
@@ -25,13 +25,13 @@
zmq
::
zmq_connecter_init_t
::
zmq_connecter_init_t
(
io_thread_t
*
parent_
,
socket_base_t
*
owner_
,
fd_t
fd_
,
const
options_t
&
options_
,
const
char
*
session_name_
)
:
const
char
*
session_name_
,
const
char
*
address_
)
:
owned_t
(
parent_
,
owner_
),
options
(
options_
),
session_name
(
session_name_
)
{
// Create associated engine object.
engine
=
new
zmq_engine_t
(
parent_
,
fd_
,
options
);
engine
=
new
zmq_engine_t
(
parent_
,
fd_
,
options
,
true
,
address_
);
zmq_assert
(
engine
);
}
...
...
@@ -87,27 +87,33 @@ void zmq::zmq_connecter_init_t::flush ()
// We are not expecting any messages. No point in flushing.
}
void
zmq
::
zmq_connecter_init_t
::
detach
()
void
zmq
::
zmq_connecter_init_t
::
detach
(
owned_t
*
reconnecter_
)
{
// TODO: Start reconnection process here.
/*
// Create a connecter object to attempt reconnect. Ask it to wait for a
// while before reconnecting.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_connecter_t *connecter = new zmq_connecter_t (io_thread, owner,
options, session_name.c_str (), true);
connecter->set_address (...);
zmq_assert (connecter);
send_plug (connecter);
send_own (owner, connecter);
*/
// Plug in the reconnecter object.
zmq_assert
(
reconnecter_
);
send_plug
(
reconnecter_
);
send_own
(
owner
,
reconnecter_
);
// This function is called by engine when disconnection occurs.
// The engine will destroy itself, so we just drop the pointer here and
// start termination of the init object.
engine
=
NULL
;
term
();
}
zmq
::
io_thread_t
*
zmq
::
zmq_connecter_init_t
::
get_io_thread
()
{
return
choose_io_thread
(
options
.
affinity
);
}
class
zmq
::
socket_base_t
*
zmq
::
zmq_connecter_init_t
::
get_owner
()
{
return
owner
;
}
const
char
*
zmq
::
zmq_connecter_init_t
::
get_session_name
()
{
return
session_name
.
c_str
();
}
void
zmq
::
zmq_connecter_init_t
::
process_plug
()
...
...
src/zmq_connecter_init.hpp
View file @
2cef05d8
...
...
@@ -40,7 +40,8 @@ namespace zmq
public
:
zmq_connecter_init_t
(
class
io_thread_t
*
parent_
,
socket_base_t
*
owner_
,
fd_t
fd_
,
const
options_t
&
options
,
const
char
*
session_name_
);
fd_t
fd_
,
const
options_t
&
options
,
const
char
*
session_name_
,
const
char
*
address_
);
~
zmq_connecter_init_t
();
private
:
...
...
@@ -49,7 +50,10 @@ namespace zmq
bool
read
(
::
zmq_msg_t
*
msg_
);
bool
write
(
::
zmq_msg_t
*
msg_
);
void
flush
();
void
detach
();
void
detach
(
owned_t
*
reconnecter_
);
class
io_thread_t
*
get_io_thread
();
class
socket_base_t
*
get_owner
();
const
char
*
get_session_name
();
// Handlers for incoming commands.
void
process_plug
();
...
...
@@ -63,8 +67,7 @@ namespace zmq
// Associated socket options.
options_t
options
;
// Name of the session to bind new connection to. Makes sense only
// when 'connected' is true.
// Name of the session to bind new connection to.
std
::
string
session_name
;
zmq_connecter_init_t
(
const
zmq_connecter_init_t
&
);
...
...
src/zmq_engine.cpp
View file @
2cef05d8
...
...
@@ -25,7 +25,7 @@
#include "err.hpp"
zmq
::
zmq_engine_t
::
zmq_engine_t
(
io_thread_t
*
parent_
,
fd_t
fd_
,
const
options_t
&
options_
)
:
const
options_t
&
options_
,
bool
reconnect_
,
const
char
*
address_
)
:
io_object_t
(
parent_
),
inpos
(
NULL
),
insize
(
0
),
...
...
@@ -34,8 +34,12 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
outsize
(
0
),
encoder
(
out_batch_size
,
false
),
inout
(
NULL
),
options
(
options_
)
options
(
options_
),
reconnect
(
reconnect_
)
{
if
(
reconnect
)
address
=
address_
;
// Initialise the underlying socket.
int
rc
=
tcp_socket
.
open
(
fd_
,
options
.
sndbuf
,
options
.
rcvbuf
);
zmq_assert
(
rc
==
0
);
...
...
@@ -145,7 +149,20 @@ void zmq::zmq_engine_t::revive ()
void
zmq
::
zmq_engine_t
::
error
()
{
zmq_assert
(
inout
);
inout
->
detach
();
zmq_connecter_t
*
reconnecter
=
NULL
;
if
(
reconnect
)
{
// Create a connecter object to attempt reconnect.
// Ask it to wait for a while before reconnecting.
reconnecter
=
new
zmq_connecter_t
(
inout
->
get_io_thread
(),
inout
->
get_owner
(),
options
,
inout
->
get_session_name
(),
true
);
zmq_assert
(
reconnecter
);
reconnecter
->
set_address
(
address
.
c_str
());
}
inout
->
detach
(
reconnecter
);
unplug
();
delete
this
;
}
src/zmq_engine.hpp
View file @
2cef05d8
...
...
@@ -37,7 +37,7 @@ namespace zmq
public
:
zmq_engine_t
(
class
io_thread_t
*
parent_
,
fd_t
fd_
,
const
options_t
&
options_
);
const
options_t
&
options_
,
bool
reconnect_
,
const
char
*
address_
);
~
zmq_engine_t
();
// i_engine interface implementation.
...
...
@@ -69,6 +69,9 @@ namespace zmq
options_t
options
;
bool
reconnect
;
std
::
string
address
;
zmq_engine_t
(
const
zmq_engine_t
&
);
void
operator
=
(
const
zmq_engine_t
&
);
};
...
...
src/zmq_listener_init.cpp
View file @
2cef05d8
...
...
@@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
has_peer_identity
(
false
)
{
// Create associated engine object.
engine
=
new
zmq_engine_t
(
parent_
,
fd_
,
options
);
engine
=
new
zmq_engine_t
(
parent_
,
fd_
,
options
,
false
,
NULL
);
zmq_assert
(
engine
);
}
...
...
@@ -93,8 +93,11 @@ void zmq::zmq_listener_init_t::flush ()
term
();
}
void
zmq
::
zmq_listener_init_t
::
detach
()
void
zmq
::
zmq_listener_init_t
::
detach
(
owned_t
*
reconnecter_
)
{
// On the listening side of the connection we are never reconnecting.
zmq_assert
(
reconnecter_
==
NULL
);
// This function is called by engine when disconnection occurs.
// The engine will destroy itself, so we just drop the pointer here and
// start termination of the init object.
...
...
@@ -102,6 +105,21 @@ void zmq::zmq_listener_init_t::detach ()
term
();
}
zmq
::
io_thread_t
*
zmq
::
zmq_listener_init_t
::
get_io_thread
()
{
return
choose_io_thread
(
options
.
affinity
);
}
class
zmq
::
socket_base_t
*
zmq
::
zmq_listener_init_t
::
get_owner
()
{
return
owner
;
}
const
char
*
zmq
::
zmq_listener_init_t
::
get_session_name
()
{
zmq_assert
(
false
);
}
void
zmq
::
zmq_listener_init_t
::
process_plug
()
{
zmq_assert
(
engine
);
...
...
src/zmq_listener_init.hpp
View file @
2cef05d8
...
...
@@ -49,7 +49,10 @@ namespace zmq
bool
read
(
::
zmq_msg_t
*
msg_
);
bool
write
(
::
zmq_msg_t
*
msg_
);
void
flush
();
void
detach
();
void
detach
(
owned_t
*
reconnecter_
);
class
io_thread_t
*
get_io_thread
();
class
socket_base_t
*
get_owner
();
const
char
*
get_session_name
();
// Handlers for incoming commands.
void
process_plug
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment