Commit 059beca5 authored by Martin Sustrik's avatar Martin Sustrik

listener/connecter/init/session added

parent bda766ab
...@@ -48,7 +48,7 @@ extern "C" { ...@@ -48,7 +48,7 @@ extern "C" {
#define ZMQ_SWAP 3 #define ZMQ_SWAP 3
#define ZMQ_MASK 4 #define ZMQ_MASK 4
#define ZMQ_AFFINITY 5 #define ZMQ_AFFINITY 5
#define ZMQ_SESSIONID 6 #define ZMQ_IDENTITY 6
// The operation should be performed in non-blocking mode. I.e. if it cannot // The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN. // be processed immediately, error should be returned with errno set to EAGAIN.
......
...@@ -15,6 +15,7 @@ libzmq_la_SOURCES = \ ...@@ -15,6 +15,7 @@ libzmq_la_SOURCES = \
err.hpp \ err.hpp \
fd.hpp \ fd.hpp \
fd_signaler.hpp \ fd_signaler.hpp \
i_inout.hpp \
io_object.hpp \ io_object.hpp \
io_thread.hpp \ io_thread.hpp \
ip.hpp \ ip.hpp \
...@@ -25,10 +26,13 @@ libzmq_la_SOURCES = \ ...@@ -25,10 +26,13 @@ libzmq_la_SOURCES = \
msg.hpp \ msg.hpp \
mutex.hpp \ mutex.hpp \
object.hpp \ object.hpp \
options.hpp \
owner.hpp \
pipe.hpp \ pipe.hpp \
platform.hpp \ platform.hpp \
poll.hpp \ poll.hpp \
select.hpp \ select.hpp \
session.hpp \
simple_semaphore.hpp \ simple_semaphore.hpp \
socket_base.hpp \ socket_base.hpp \
stdint.hpp \ stdint.hpp \
...@@ -43,7 +47,10 @@ libzmq_la_SOURCES = \ ...@@ -43,7 +47,10 @@ libzmq_la_SOURCES = \
ypollset.hpp \ ypollset.hpp \
yqueue.hpp \ yqueue.hpp \
zmq_connecter.hpp \ zmq_connecter.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_engine.hpp \ zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \ zmq_listener.hpp \
app_thread.cpp \ app_thread.cpp \
devpoll.cpp \ devpoll.cpp \
...@@ -56,8 +63,11 @@ libzmq_la_SOURCES = \ ...@@ -56,8 +63,11 @@ libzmq_la_SOURCES = \
ip.cpp \ ip.cpp \
kqueue.cpp \ kqueue.cpp \
object.cpp \ object.cpp \
options.cpp \
owned.cpp \
poll.cpp \ poll.cpp \
select.cpp \ select.cpp \
session.cpp \
socket_base.cpp \ socket_base.cpp \
tcp_connecter.cpp \ tcp_connecter.cpp \
tcp_listener.cpp \ tcp_listener.cpp \
...@@ -67,7 +77,10 @@ libzmq_la_SOURCES = \ ...@@ -67,7 +77,10 @@ libzmq_la_SOURCES = \
ypollset.cpp \ ypollset.cpp \
zmq.cpp \ zmq.cpp \
zmq_connecter.cpp \ zmq_connecter.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_engine.cpp \ zmq_engine.cpp \
zmq_init.cpp \
zmq_listener.cpp zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_LDFLAGS = -version-info 0:0:0
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ATOMIC_HPP_INCLUDED__
#define __ZMQ_ATOMIC_HPP_INCLUDED__
#include "stdint.hpp"
#if defined ZMQ_FORCE_MUTEXES
#define ZMQ_ATOMIC_MUTEX
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZMQ_ATOMIC_X86
#elif defined ZMQ_HAVE_WINDOWS
#define ZMQ_ATOMIC_WINDOWS
#elif defined ZMQ_HAVE_SOLARIS
#define ZMQ_ATOMIC_SOLARIS
#else
#define ZMQ_ATOMIC_MUTEX
#endif
namespace zmq
{
// Atomic assignement.
inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_)
{
*p_ = value_;
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
}
// Atomic retrieval of an integer.
inline uint32_t atomic_uint32_get (volatile uint32_t *p_)
{
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
return *p_;
}
// Atomic addition. Returns the old value.
inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_)
{
#if defined ZMQ_ATOMIC_WINDOWS
return InterlockedExchangeAdd ((LONG*) &value, increment_);
#elif defined ZMQ_ATOMIC_SOLARIS
return atomic_add_32_nv (&value, increment_) - delta_;
#elif defined ZMQ_ATOMIC_X86
uint32_t old;
__asm__ volatile (
"lock; xadd %0, %1\n\t"
: "=r" (old), "=m" (*p_)
: "0" (delta_), "m" (*p_)
: "cc", "memory");
return old;
#else
#error // TODO:
sync.lock ();
uint32_t old = *p_;
*p_ += delta_;
sync.unlock ();
#endif
}
// Atomic subtraction. Returns the old value.
inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_)
{
#if defined ZMQ_ATOMIC_WINDOWS
LONG delta = - ((LONG) delta_);
return InterlockedExchangeAdd ((LONG*) &value, delta);
#elif defined ZMQ_ATOMIC_SOLARIS
int32_t delta = - ((int32_t) delta_);
return atomic_add_32_nv (&value, delta) + delta_;
#elif defined ZMQ_ATOMIC_X86
uint32_t old = -delta_;
__asm__ volatile ("lock; xaddl %0,%1"
: "=r" (old), "=m" (*p_)
: "0" (old), "m" (*p_)
: "cc");
return old;
#else
#error // TODO:
sync.lock ();
uint32_t old = *p_;
*p_ -= delta_;
sync.unlock ();
return old;
#endif
}
// Atomic assignement.
template <typename T>
inline void atomic_ptr_set (volatile T **p_, T *value_)
{
*p_ = value_;
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
}
// Perform atomic 'exchange pointers' operation. Old value is returned.
template <typename T>
inline void *atomic_ptr_xchg (volatile T **p_, T *value_)
{
#if defined ZMQ_ATOMIC_WINDOWS
return InterlockedExchangePointer (p_, value_);
#elif defined ZMQ_ATOMIC_SOLARIS
return atomic_swap_ptr (p_, value_);
#elif defined ZMQ_ATOMIC_X86
void *old;
__asm__ volatile (
"lock; xchg %0, %2"
: "=r" (old), "=m" (*p_)
: "m" (*p_), "0" (value_));
return old;
#else
#error // TODO:
sync.lock ();
void *old = *p_;
*p_ = value_;
sync.unlock ();
return old;
#endif
}
// Perform atomic 'compare and swap' operation on the pointer.
// The pointer is compared to 'cmp' argument and if they are
// equal, its value is set to 'value'. Old value of the pointer
// is returned.
template <typename T>
inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_)
{
#if defined ZMQ_ATOMIC_WINDOWS
return InterlockedCompareExchangePointer (p_, value_, cmp_);
#elif defined ZMQ_ATOMIC_SOLARIS
return atomic_cas_ptr (p_, cmp_, value_);
#elif defined ZMQ_ATOMIC_X86
void *old;
__asm__ volatile (
"lock; cmpxchg %2, %3"
: "=a" (old), "=m" (*p_)
: "r" (value_), "m" (*p_), "0" (cmp_)
: "cc");
return old;
#else
#error // TODO:
sync.lock ();
void *old = *p_;
if (old == cmp_)
*p_ = value_;
sync.unlock ();
return old;
#endif
}
#if defined ZMQ_ATOMIC_X86 && defined __x86_64__
typedef uint64_t atomic_bitmap_t;
#else
typedef uint32_t atomic_bitmap_t;
#endif
// Atomic assignement.
inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_,
atomic_bitmap_t value_)
{
*p_ = value_;
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
}
// Bit-test-set-and-reset. Sets one bit of the value and resets
// another one. Returns the original value of the reset bit.
inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_,
int set_index_, int reset_index_)
{
#if defined ZMQ_ATOMIC_WINDOWS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
set_index_)) & ~(integer_t (1) << reset_index_);
if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
oldval) == (LONG) oldval)
return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
true : false;
}
#elif defined ZMQ_ATOMIC_SOLARIS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
set_index_)) & ~(integer_t (1) << reset_index_);
if (atomic_cas_32 (p_, oldval, newval) == oldval)
return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
true : false;
}
#elif defined ZMQ_ATOMIC_X86
atomic_bitmap_t oldval, dummy;
__asm__ volatile (
"mov %0, %1\n\t"
"1:"
"mov %1, %2\n\t"
"bts %3, %2\n\t"
"btr %4, %2\n\t"
"lock cmpxchg %2, %0\n\t"
"jnz 1b\n\t"
: "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
: "r" (atomic_bitmap_t (set_index_)),
"r" (atomic_bitmap_t (reset_index_))
: "cc");
return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_));
#else
#error // TODO:
sync.lock ();
atomic_bitmap_t oldval = *p_;
*p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) &
~(atomic_bitmap_t (1) << reset_index_);
sync.unlock ();
return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false;
#endif
}
// Sets value to newval. Returns the original value.
inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_,
atomic_bitmap_t newval_)
{
#if defined ZMQ_ATOMIC_WINDOWS
return InterlockedExchange ((volatile LONG*) p_, newval_);
#elif defined ZMQ_ATOMIC_SOLARIS
return atomic_swap_32 (p_, newval_);
#elif defined ZMQ_ATOMIC_X86
atomic_bitmap_t oldval = newval_;
__asm__ volatile (
"lock; xchg %0, %1"
: "=r" (oldval)
: "m" (*p_), "0" (oldval)
: "memory");
return oldval;
#else
#error // TODO:
sync.lock ();
atomic_bitmap_t oldval = *p_;
*p_ = newval_;
sync.unlock ();
#endif
}
// izte is "if-zero-then-else" atomic operation - if the value is zero
// it substitutes it by 'thenval' else it rewrites it by 'elseval'.
// Original value of the integer is returned from this function.
inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_,
atomic_bitmap_t thenval_, atomic_bitmap_t elseval_)
{
#if defined ZMQ_ATOMIC_WINDOWS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
oldval) == (LONG) oldval)
return oldval;
}
#elif defined ZMQ_ATOMIC_SOLARIS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
if (atomic_cas_32 (p_, oldval, newval) == oldval)
return oldval;
}
#elif defined ZMQ_ATOMIC_X86
atomic_bitmap_t oldval;
atomic_bitmap_t dummy;
__asm__ volatile (
"mov %0, %1\n\t"
"1:"
"mov %3, %2\n\t"
"test %1, %1\n\t"
"jz 2f\n\t"
"mov %4, %2\n\t"
"2:"
"lock cmpxchg %2, %0\n\t"
"jnz 1b\n\t"
: "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
: "r" (thenval_), "r" (elseval_)
: "cc");
return oldval;
#else
#error // TODO:
sync.lock ();
atomic_bitmap_t oldval = *p_;
*p_ = oldval ? elseval_ : thenval_;
sync.unlock ();
return oldval;
#endif
}
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
#define __ZMQ_I_INOUT_HPP_INCLUDED__
#include "../include/zmq.h"
namespace zmq
{
struct i_inout
{
virtual bool read (::zmq_msg *msg_) = 0;
virtual bool write (::zmq_msg *msg_) = 0;
virtual void flush () = 0;
};
}
#endif
...@@ -21,35 +21,19 @@ ...@@ -21,35 +21,19 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::io_object_t::io_object_t (io_thread_t *parent_, object_t *owner_) : zmq::io_object_t::io_object_t (io_thread_t *io_thread_)
object_t (parent_),
owner (owner_),
plugged_in (false),
terminated (false)
{ {
// Retrieve the poller from the thread we are running in. // Retrieve the poller from the thread we are running in.
poller = parent_->get_poller (); poller = io_thread_->get_poller ();
} }
zmq::io_object_t::~io_object_t () zmq::io_object_t::~io_object_t ()
{ {
} }
void zmq::io_object_t::process_plug () void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
{ {
zmq_assert (!plugged_in); poller = io_thread_->get_poller ();
// If termination of the object was already requested, destroy it and
// send the termination acknowledgement.
if (terminated) {
send_term_ack (owner);
delete this;
return;
}
// Notify the generic termination mechanism (io_object_t) that the object
// is already plugged in.
plugged_in = true;
} }
zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_)
...@@ -106,26 +90,3 @@ void zmq::io_object_t::timer_event () ...@@ -106,26 +90,3 @@ void zmq::io_object_t::timer_event ()
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::io_object_t::term ()
{
send_term_req (owner, this);
}
void zmq::io_object_t::process_term ()
{
zmq_assert (!terminated);
// If termination request has occured even before the object was plugged in
// wait till plugging in happens, then acknowledge the termination.
if (!plugged_in) {
terminated = true;
return;
}
// Otherwise, destroy the object and acknowledge the termination
// straight away.
send_term_ack (owner);
process_unplug ();
delete this;
}
...@@ -20,41 +20,31 @@ ...@@ -20,41 +20,31 @@
#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ #ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
#define __ZMQ_IO_OBJECT_HPP_INCLUDED__ #define __ZMQ_IO_OBJECT_HPP_INCLUDED__
#include "object.hpp" #include <stddef.h>
#include "i_poller.hpp" #include "i_poller.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
namespace zmq namespace zmq
{ {
class io_object_t : public object_t, public i_poll_events // Simple base class for objects that live in I/O threads.
// It makes communication with the poller object easier and
// makes defining unneeded event handlers unnecessary.
class io_object_t : public i_poll_events
{ {
public: public:
// I/O object will live in the thread inherited from the parent. io_object_t (class io_thread_t *io_thread_ = NULL);
// However, it's lifetime is managed by the owner. ~io_object_t ();
io_object_t (class io_thread_t *parent_, object_t *owner_);
protected: protected:
// Ask owner socket to terminate this I/O object. This may not happen // Derived class can init/swap the underlying I/O thread.
void term (); // Caution: Remove all the file descriptors from the old I/O thread
// before swapping to the new one!
// I/O object destroys itself. No point in allowing others to invoke void set_io_thread (class io_thread_t *io_thread_);
// the destructor. At the same time, it has to be virtual so that
// generic io_object deallocation mechanism destroys specific type
// of I/O object correctly.
virtual ~io_object_t ();
// Handlers for incoming commands. It vital that every I/O object
// invokes io_object_t::process_plug at the end of it's own plug
// handler.
void process_plug ();
// io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup.
virtual void process_unplug () = 0;
// Methods to access underlying poller object. // Methods to access underlying poller object.
handle_t add_fd (fd_t fd_); handle_t add_fd (fd_t fd_);
...@@ -71,24 +61,10 @@ namespace zmq ...@@ -71,24 +61,10 @@ namespace zmq
void out_event (); void out_event ();
void timer_event (); void timer_event ();
// Socket owning this I/O object. It is responsible for destroying
// it when it's being closed.
object_t *owner;
private: private:
// Set to true when object is plugged in.
bool plugged_in;
// Set to true when object was terminated before it was plugged in.
// In such case destruction is delayed till 'plug' command arrives.
bool terminated;
struct i_poller *poller; struct i_poller *poller;
// Handlers for incoming commands.
void process_term ();
io_object_t (const io_object_t&); io_object_t (const io_object_t&);
void operator = (const io_object_t&); void operator = (const io_object_t&);
}; };
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "options.hpp"
zmq::options_t::options_t () :
hwm (0),
lwm (0),
swap (0),
mask (0),
affinity (0)
{
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
#include <string>
namespace zmq
{
struct options_t
{
options_t ();
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t mask;
uint64_t affinity;
std::string identity;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "owned.hpp"
#include "err.hpp"
zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) :
object_t (parent_),
owner (owner_),
plugged_in (false),
terminated (false)
{
}
zmq::owned_t::~owned_t ()
{
}
void zmq::owned_t::process_plug ()
{
zmq_assert (!plugged_in);
// If termination of the object was already requested, destroy it and
// send the termination acknowledgement.
if (terminated) {
send_term_ack (owner);
delete this;
return;
}
// Notify the generic termination mechanism (io_object_t) that the object
// is already plugged in.
plugged_in = true;
}
void zmq::owned_t::term ()
{
send_term_req (owner, this);
}
void zmq::owned_t::process_term ()
{
zmq_assert (!terminated);
// If termination request has occured even before the object was plugged in
// wait till plugging in happens, then acknowledge the termination.
if (!plugged_in) {
terminated = true;
return;
}
// Otherwise, destroy the object and acknowledge the termination
// straight away.
send_term_ack (owner);
process_unplug ();
delete this;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_OWNED_HPP_INCLUDED__
#define __ZMQ_OWNED_HPP_INCLUDED__
#include "object.hpp"
namespace zmq
{
// Base class for objects owned by individual sockets. Handles
// initialisation and destruction of such objects.
class owned_t : public object_t
{
public:
// The object will live in parent's thread, however, its lifetime
// will be managed by its owner socket.
owned_t (object_t *parent_, object_t *owner_);
protected:
// Ask owner socket to terminate this object.
void term ();
// Derived object destroys owned_t. No point in allowing others to
// invoke the destructor. At the same time, it has to be virtual so
// that generic owned_t deallocation mechanism destroys specific type
// of the owned object correctly.
virtual ~owned_t ();
// Handlers for incoming commands. It's vital that every I/O object
// invokes io_object_t::process_plug at the end of it's own plug
// handler.
void process_plug ();
// io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup.
virtual void process_unplug () = 0;
// Socket owning this object. It is responsible for destroying
// it when it's being closed.
object_t *owner;
private:
// Handlers for incoming commands.
void process_term ();
// Set to true when object is plugged in.
bool plugged_in;
// Set to true when object was terminated before it was plugged in.
// In such case destruction is delayed till 'plug' command arrives.
bool terminated;
owned_t (const owned_t&);
void operator = (const owned_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "session.hpp"
#include "zmq_engine.hpp"
#include "err.hpp"
zmq::session_t::session_t (object_t *parent_, object_t *owner_,
zmq_engine_t *engine_) :
owned_t (parent_, owner_),
engine (engine_)
{
}
zmq::session_t::~session_t ()
{
}
bool zmq::session_t::read (::zmq_msg *msg_)
{
return false;
}
bool zmq::session_t::write (::zmq_msg *msg_)
{
return false;
}
void zmq::session_t::flush ()
{
}
void zmq::session_t::process_plug ()
{
engine->plug (this);
owned_t::process_plug ();
}
void zmq::session_t::process_unplug ()
{
engine->unplug ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
#include "i_inout.hpp"
#include "owned.hpp"
namespace zmq
{
class session_t : public owned_t, public i_inout
{
public:
session_t (object_t *parent_, object_t *owner_,
class zmq_engine_t *engine_);
private:
~session_t ();
// i_inout interface implementation.
bool read (::zmq_msg *msg_);
bool write (::zmq_msg *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
class zmq_engine_t *engine;
session_t (const session_t&);
void operator = (const session_t&);
};
}
#endif
...@@ -32,12 +32,7 @@ ...@@ -32,12 +32,7 @@
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_), object_t (parent_),
pending_term_acks (0), pending_term_acks (0),
app_thread (parent_), app_thread (parent_)
hwm (0),
lwm (0),
swap (0),
mask (0),
affinity (0)
{ {
} }
...@@ -77,7 +72,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, ...@@ -77,7 +72,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
hwm = *((int64_t*) optval_); options.hwm = *((int64_t*) optval_);
return 0; return 0;
case ZMQ_LWM: case ZMQ_LWM:
...@@ -85,7 +80,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, ...@@ -85,7 +80,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
lwm = *((int64_t*) optval_); options.lwm = *((int64_t*) optval_);
return 0; return 0;
case ZMQ_SWAP: case ZMQ_SWAP:
...@@ -93,7 +88,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, ...@@ -93,7 +88,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
swap = *((int64_t*) optval_); options.swap = *((int64_t*) optval_);
return 0; return 0;
case ZMQ_MASK: case ZMQ_MASK:
...@@ -101,7 +96,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, ...@@ -101,7 +96,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
mask = (uint64_t) *((int64_t*) optval_); options.mask = (uint64_t) *((int64_t*) optval_);
return 0; return 0;
case ZMQ_AFFINITY: case ZMQ_AFFINITY:
...@@ -109,15 +104,15 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, ...@@ -109,15 +104,15 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
affinity = (uint64_t) *((int64_t*) optval_); options.affinity = (uint64_t) *((int64_t*) optval_);
return 0; return 0;
case ZMQ_SESSIONID: case ZMQ_IDENTITY:
if (optvallen_ != sizeof (const char*)) { if (optvallen_ != sizeof (const char*)) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
session_id = (const char*) optval_; options.identity = (const char*) optval_;
return 0; return 0;
default: default:
...@@ -128,8 +123,8 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_, ...@@ -128,8 +123,8 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
{ {
zmq_listener_t *listener = zmq_listener_t *listener = new zmq_listener_t (
new zmq_listener_t (choose_io_thread (affinity), this); choose_io_thread (options.affinity), this, options);
int rc = listener->set_address (addr_); int rc = listener->set_address (addr_);
if (rc != 0) if (rc != 0)
return -1; return -1;
...@@ -141,8 +136,8 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -141,8 +136,8 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::connect (const char *addr_)
{ {
zmq_connecter_t *connecter = zmq_connecter_t *connecter = new zmq_connecter_t (
new zmq_connecter_t (choose_io_thread (affinity), this); choose_io_thread (options.affinity), this, options);
int rc = connecter->set_address (addr_); int rc = connecter->set_address (addr_);
if (rc != 0) if (rc != 0)
return -1; return -1;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <string> #include <string>
#include "object.hpp" #include "object.hpp"
#include "options.hpp"
#include "stdint.hpp" #include "stdint.hpp"
namespace zmq namespace zmq
...@@ -66,12 +67,7 @@ namespace zmq ...@@ -66,12 +67,7 @@ namespace zmq
class app_thread_t *app_thread; class app_thread_t *app_thread;
// Socket options. // Socket options.
int64_t hwm; options_t options;
int64_t lwm;
int64_t swap;
uint64_t mask;
uint64_t affinity;
std::string session_id;
socket_base_t (const socket_base_t&); socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&); void operator = (const socket_base_t&);
......
...@@ -18,11 +18,16 @@ ...@@ -18,11 +18,16 @@
*/ */
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_) : zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_,
io_object_t (parent_, owner_), const options_t &options_) :
waiting (false) owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
options (options_)
{ {
} }
...@@ -38,12 +43,12 @@ int zmq::zmq_connecter_t::set_address (const char *addr_) ...@@ -38,12 +43,12 @@ int zmq::zmq_connecter_t::set_address (const char *addr_)
void zmq::zmq_connecter_t::process_plug () void zmq::zmq_connecter_t::process_plug ()
{ {
start_connecting (); start_connecting ();
io_object_t::process_plug (); owned_t::process_plug ();
} }
void zmq::zmq_connecter_t::process_unplug () void zmq::zmq_connecter_t::process_unplug ()
{ {
if (!waiting) if (handle_valid)
rm_fd (handle); rm_fd (handle);
} }
...@@ -58,30 +63,31 @@ void zmq::zmq_connecter_t::in_event () ...@@ -58,30 +63,31 @@ void zmq::zmq_connecter_t::in_event ()
void zmq::zmq_connecter_t::out_event () void zmq::zmq_connecter_t::out_event ()
{ {
fd_t fd = tcp_connecter.connect (); fd_t fd = tcp_connecter.connect ();
rm_fd (handle);
handle_valid = false;
// If there was error during the connecting, close the socket and wait // If there was error during the connecting, close the socket and wait
// for a while before trying to reconnect. // for a while before trying to reconnect.
if (fd == retired_fd) { if (fd == retired_fd) {
rm_fd (handle);
tcp_connecter.close (); tcp_connecter.close ();
waiting = true;
add_timer (); add_timer ();
return; return;
} }
zmq_assert (false); // Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
/* // Ask owner socket to shut the connecter down.
object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); term ();
send_plug (engine);
send_own (owner, engine);
*/
} }
void zmq::zmq_connecter_t::timer_event () void zmq::zmq_connecter_t::timer_event ()
{ {
// Reconnect period have elapsed. // Reconnect period have elapsed.
waiting = false;
start_connecting (); start_connecting ();
} }
...@@ -99,12 +105,12 @@ void zmq::zmq_connecter_t::start_connecting () ...@@ -99,12 +105,12 @@ void zmq::zmq_connecter_t::start_connecting ()
// Connection establishment may be dealyed. Poll for its completion. // Connection establishment may be dealyed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) { else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (tcp_connecter.get_fd ()); handle = add_fd (tcp_connecter.get_fd ());
handle_valid = true;
set_pollout (handle); set_pollout (handle);
return; return;
} }
// If none of the above is true, synchronous error occured. // If none of the above is true, synchronous error occured.
// Wait for a while and retry. // Wait for a while and retry.
waiting = true;
add_timer (); add_timer ();
} }
...@@ -20,17 +20,21 @@ ...@@ -20,17 +20,21 @@
#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#include "owned.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "options.hpp"
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
class zmq_connecter_t : public io_object_t class zmq_connecter_t : public owned_t, public io_object_t
{ {
public: public:
zmq_connecter_t (class io_thread_t *parent_, object_t *owner_); zmq_connecter_t (class io_thread_t *parent_, object_t *owner_,
const options_t &options_);
// Set IP address to connect to. // Set IP address to connect to.
int set_address (const char *addr_); int set_address (const char *addr_);
...@@ -57,9 +61,12 @@ namespace zmq ...@@ -57,9 +61,12 @@ namespace zmq
// Handle corresponding to the listening socket. // Handle corresponding to the listening socket.
handle_t handle; handle_t handle;
// True, if we are waiting for a period of time before trying to // If true file descriptor is registered with the poller and 'handle'
// reconnect. // contains valid value.
bool waiting; bool handle_valid;
// Associated socket options.
options_t options;
zmq_connecter_t (const zmq_connecter_t&); zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&); void operator = (const zmq_connecter_t&);
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_decoder.hpp"
#include "i_inout.hpp"
#include "wire.hpp"
zmq::zmq_decoder_t::zmq_decoder_t () :
destination (NULL)
{
zmq_msg_init (&in_progress);
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
zmq_msg_close (&in_progress);
}
void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
{
destination = destination_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the
// message data into it.
if (*tmpbuf == 0xff)
next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready);
else {
zmq_msg_init_size (&in_progress, *tmpbuf);
next_step (zmq_msg_data (&in_progress), *tmpbuf,
&zmq_decoder_t::message_ready);
}
return true;
}
bool zmq::zmq_decoder_t::eight_byte_size_ready ()
{
// 8-byte size is read. Allocate the buffer for message body and
// read the message data into it.
size_t size = (size_t) get_uint64 (tmpbuf);
zmq_msg_init_size (&in_progress, size);
next_step (zmq_msg_data (&in_progress), size,
&zmq_decoder_t::message_ready);
return true;
}
bool zmq::zmq_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message.
if (!destination->write (&in_progress))
return false;
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
return true;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
#include "../include/zmq.h"
#include "decoder.hpp"
namespace zmq
{
// Decoder for 0MQ backend protocol. Converts data batches into messages.
class zmq_decoder_t : public decoder_t <zmq_decoder_t>
{
public:
zmq_decoder_t ();
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
private:
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool message_ready ();
struct i_inout *destination;
unsigned char tmpbuf [8];
::zmq_msg in_progress;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_encoder.hpp"
#include "i_inout.hpp"
#include "wire.hpp"
zmq::zmq_encoder_t::zmq_encoder_t () :
source (NULL)
{
zmq_msg_init (&in_progress);
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &zmq_encoder_t::message_ready, true);
}
zmq::zmq_encoder_t::~zmq_encoder_t ()
{
zmq_msg_close (&in_progress);
}
void zmq::zmq_encoder_t::set_inout (i_inout *source_)
{
source = source_;
}
bool zmq::zmq_encoder_t::size_ready ()
{
// Write message body into the buffer.
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_encoder_t::message_ready, false);
return true;
}
bool zmq::zmq_encoder_t::message_ready ()
{
// Read new message from the dispatcher. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (!source->read (&in_progress)) {
return false;
}
size_t size = zmq_msg_size (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true);
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true);
}
return true;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
#include "../include/zmq.h"
#include "encoder.hpp"
namespace zmq
{
// Encoder for 0MQ backend protocol. Converts messages into data batches.
class zmq_encoder_t : public encoder_t <zmq_encoder_t>
{
public:
zmq_encoder_t ();
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
private:
bool size_ready ();
bool message_ready ();
struct i_inout *source;
::zmq_msg in_progress;
unsigned char tmpbuf [9];
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
};
}
#endif
...@@ -19,17 +19,118 @@ ...@@ -19,17 +19,118 @@
#include "zmq_engine.hpp" #include "zmq_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "i_inout.hpp"
#include "config.hpp"
#include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, object_t *owner_) : zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
io_object_t (parent_, owner_) io_object_t (parent_),
insize (0),
inpos (0),
outsize (0),
outpos (0),
inout (NULL)
{ {
// Allocate read & write buffer.
inbuf = (unsigned char*) malloc (in_batch_size);
zmq_assert (inbuf);
outbuf = (unsigned char*) malloc (out_batch_size);
zmq_assert (outbuf);
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_);
zmq_assert (rc == 0);
} }
zmq::zmq_engine_t::~zmq_engine_t () zmq::zmq_engine_t::~zmq_engine_t ()
{ {
free (outbuf);
free (inbuf);
}
void zmq::zmq_engine_t::plug (i_inout *inout_)
{
encoder.set_inout (inout_);
decoder.set_inout (inout_);
handle = add_fd (tcp_socket.get_fd ());
set_pollin (handle);
set_pollout (handle);
inout = inout_;
}
void zmq::zmq_engine_t::unplug ()
{
rm_fd (handle);
inout = NULL;
}
void zmq::zmq_engine_t::in_event ()
{
// If there's no data to process in the buffer, read new data.
if (inpos == insize) {
// Read as much data as possible to the read buffer.
insize = tcp_socket.read (inbuf, in_batch_size);
printf ("%d bytes read\n", (int) insize);
inpos = 0;
// Check whether the peer has closed the connection.
if (insize == -1) {
insize = 0;
error ();
return;
}
}
// Following code should be executed even if there's not a single byte in
// the buffer. There still can be a decoded messages stored in the decoder.
// Push the data to the decoder.
int nbytes = decoder.write (inbuf + inpos, insize - inpos);
// Adjust read position. Stop polling for input if we got stuck.
inpos += nbytes;
if (inpos < insize)
reset_pollin (handle);
// If at least one byte was processed, flush all messages the decoder
// may have produced.
if (nbytes > 0)
inout->flush ();
} }
void zmq::zmq_engine_t::process_plug () void zmq::zmq_engine_t::out_event ()
{ {
// If write buffer is empty, try to read new data from the encoder.
if (outpos == outsize) {
outsize = encoder.read (outbuf, out_batch_size);
outpos = 0;
// If there is no data to send, stop polling for output.
if (outsize == 0)
reset_pollout (handle);
}
// If there are any data to write in write buffer, write as much as
// possible to the socket.
if (outpos < outsize) {
int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos);
// Handle problems with the connection.
if (nbytes == -1) {
error ();
return;
}
outpos += nbytes;
}
} }
void zmq::zmq_engine_t::error ()
{
zmq_assert (false);
}
...@@ -21,6 +21,9 @@ ...@@ -21,6 +21,9 @@
#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_socket.hpp"
#include "zmq_encoder.hpp"
#include "zmq_decoder.hpp"
namespace zmq namespace zmq
{ {
...@@ -29,14 +32,36 @@ namespace zmq ...@@ -29,14 +32,36 @@ namespace zmq
{ {
public: public:
zmq_engine_t (class io_thread_t *parent_, object_t *owner_); zmq_engine_t (class io_thread_t *parent_, fd_t fd_);
~zmq_engine_t ();
void plug (struct i_inout *inout_);
void unplug ();
// i_poll_events interface implementation.
void in_event ();
void out_event ();
private: private:
~zmq_engine_t (); // Function to handle network disconnections.
void error ();
tcp_socket_t tcp_socket;
handle_t handle;
unsigned char *inbuf;
int insize;
int inpos;
unsigned char *outbuf;
int outsize;
int outpos;
i_inout *inout;
// Handlers for incoming commands. zmq_encoder_t encoder;
void process_plug (); zmq_decoder_t decoder;
zmq_engine_t (const zmq_engine_t&); zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&); void operator = (const zmq_engine_t&);
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, object_t *owner_, fd_t fd_,
bool connected_, const options_t &options_) :
owned_t (parent_, owner_),
connected (connected_),
options (options_)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
zmq_assert (engine);
}
zmq::zmq_init_t::~zmq_init_t ()
{
if (engine)
delete engine;
}
bool zmq::zmq_init_t::read (::zmq_msg *msg_)
{
// On the listening side, no initialisation data are sent to the peer.
if (!connected)
return false;
// Send identity.
int rc = zmq_msg_init_size (msg_, options.identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), options.identity.c_str (),
options.identity.size ());
// Initialisation is done.
create_session ();
return true;
}
bool zmq::zmq_init_t::write (::zmq_msg *msg_)
{
// On the connecting side no initialisation data are expected.
if (connected)
return false;
// Retreieve the identity.
options.identity = std::string ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
// Initialisation is done.
create_session ();
return true;
}
void zmq::zmq_init_t::flush ()
{
// No need to do anything. zmq_init_t does no batching of messages.
// Each message is processed immediately on write.
}
void zmq::zmq_init_t::process_plug ()
{
engine->plug (this);
owned_t::process_plug ();
}
void zmq::zmq_init_t::process_unplug ()
{
engine->unplug ();
}
void zmq::zmq_init_t::create_session ()
{
// Disconnect engine from the init object.
engine->unplug ();
// Create the session instance.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, owner, engine);
zmq_assert (session);
engine = NULL;
// Pass session/engine pair to a chosen I/O thread.
send_plug (session);
send_own (owner, session);
// Destroy the init object.
term ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
#include "owned.hpp"
#include "zmq_engine.hpp"
#include "stdint.hpp"
#include "fd.hpp"
#include "options.hpp"
namespace zmq
{
// The class handles initialisation phase of native 0MQ wire-level
// protocol. Currently it can be used to handle both sides of the
// connection. If it grows to complex, we can separate the two into
// distinct classes.
class zmq_init_t : public owned_t, public i_inout
{
public:
// Set 'connected' to true if the connection was created by 'connect'
// function. If it was accepted from a listening socket, set it to
// false.
zmq_init_t (class io_thread_t *parent_, object_t *owner_, fd_t fd_,
bool connected_, const options_t &options);
~zmq_init_t ();
private:
// i_inout interface implementation.
bool read (::zmq_msg *msg_);
bool write (::zmq_msg *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void create_session ();
// Engine is created by zmq_init_t object. Once the initialisation
// phase is over it is passed to a session object, possibly running
// in a different I/O thread.
zmq_engine_t *engine;
// If true, we are on the connecting side. If false, we are on the
// listening side.
bool connected;
// Associated socket options.
options_t options;
zmq_init_t (const zmq_init_t&);
void operator = (const zmq_init_t&);
};
}
#endif
...@@ -18,12 +18,15 @@ ...@@ -18,12 +18,15 @@
*/ */
#include "zmq_listener.hpp" #include "zmq_listener.hpp"
#include "zmq_engine.hpp" #include "zmq_init.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) : zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_,
io_object_t (parent_, owner_) const options_t &options_) :
owned_t (parent_, owner_),
io_object_t (parent_),
options (options_)
{ {
} }
...@@ -46,7 +49,7 @@ void zmq::zmq_listener_t::process_plug () ...@@ -46,7 +49,7 @@ void zmq::zmq_listener_t::process_plug ()
handle = add_fd (tcp_listener.get_fd ()); handle = add_fd (tcp_listener.get_fd ());
set_pollin (handle); set_pollin (handle);
io_object_t::process_plug (); owned_t::process_plug ();
} }
void zmq::zmq_listener_t::process_unplug () void zmq::zmq_listener_t::process_unplug ()
...@@ -63,14 +66,12 @@ void zmq::zmq_listener_t::in_event () ...@@ -63,14 +66,12 @@ void zmq::zmq_listener_t::in_event ()
if (fd == retired_fd) if (fd == retired_fd)
return; return;
// TODO // Create an init object.
zmq_assert (false); io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options);
/* zmq_assert (init);
object_t *engine = new zmq_engine_t (choose_io_thread (0), owner); send_plug (init);
send_plug (engine); send_own (owner, init);
send_own (owner, engine);
*/
} }
......
...@@ -20,17 +20,21 @@ ...@@ -20,17 +20,21 @@
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#include "owned.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "options.hpp"
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
class zmq_listener_t : public io_object_t class zmq_listener_t : public owned_t, public io_object_t
{ {
public: public:
zmq_listener_t (class io_thread_t *parent_, object_t *owner_); zmq_listener_t (class io_thread_t *parent_, object_t *owner_,
const options_t &options_);
// Set IP address to listen on. // Set IP address to listen on.
int set_address (const char *addr_); int set_address (const char *addr_);
...@@ -52,6 +56,9 @@ namespace zmq ...@@ -52,6 +56,9 @@ namespace zmq
// Handle corresponding to the listening socket. // Handle corresponding to the listening socket.
handle_t handle; handle_t handle;
// Associated socket options.
options_t options;
zmq_listener_t (const zmq_listener_t&); zmq_listener_t (const zmq_listener_t&);
void operator = (const zmq_listener_t&); void operator = (const zmq_listener_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment