Commit bde396f1 authored by Martin Sustrik's avatar Martin Sustrik

fix to 3-thread synchronisation algorithm

parent 5b5b5133
...@@ -19,10 +19,13 @@ ...@@ -19,10 +19,13 @@
#include "io_object.hpp" #include "io_object.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp"
zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) : zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) :
object_t (parent_), object_t (parent_),
owner (owner_) owner (owner_),
plugged_in (false),
terminated (false)
{ {
// Retrieve the poller from the thread we are running in. // Retrieve the poller from the thread we are running in.
poller = parent_->get_poller (); poller = parent_->get_poller ();
...@@ -32,6 +35,23 @@ zmq::io_object_t::~io_object_t () ...@@ -32,6 +35,23 @@ zmq::io_object_t::~io_object_t ()
{ {
} }
void zmq::io_object_t::process_plug ()
{
zmq_assert (!plugged_in);
// If termination of the object was already requested, destroy it and
// send the termination acknowledgement.
if (terminated) {
send_term_ack (owner);
delete this;
return;
}
// Notify the generic termination mechanism (io_object_t) that the object
// is already plugged in.
plugged_in = true;
}
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
return poller->add_fd (fd_, events_); return poller->add_fd (fd_, events_);
...@@ -72,6 +92,21 @@ void zmq::io_object_t::cancel_timer (i_poll_events *events_) ...@@ -72,6 +92,21 @@ void zmq::io_object_t::cancel_timer (i_poll_events *events_)
poller->cancel_timer (events_); poller->cancel_timer (events_);
} }
void zmq::io_object_t::in_event ()
{
zmq_assert (false);
}
void zmq::io_object_t::out_event ()
{
zmq_assert (false);
}
void zmq::io_object_t::timer_event ()
{
zmq_assert (false);
}
void zmq::io_object_t::term () void zmq::io_object_t::term ()
{ {
send_term_req (owner, this); send_term_req (owner, this);
...@@ -79,6 +114,17 @@ void zmq::io_object_t::term () ...@@ -79,6 +114,17 @@ void zmq::io_object_t::term ()
void zmq::io_object_t::process_term () void zmq::io_object_t::process_term ()
{ {
zmq_assert (!terminated);
// If termination request has occured even before the object was plugged in
// wait till plugging in happens, then acknowledge the termination.
if (!plugged_in) {
terminated = true;
return;
}
// Otherwise, destroy the object and acknowledge the termination
// straight away.
send_term_ack (owner); send_term_ack (owner);
delete this; delete this;
} }
...@@ -22,11 +22,12 @@ ...@@ -22,11 +22,12 @@
#include "object.hpp" #include "object.hpp"
#include "i_poller.hpp" #include "i_poller.hpp"
#include "i_poll_events.hpp"
namespace zmq namespace zmq
{ {
class io_object_t : public object_t class io_object_t : public object_t, public i_poll_events
{ {
public: public:
...@@ -45,6 +46,11 @@ namespace zmq ...@@ -45,6 +46,11 @@ namespace zmq
// of I/O object correctly. // of I/O object correctly.
virtual ~io_object_t (); virtual ~io_object_t ();
// Handlers for incoming commands. It vital that every I/O object
// invokes io_object_t::process_plug at the end of it's own plug
// handler.
void process_plug ();
// Methods to access underlying poller object. // Methods to access underlying poller object.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_); handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
...@@ -55,12 +61,25 @@ namespace zmq ...@@ -55,12 +61,25 @@ namespace zmq
void add_timer (struct i_poll_events *events_); void add_timer (struct i_poll_events *events_);
void cancel_timer (struct i_poll_events *events_); void cancel_timer (struct i_poll_events *events_);
// i_poll_events interface implementation.
void in_event ();
void out_event ();
void timer_event ();
// Socket owning this I/O object. It is responsible for destroying // Socket owning this I/O object. It is responsible for destroying
// it when it's being closed. // it when it's being closed.
object_t *owner; object_t *owner;
// Set to true when object is plugged in. It's responsibility
// of derived object to set the property after the feat.
bool plugged_in;
private: private:
// Set to true when object was terminated before it was plugged in.
// In such case destruction is delayed till 'plug' command arrives.
bool terminated;
struct i_poller *poller; struct i_poller *poller;
// Handlers for incoming commands. // Handlers for incoming commands.
......
...@@ -70,7 +70,11 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -70,7 +70,11 @@ int zmq::socket_base_t::bind (const char *addr_)
{ {
// TODO: The taskset should be taken from socket options. // TODO: The taskset should be taken from socket options.
uint64_t taskset = 0; uint64_t taskset = 0;
object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); zmq_listener_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
int rc = listener->set_address (addr_);
if (rc != 0)
return -1;
send_plug (listener); send_plug (listener);
send_own (this, listener); send_own (this, listener);
return 0; return 0;
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <string.h>
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -41,6 +43,7 @@ ...@@ -41,6 +43,7 @@
zmq::tcp_listener_t::tcp_listener_t () : zmq::tcp_listener_t::tcp_listener_t () :
s (retired_fd) s (retired_fd)
{ {
memset (&addr, 0, sizeof (addr));
} }
zmq::tcp_listener_t::~tcp_listener_t () zmq::tcp_listener_t::~tcp_listener_t ()
...@@ -49,14 +52,14 @@ zmq::tcp_listener_t::~tcp_listener_t () ...@@ -49,14 +52,14 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close (); close ();
} }
int zmq::tcp_listener_t::open (const char *addr_) int zmq::tcp_listener_t::set_address (const char *addr_)
{ {
// Convert the interface into sockaddr_in structure. // Convert the interface into sockaddr_in structure.
sockaddr_in ip_address; return resolve_ip_interface (&addr, addr_);
int rc = resolve_ip_interface (&ip_address, addr_); }
if (rc != 0)
return -1;
int zmq::tcp_listener_t::open ()
{
// Create a listening socket. // Create a listening socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s == -1) if (s == -1)
...@@ -64,7 +67,7 @@ int zmq::tcp_listener_t::open (const char *addr_) ...@@ -64,7 +67,7 @@ int zmq::tcp_listener_t::open (const char *addr_)
// Allow reusing of the address. // Allow reusing of the address.
int flag = 1; int flag = 1;
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); int rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0); errno_assert (rc == 0);
// Set the non-blocking flag. // Set the non-blocking flag.
...@@ -75,7 +78,7 @@ int zmq::tcp_listener_t::open (const char *addr_) ...@@ -75,7 +78,7 @@ int zmq::tcp_listener_t::open (const char *addr_)
errno_assert (rc != -1); errno_assert (rc != -1);
// Bind the socket to the network interface and port. // Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &ip_address, sizeof (ip_address)); rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
if (rc != 0) { if (rc != 0) {
close (); close ();
return -1; return -1;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#define __ZMQ_TCP_LISTENER_HPP_INCLUDED__ #define __ZMQ_TCP_LISTENER_HPP_INCLUDED__
#include "fd.hpp" #include "fd.hpp"
#include "ip.hpp"
namespace zmq namespace zmq
{ {
...@@ -34,10 +35,13 @@ namespace zmq ...@@ -34,10 +35,13 @@ namespace zmq
tcp_listener_t (); tcp_listener_t ();
~tcp_listener_t (); ~tcp_listener_t ();
// Open TCP listining socket. Address is in // Set up the address to listen on. Address is in
// <interface-name>:<port-number> format. Interface name may be '*' // <interface-name>:<port-number> format. Interface name may be '*'
// to bind to all the interfaces. // to bind to all the interfaces.
int open (const char *addr_); int set_address (const char *addr_);
// Open TCP listining socket.
int open ();
// Close the listening socket. // Close the listening socket.
int close (); int close ();
...@@ -53,6 +57,9 @@ namespace zmq ...@@ -53,6 +57,9 @@ namespace zmq
private: private:
// IP address/port to listen on.
sockaddr_in addr;
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
......
...@@ -29,12 +29,46 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) : ...@@ -29,12 +29,46 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) :
zmq::zmq_listener_t::~zmq_listener_t () zmq::zmq_listener_t::~zmq_listener_t ()
{ {
if (plugged_in)
rm_fd (handle);
}
int zmq::zmq_listener_t::set_address (const char *addr_)
{
return tcp_listener.set_address (addr_);
} }
void zmq::zmq_listener_t::process_plug () void zmq::zmq_listener_t::process_plug ()
{ {
// TODO: Testing code follows... // Open the listening socket.
int rc = tcp_listener.open ();
zmq_assert (rc == 0);
// Start polling for incoming connections.
handle = add_fd (tcp_listener.get_fd (), this);
set_pollin (handle);
io_object_t::process_plug ();
}
void zmq::zmq_listener_t::in_event ()
{
fd_t fd = tcp_listener.accept ();
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
return;
// TODO
zmq_assert (false);
/*
object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); object_t *engine = new zmq_engine_t (choose_io_thread (0), owner);
send_plug (engine); send_plug (engine);
send_own (owner, engine); send_own (owner, engine);
*/
} }
...@@ -20,7 +20,10 @@ ...@@ -20,7 +20,10 @@
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#include <string>
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_listener.hpp"
namespace zmq namespace zmq
{ {
...@@ -31,6 +34,9 @@ namespace zmq ...@@ -31,6 +34,9 @@ namespace zmq
zmq_listener_t (class io_thread_t *parent_, object_t *owner_); zmq_listener_t (class io_thread_t *parent_, object_t *owner_);
// Set IP address to listen on.
int set_address (const char *addr_);
private: private:
~zmq_listener_t (); ~zmq_listener_t ();
...@@ -38,6 +44,15 @@ namespace zmq ...@@ -38,6 +44,15 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
// Handle I/O events.
void in_event ();
// Actual listening socket.
tcp_listener_t tcp_listener;
// Handle corresponding to the listening socket.
handle_t handle;
zmq_listener_t (const zmq_listener_t&); zmq_listener_t (const zmq_listener_t&);
void operator = (const zmq_listener_t&); void operator = (const zmq_listener_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment