Commit f8bf3a4c authored by Martin Sustrik's avatar Martin Sustrik

Rename i_inout to i_engine_sink

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 3ae73ee1
...@@ -24,7 +24,6 @@ libzmq_la_SOURCES = \ ...@@ -24,7 +24,6 @@ libzmq_la_SOURCES = \
err.hpp \ err.hpp \
fd.hpp \ fd.hpp \
fq.hpp \ fq.hpp \
i_inout.hpp \
io_object.hpp \ io_object.hpp \
io_thread.hpp \ io_thread.hpp \
ip.hpp \ ip.hpp \
......
...@@ -22,13 +22,13 @@ ...@@ -22,13 +22,13 @@
#include <string.h> #include <string.h>
#include "decoder.hpp" #include "decoder.hpp"
#include "i_inout.hpp" #include "i_engine.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <decoder_t> (bufsize_), decoder_base_t <decoder_t> (bufsize_),
destination (NULL), sink (NULL),
maxmsgsize (maxmsgsize_) maxmsgsize (maxmsgsize_)
{ {
int rc = in_progress.init (); int rc = in_progress.init ();
...@@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t () ...@@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::decoder_t::set_inout (i_inout *destination_) void zmq::decoder_t::set_sink (i_engine_sink *sink_)
{ {
destination = destination_; sink = sink_;
} }
bool zmq::decoder_t::one_byte_size_ready () bool zmq::decoder_t::one_byte_size_ready ()
...@@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready () ...@@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.) // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress)) if (!sink || !sink->write (&in_progress))
return false; return false;
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
......
...@@ -184,7 +184,7 @@ namespace zmq ...@@ -184,7 +184,7 @@ namespace zmq
decoder_t (size_t bufsize_, int64_t maxmsgsize_); decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t (); ~decoder_t ();
void set_inout (struct i_inout *destination_); void set_sink (struct i_engine_sink *sink_);
private: private:
...@@ -193,7 +193,7 @@ namespace zmq ...@@ -193,7 +193,7 @@ namespace zmq
bool flags_ready (); bool flags_ready ();
bool message_ready (); bool message_ready ();
struct i_inout *destination; struct i_engine_sink *sink;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
msg_t in_progress; msg_t in_progress;
......
...@@ -19,12 +19,12 @@ ...@@ -19,12 +19,12 @@
*/ */
#include "encoder.hpp" #include "encoder.hpp"
#include "i_inout.hpp" #include "i_engine.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) : zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_base_t <encoder_t> (bufsize_), encoder_base_t <encoder_t> (bufsize_),
source (NULL) sink (NULL)
{ {
int rc = in_progress.init (); int rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t () ...@@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::encoder_t::set_inout (i_inout *source_) void zmq::encoder_t::set_sink (i_engine_sink *sink_)
{ {
source = source_; sink = sink_;
} }
bool zmq::encoder_t::size_ready () bool zmq::encoder_t::size_ready ()
...@@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready () ...@@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way // Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine // unsuccessful write will cause retry on the next state machine
// invocation. // invocation.
if (!source || !source->read (&in_progress)) { if (!sink || !sink->read (&in_progress)) {
rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
return false; return false;
......
...@@ -163,14 +163,14 @@ namespace zmq ...@@ -163,14 +163,14 @@ namespace zmq
encoder_t (size_t bufsize_); encoder_t (size_t bufsize_);
~encoder_t (); ~encoder_t ();
void set_inout (struct i_inout *source_); void set_sink (struct i_engine_sink *sink_);
private: private:
bool size_ready (); bool size_ready ();
bool message_ready (); bool message_ready ();
struct i_inout *source; struct i_engine_sink *sink;
msg_t in_progress; msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
......
...@@ -24,13 +24,15 @@ ...@@ -24,13 +24,15 @@
namespace zmq namespace zmq
{ {
// Abstract interface to be implemented by various engines.
struct i_engine struct i_engine
{ {
virtual ~i_engine () {} virtual ~i_engine () {}
// Plug the engine to the session. // Plug the engine to the session.
virtual void plug (class io_thread_t *io_thread_, virtual void plug (class io_thread_t *io_thread_,
struct i_inout *inout_) = 0; struct i_engine_sink *sink_) = 0;
// Unplug the engine from the session. // Unplug the engine from the session.
virtual void unplug () = 0; virtual void unplug () = 0;
...@@ -48,6 +50,25 @@ namespace zmq ...@@ -48,6 +50,25 @@ namespace zmq
virtual void activate_out () = 0; virtual void activate_out () = 0;
}; };
// Abstract interface to be implemented by engine sinks such as sessions.
struct i_engine_sink
{
virtual ~i_engine_sink () {}
// Engine asks for a message to send to the network.
virtual bool read (class msg_t *msg_) = 0;
// Engine received message from the network and sends it further on.
virtual bool write (class msg_t *msg_) = 0;
// Flush all the previously written messages.
virtual void flush () = 0;
// Engine is dead. Drop all the references to it.
virtual void detach () = 0;
};
} }
#endif #endif
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
#define __ZMQ_I_INOUT_HPP_INCLUDED__
#include "msg.hpp"
#include "stdint.hpp"
namespace zmq
{
struct i_inout
{
virtual ~i_inout () {}
// Engine asks for a message to send to the network.
virtual bool read (msg_t *msg_) = 0;
// Engine received message from the network and sends it further on.
virtual bool write (msg_t *msg_) = 0;
// Flush all the previously written messages.
virtual void flush () = 0;
// Engine is dead. Drop all the references to it.
virtual void detach () = 0;
};
}
#endif
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
#include "err.hpp" #include "err.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "i_inout.hpp"
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) : const options_t &options_) :
...@@ -40,7 +39,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, ...@@ -40,7 +39,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
has_rx_timer (false), has_rx_timer (false),
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
inout (NULL), sink (NULL),
mru_decoder (NULL), mru_decoder (NULL),
pending_bytes (0) pending_bytes (0)
{ {
...@@ -57,7 +56,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) ...@@ -57,7 +56,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
{ {
// Retrieve PGM fds and start polling. // Retrieve PGM fds and start polling.
int socket_fd; int socket_fd;
...@@ -68,7 +67,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_) ...@@ -68,7 +67,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
set_pollin (pipe_handle); set_pollin (pipe_handle);
set_pollin (socket_handle); set_pollin (socket_handle);
inout = inout_; sink = sink_;
} }
void zmq::pgm_receiver_t::unplug () void zmq::pgm_receiver_t::unplug ()
...@@ -91,7 +90,7 @@ void zmq::pgm_receiver_t::unplug () ...@@ -91,7 +90,7 @@ void zmq::pgm_receiver_t::unplug ()
rm_fd (socket_handle); rm_fd (socket_handle);
rm_fd (pipe_handle); rm_fd (pipe_handle);
inout = NULL; sink = NULL;
} }
void zmq::pgm_receiver_t::terminate () void zmq::pgm_receiver_t::terminate ()
...@@ -218,7 +217,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -218,7 +217,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new (std::nothrow) decoder_t (0, it->second.decoder = new (std::nothrow) decoder_t (0,
options.maxmsgsize); options.maxmsgsize);
alloc_assert (it->second.decoder); alloc_assert (it->second.decoder);
it->second.decoder->set_inout (inout); it->second.decoder->set_sink (sink);
} }
mru_decoder = it->second.decoder; mru_decoder = it->second.decoder;
...@@ -244,7 +243,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -244,7 +243,7 @@ void zmq::pgm_receiver_t::in_event ()
} }
// Flush any messages decoder may have produced. // Flush any messages decoder may have produced.
inout->flush (); sink->flush ();
} }
void zmq::pgm_receiver_t::timer_event (int token) void zmq::pgm_receiver_t::timer_event (int token)
......
...@@ -52,7 +52,7 @@ namespace zmq ...@@ -52,7 +52,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -100,8 +100,8 @@ namespace zmq ...@@ -100,8 +100,8 @@ namespace zmq
// Socket options. // Socket options.
options_t options; options_t options;
// Parent session. // Associated session.
i_inout *inout; i_engine_sink *sink;
// Most recently used decoder. // Most recently used decoder.
decoder_t *mru_decoder; decoder_t *mru_decoder;
......
...@@ -61,7 +61,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) ...@@ -61,7 +61,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc; return rc;
} }
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
{ {
// Alocate 2 fds for PGM socket. // Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0; int downlink_socket_fd = 0;
...@@ -69,7 +69,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_) ...@@ -69,7 +69,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_inout *inout_)
int rdata_notify_fd = 0; int rdata_notify_fd = 0;
int pending_notify_fd = 0; int pending_notify_fd = 0;
encoder.set_inout (inout_); encoder.set_sink (sink_);
// Fill fds from PGM transport and add them to the poller. // Fill fds from PGM transport and add them to the poller.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
...@@ -106,7 +106,7 @@ void zmq::pgm_sender_t::unplug () ...@@ -106,7 +106,7 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (uplink_handle); rm_fd (uplink_handle);
rm_fd (rdata_notify_handle); rm_fd (rdata_notify_handle);
rm_fd (pending_notify_handle); rm_fd (pending_notify_handle);
encoder.set_inout (NULL); encoder.set_sink (NULL);
} }
void zmq::pgm_sender_t::terminate () void zmq::pgm_sender_t::terminate ()
......
...@@ -50,7 +50,7 @@ namespace zmq ...@@ -50,7 +50,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#define __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__
#include "own.hpp" #include "own.hpp"
#include "i_inout.hpp" #include "i_engine.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -33,7 +33,7 @@ namespace zmq ...@@ -33,7 +33,7 @@ namespace zmq
class session_t : class session_t :
public own_t, public own_t,
public io_object_t, public io_object_t,
public i_inout, public i_engine_sink,
public i_pipe_events public i_pipe_events
{ {
public: public:
...@@ -44,7 +44,7 @@ namespace zmq ...@@ -44,7 +44,7 @@ namespace zmq
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_); void attach_pipe (class pipe_t *pipe_);
// i_inout interface implementation. // i_engine_sink interface implementation.
bool read (msg_t *msg_); bool read (msg_t *msg_);
bool write (msg_t *msg_); bool write (msg_t *msg_);
void flush (); void flush ();
......
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
#include "zmq_engine.hpp" #include "zmq_engine.hpp"
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "i_inout.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -40,8 +39,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : ...@@ -40,8 +39,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
inout (NULL), sink (NULL),
ephemeral_inout (NULL), ephemeral_sink (NULL),
options (options_), options (options_),
plugged (false) plugged (false)
{ {
...@@ -55,18 +54,18 @@ zmq::zmq_engine_t::~zmq_engine_t () ...@@ -55,18 +54,18 @@ zmq::zmq_engine_t::~zmq_engine_t ()
zmq_assert (!plugged); zmq_assert (!plugged);
} }
void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
{ {
zmq_assert (!plugged); zmq_assert (!plugged);
plugged = true; plugged = true;
ephemeral_inout = NULL; ephemeral_sink = NULL;
// Connect to session/init object. // Connect to session/init object.
zmq_assert (!inout); zmq_assert (!sink);
zmq_assert (inout_); zmq_assert (sink_);
encoder.set_inout (inout_); encoder.set_sink (sink_);
decoder.set_inout (inout_); decoder.set_sink (sink_);
inout = inout_; sink = sink_;
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
...@@ -90,10 +89,10 @@ void zmq::zmq_engine_t::unplug () ...@@ -90,10 +89,10 @@ void zmq::zmq_engine_t::unplug ()
io_object_t::unplug (); io_object_t::unplug ();
// Disconnect from init/session object. // Disconnect from init/session object.
encoder.set_inout (NULL); encoder.set_sink (NULL);
decoder.set_inout (NULL); decoder.set_sink (NULL);
ephemeral_inout = inout; ephemeral_sink = sink;
inout = NULL; sink = NULL;
} }
void zmq::zmq_engine_t::terminate () void zmq::zmq_engine_t::terminate ()
...@@ -149,13 +148,13 @@ void zmq::zmq_engine_t::in_event () ...@@ -149,13 +148,13 @@ void zmq::zmq_engine_t::in_event ()
// Flush all messages the decoder may have produced. // Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler. // If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) { if (unlikely (!plugged)) {
zmq_assert (ephemeral_inout); zmq_assert (ephemeral_sink);
ephemeral_inout->flush (); ephemeral_sink->flush ();
} else { } else {
inout->flush (); sink->flush ();
} }
if (inout && disconnection) if (sink && disconnection)
error (); error ();
} }
...@@ -169,8 +168,8 @@ void zmq::zmq_engine_t::out_event () ...@@ -169,8 +168,8 @@ void zmq::zmq_engine_t::out_event ()
// If IO handler has unplugged engine, flush transient IO handler. // If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) { if (unlikely (!plugged)) {
zmq_assert (ephemeral_inout); zmq_assert (ephemeral_sink);
ephemeral_inout->flush (); ephemeral_sink->flush ();
return; return;
} }
...@@ -219,8 +218,8 @@ void zmq::zmq_engine_t::activate_in () ...@@ -219,8 +218,8 @@ void zmq::zmq_engine_t::activate_in ()
void zmq::zmq_engine_t::error () void zmq::zmq_engine_t::error ()
{ {
zmq_assert (inout); zmq_assert (sink);
inout->detach (); sink->detach ();
unplug (); unplug ();
delete this; delete this;
} }
...@@ -43,7 +43,7 @@ namespace zmq ...@@ -43,7 +43,7 @@ namespace zmq
~zmq_engine_t (); ~zmq_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -69,10 +69,10 @@ namespace zmq ...@@ -69,10 +69,10 @@ namespace zmq
size_t outsize; size_t outsize;
encoder_t encoder; encoder_t encoder;
i_inout *inout; i_engine_sink *sink;
// Detached transient inout handler. // Detached transient sink.
i_inout *ephemeral_inout; i_engine_sink *ephemeral_sink;
options_t options; options_t options;
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include <vector> #include <vector>
#include "i_inout.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
...@@ -36,7 +35,9 @@ namespace zmq ...@@ -36,7 +35,9 @@ namespace zmq
// The class handles initialisation phase of 0MQ wire-level protocol. // The class handles initialisation phase of 0MQ wire-level protocol.
class zmq_init_t : public own_t, public i_inout class zmq_init_t :
public own_t,
public i_engine_sink
{ {
public: public:
...@@ -56,7 +57,7 @@ namespace zmq ...@@ -56,7 +57,7 @@ namespace zmq
void finalise_initialisation (); void finalise_initialisation ();
void dispatch_engine (); void dispatch_engine ();
// i_inout interface implementation. // i_engine_sink interface implementation.
bool read (class msg_t *msg_); bool read (class msg_t *msg_);
bool write (class msg_t *msg_); bool write (class msg_t *msg_);
void flush (); void flush ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment