Commit 95eb8a7a authored by Simon Giesecke's avatar Simon Giesecke

Problem: parts of in_event duplicated across subclasses of stream_listener_base_t

Solution: extract function create_engine into base class
parent 93c1843f
...@@ -37,10 +37,8 @@ ...@@ -37,10 +37,8 @@
#include <string.h> #include <string.h>
#include "stream_engine.hpp"
#include "ipc_address.hpp" #include "ipc_address.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -149,23 +147,7 @@ void zmq::ipc_listener_t::in_event () ...@@ -149,23 +147,7 @@ void zmq::ipc_listener_t::in_event ()
} }
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = create_engine (fd);
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
} }
int zmq::ipc_listener_t::get_address (std::string &addr_) int zmq::ipc_listener_t::get_address (std::string &addr_)
......
...@@ -29,7 +29,9 @@ ...@@ -29,7 +29,9 @@
#include "precompiled.hpp" #include "precompiled.hpp"
#include "stream_listener_base.hpp" #include "stream_listener_base.hpp"
#include "session_base.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "stream_engine.hpp"
zmq::stream_listener_base_t::stream_listener_base_t ( zmq::stream_listener_base_t::stream_listener_base_t (
zmq::io_thread_t *io_thread_, zmq::io_thread_t *io_thread_,
...@@ -92,3 +94,24 @@ int zmq::stream_listener_base_t::close () ...@@ -92,3 +94,24 @@ int zmq::stream_listener_base_t::close ()
return 0; return 0;
} }
void zmq::stream_listener_base_t::create_engine (fd_t fd)
{
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
}
...@@ -69,6 +69,8 @@ class stream_listener_base_t : public own_t, public io_object_t ...@@ -69,6 +69,8 @@ class stream_listener_base_t : public own_t, public io_object_t
// Close the listening socket. // Close the listening socket.
virtual int close (); virtual int close ();
void create_engine (fd_t fd);
// Underlying socket. // Underlying socket.
fd_t _s; fd_t _s;
......
...@@ -34,9 +34,7 @@ ...@@ -34,9 +34,7 @@
#include <stdio.h> #include <stdio.h>
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -90,23 +88,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -90,23 +88,7 @@ void zmq::tcp_listener_t::in_event ()
} }
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = create_engine (fd);
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
} }
int zmq::tcp_listener_t::get_address (std::string &addr_) int zmq::tcp_listener_t::get_address (std::string &addr_)
......
...@@ -37,10 +37,8 @@ ...@@ -37,10 +37,8 @@
#include <string.h> #include <string.h>
#include "stream_engine.hpp"
#include "tipc_address.hpp" #include "tipc_address.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -75,23 +73,7 @@ void zmq::tipc_listener_t::in_event () ...@@ -75,23 +73,7 @@ void zmq::tipc_listener_t::in_event ()
} }
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = create_engine (fd);
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
} }
int zmq::tipc_listener_t::get_address (std::string &addr_) int zmq::tipc_listener_t::get_address (std::string &addr_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment