Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
973d13d5
Commit
973d13d5
authored
Dec 03, 2013
by
Martin Hurton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Code cleanup
parent
39e2b799
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
37 additions
and
44 deletions
+37
-44
session_base.cpp
src/session_base.cpp
+31
-38
session_base.hpp
src/session_base.hpp
+5
-5
stream_engine.cpp
src/stream_engine.cpp
+1
-1
No files found.
src/session_base.cpp
View file @
973d13d5
...
@@ -33,13 +33,13 @@
...
@@ -33,13 +33,13 @@
#include "req.hpp"
#include "req.hpp"
zmq
::
session_base_t
*
zmq
::
session_base_t
::
create
(
class
io_thread_t
*
io_thread_
,
zmq
::
session_base_t
*
zmq
::
session_base_t
::
create
(
class
io_thread_t
*
io_thread_
,
bool
connect
_
,
class
socket_base_t
*
socket_
,
const
options_t
&
options_
,
bool
active
_
,
class
socket_base_t
*
socket_
,
const
options_t
&
options_
,
const
address_t
*
addr_
)
const
address_t
*
addr_
)
{
{
session_base_t
*
s
=
NULL
;
session_base_t
*
s
=
NULL
;
switch
(
options_
.
type
)
{
switch
(
options_
.
type
)
{
case
ZMQ_REQ
:
case
ZMQ_REQ
:
s
=
new
(
std
::
nothrow
)
req_session_t
(
io_thread_
,
connect
_
,
s
=
new
(
std
::
nothrow
)
req_session_t
(
io_thread_
,
active
_
,
socket_
,
options_
,
addr_
);
socket_
,
options_
,
addr_
);
break
;
break
;
case
ZMQ_DEALER
:
case
ZMQ_DEALER
:
...
@@ -53,7 +53,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
...
@@ -53,7 +53,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case
ZMQ_PULL
:
case
ZMQ_PULL
:
case
ZMQ_PAIR
:
case
ZMQ_PAIR
:
case
ZMQ_STREAM
:
case
ZMQ_STREAM
:
s
=
new
(
std
::
nothrow
)
session_base_t
(
io_thread_
,
connect
_
,
s
=
new
(
std
::
nothrow
)
session_base_t
(
io_thread_
,
active
_
,
socket_
,
options_
,
addr_
);
socket_
,
options_
,
addr_
);
break
;
break
;
default
:
default
:
...
@@ -65,11 +65,11 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
...
@@ -65,11 +65,11 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
}
}
zmq
::
session_base_t
::
session_base_t
(
class
io_thread_t
*
io_thread_
,
zmq
::
session_base_t
::
session_base_t
(
class
io_thread_t
*
io_thread_
,
bool
connect
_
,
class
socket_base_t
*
socket_
,
const
options_t
&
options_
,
bool
active
_
,
class
socket_base_t
*
socket_
,
const
options_t
&
options_
,
const
address_t
*
addr_
)
:
const
address_t
*
addr_
)
:
own_t
(
io_thread_
,
options_
),
own_t
(
io_thread_
,
options_
),
io_object_t
(
io_thread_
),
io_object_t
(
io_thread_
),
connect
(
connect
_
),
active
(
active
_
),
pipe
(
NULL
),
pipe
(
NULL
),
zap_pipe
(
NULL
),
zap_pipe
(
NULL
),
incomplete_in
(
false
),
incomplete_in
(
false
),
...
@@ -177,23 +177,22 @@ void zmq::session_base_t::flush ()
...
@@ -177,23 +177,22 @@ void zmq::session_base_t::flush ()
void
zmq
::
session_base_t
::
clean_pipes
()
void
zmq
::
session_base_t
::
clean_pipes
()
{
{
if
(
pipe
)
{
zmq_assert
(
pipe
!=
NULL
);
// Get rid of half-processed messages in the out pipe. Flush any
// Get rid of half-processed messages in the out pipe. Flush any
// unflushed messages upstream.
// unflushed messages upstream.
pipe
->
rollback
();
pipe
->
rollback
();
pipe
->
flush
();
pipe
->
flush
();
// Remove any half-read message from the in pipe.
// Remove any half-read message from the in pipe.
while
(
incomplete_in
)
{
while
(
incomplete_in
)
{
msg_t
msg
;
msg_t
msg
;
int
rc
=
msg
.
init
();
int
rc
=
msg
.
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
rc
=
pull_msg
(
&
msg
);
rc
=
pull_msg
(
&
msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
rc
=
msg
.
close
();
rc
=
msg
.
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
}
}
}
...
@@ -208,9 +207,8 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
...
@@ -208,9 +207,8 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
// If this is our current pipe, remove it
// If this is our current pipe, remove it
pipe
=
NULL
;
pipe
=
NULL
;
else
else
if
(
pipe_
==
zap_pipe
)
{
if
(
pipe_
==
zap_pipe
)
zap_pipe
=
NULL
;
zap_pipe
=
NULL
;
}
else
else
// Remove the pipe from the detached pipes set
// Remove the pipe from the detached pipes set
terminating_pipes
.
erase
(
pipe_
);
terminating_pipes
.
erase
(
pipe_
);
...
@@ -275,7 +273,7 @@ zmq::socket_base_t *zmq::session_base_t::get_socket ()
...
@@ -275,7 +273,7 @@ zmq::socket_base_t *zmq::session_base_t::get_socket ()
void
zmq
::
session_base_t
::
process_plug
()
void
zmq
::
session_base_t
::
process_plug
()
{
{
if
(
connect
)
if
(
active
)
start_connecting
(
false
);
start_connecting
(
false
);
}
}
...
@@ -364,16 +362,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
...
@@ -364,16 +362,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
engine
->
plug
(
io_thread
,
this
);
engine
->
plug
(
io_thread
,
this
);
}
}
void
zmq
::
session_base_t
::
detach
()
void
zmq
::
session_base_t
::
engine_error
()
{
{
// Engine is dead. Let's forget about it.
// Engine is dead. Let's forget about it.
engine
=
NULL
;
engine
=
NULL
;
// Remove any half-done messages from the pipes.
// Remove any half-done messages from the pipes.
clean_pipes
();
if
(
pipe
)
clean_pipes
();
// Send the event to the derived class.
if
(
active
)
detached
();
reconnect
();
else
terminate
();
// Just in case there's only a delimiter in the pipe.
// Just in case there's only a delimiter in the pipe.
if
(
pipe
)
if
(
pipe
)
...
@@ -432,7 +433,6 @@ void zmq::session_base_t::proceed_with_term ()
...
@@ -432,7 +433,6 @@ void zmq::session_base_t::proceed_with_term ()
void
zmq
::
session_base_t
::
timer_event
(
int
id_
)
void
zmq
::
session_base_t
::
timer_event
(
int
id_
)
{
{
// Linger period expired. We can proceed with termination even though
// Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent.
// there are still pending messages to be sent.
zmq_assert
(
id_
==
linger_timer_id
);
zmq_assert
(
id_
==
linger_timer_id
);
...
@@ -443,14 +443,8 @@ void zmq::session_base_t::timer_event (int id_)
...
@@ -443,14 +443,8 @@ void zmq::session_base_t::timer_event (int id_)
pipe
->
terminate
(
false
);
pipe
->
terminate
(
false
);
}
}
void
zmq
::
session_base_t
::
detached
()
void
zmq
::
session_base_t
::
reconnect
()
{
{
// Transient session self-destructs after peer disconnects.
if
(
!
connect
)
{
terminate
();
return
;
}
// For delayed connect situations, terminate the pipe
// For delayed connect situations, terminate the pipe
// and reestablish later on
// and reestablish later on
if
(
pipe
&&
options
.
immediate
==
1
if
(
pipe
&&
options
.
immediate
==
1
...
@@ -475,7 +469,7 @@ void zmq::session_base_t::detached ()
...
@@ -475,7 +469,7 @@ void zmq::session_base_t::detached ()
void
zmq
::
session_base_t
::
start_connecting
(
bool
wait_
)
void
zmq
::
session_base_t
::
start_connecting
(
bool
wait_
)
{
{
zmq_assert
(
connect
);
zmq_assert
(
active
);
// Choose I/O thread to run connecter in. Given that we are already
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
// running in an I/O thread, there must be at least one available.
...
@@ -506,12 +500,11 @@ void zmq::session_base_t::start_connecting (bool wait_)
...
@@ -506,12 +500,11 @@ void zmq::session_base_t::start_connecting (bool wait_)
tipc_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
tipc_connecter_t
(
tipc_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
tipc_connecter_t
(
io_thread
,
this
,
options
,
addr
,
wait_
);
io_thread
,
this
,
options
,
addr
,
wait_
);
alloc_assert
(
connecter
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
launch_child
(
connecter
);
return
;
return
;
}
}
#endif
#endif
#ifdef ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
// Both PGM and EPGM transports are using the same infrastructure.
...
...
src/session_base.hpp
View file @
973d13d5
...
@@ -46,7 +46,7 @@ namespace zmq
...
@@ -46,7 +46,7 @@ namespace zmq
// Create a session of the particular type.
// Create a session of the particular type.
static
session_base_t
*
create
(
zmq
::
io_thread_t
*
io_thread_
,
static
session_base_t
*
create
(
zmq
::
io_thread_t
*
io_thread_
,
bool
connect
_
,
zmq
::
socket_base_t
*
socket_
,
bool
active
_
,
zmq
::
socket_base_t
*
socket_
,
const
options_t
&
options_
,
const
address_t
*
addr_
);
const
options_t
&
options_
,
const
address_t
*
addr_
);
// To be used once only, when creating the session.
// To be used once only, when creating the session.
...
@@ -55,7 +55,7 @@ namespace zmq
...
@@ -55,7 +55,7 @@ namespace zmq
// Following functions are the interface exposed towards the engine.
// Following functions are the interface exposed towards the engine.
virtual
void
reset
();
virtual
void
reset
();
void
flush
();
void
flush
();
void
detach
();
void
engine_error
();
// i_pipe_events interface implementation.
// i_pipe_events interface implementation.
void
read_activated
(
zmq
::
pipe_t
*
pipe_
);
void
read_activated
(
zmq
::
pipe_t
*
pipe_
);
...
@@ -88,7 +88,7 @@ namespace zmq
...
@@ -88,7 +88,7 @@ namespace zmq
protected
:
protected
:
session_base_t
(
zmq
::
io_thread_t
*
io_thread_
,
bool
connect
_
,
session_base_t
(
zmq
::
io_thread_t
*
io_thread_
,
bool
active
_
,
zmq
::
socket_base_t
*
socket_
,
const
options_t
&
options_
,
zmq
::
socket_base_t
*
socket_
,
const
options_t
&
options_
,
const
address_t
*
addr_
);
const
address_t
*
addr_
);
virtual
~
session_base_t
();
virtual
~
session_base_t
();
...
@@ -97,7 +97,7 @@ namespace zmq
...
@@ -97,7 +97,7 @@ namespace zmq
void
start_connecting
(
bool
wait_
);
void
start_connecting
(
bool
wait_
);
void
detached
();
void
reconnect
();
// Handlers for incoming commands.
// Handlers for incoming commands.
void
process_plug
();
void
process_plug
();
...
@@ -116,7 +116,7 @@ namespace zmq
...
@@ -116,7 +116,7 @@ namespace zmq
// If true, this session (re)connects to the peer. Otherwise, it's
// If true, this session (re)connects to the peer. Otherwise, it's
// a transient session created by the listener.
// a transient session created by the listener.
bool
connect
;
bool
active
;
// Pipe connecting the session to its socket.
// Pipe connecting the session to its socket.
zmq
::
pipe_t
*
pipe
;
zmq
::
pipe_t
*
pipe
;
...
...
src/stream_engine.cpp
View file @
973d13d5
...
@@ -743,7 +743,7 @@ void zmq::stream_engine_t::error ()
...
@@ -743,7 +743,7 @@ void zmq::stream_engine_t::error ()
zmq_assert
(
session
);
zmq_assert
(
session
);
socket
->
event_disconnected
(
endpoint
,
s
);
socket
->
event_disconnected
(
endpoint
,
s
);
session
->
flush
();
session
->
flush
();
session
->
detach
();
session
->
engine_error
();
unplug
();
unplug
();
delete
this
;
delete
this
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment