Commit 74d344ca authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #924 from bebopagogo/master

NORM protocol extension
parents 72c02b15 33f22d0d
......@@ -463,6 +463,28 @@ fi
AC_SUBST(pgm_basename)
# This uses "--with-norm" to point to the "norm" directory
# for "norm/include" and "norm/lib"
#(if "--with-norm=yes" is given, then assume installed on system)
AC_ARG_WITH([norm], [AS_HELP_STRING([--with-norm],
[build libzmq with NORM protocol extension, optionally specifying norm path [default=no]])],
[with_norm_ext=$withval], [with_norm_ext=no])
AC_MSG_CHECKING("with_norm_ext = ${with_norm_ext}")
if test "x$with_norm_ext" != "xno"; then
AC_DEFINE(ZMQ_HAVE_NORM, 1, [Have NORM protocol extension])
if test "x$wwith_norm_ext" != "xyes"; then
norm_path="${with_norm_ext}"
LIBZMQ_EXTRA_CXXFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_CXXFLAGS}"
LIBZMQ_EXTRA_LDFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_LDFLAGS}"
fi
LIBS="-lnorm $LIBS"
fi
# Set -Wall, -Werror and -pedantic
AC_LANG_PUSH([C++])
......
......@@ -45,6 +45,7 @@ libzmq_la_SOURCES = \
msg.hpp \
mtrie.hpp \
mutex.hpp \
norm_engine.hpp \
null_mechanism.hpp \
object.hpp \
options.hpp \
......@@ -112,6 +113,7 @@ libzmq_la_SOURCES = \
mechanism.cpp \
msg.cpp \
mtrie.cpp \
norm_engine.cpp \
null_mechanism.cpp \
object.cpp \
options.cpp \
......
This diff is collapsed.
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define ZMQ_HAVE_NORM 1
#if defined ZMQ_HAVE_NORM
#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"
#include <norm/include/normApi.h>
namespace zmq
{
class io_thread_t;
class session_base_t;
class norm_engine_t : public io_object_t, public i_engine
{
public:
norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
~norm_engine_t ();
// create NORM instance, session, etc
int init(const char* network_, bool send, bool recv);
void shutdown();
// i_engine interface implementation.
// Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_);
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
virtual void terminate ();
// This method is called by the session to signalise that more
// messages can be written to the pipe.
virtual void restart_input ();
// This method is called by the session to signalise that there
// are messages to send available.
virtual void restart_output ();
virtual void zap_msg_available () {};
// i_poll_events interface implementation.
// (we only need in_event() for NormEvent notification)
// (i.e., don't have any output events or timers (yet))
void in_event ();
private:
void unplug();
void send_data();
void recv_data(NormObjectHandle stream);
enum {BUFFER_SIZE = 2048};
// Used to keep track of streams from multiple senders
class NormRxStreamState
{
public:
NormRxStreamState(NormObjectHandle normStream,
int64_t maxMsgSize);
~NormRxStreamState();
NormObjectHandle GetStreamHandle() const
{return norm_stream;}
bool Init();
void SetRxReady(bool state)
{rx_ready = state;}
bool IsRxReady() const
{return rx_ready;}
void SetSync(bool state)
{in_sync = state;}
bool InSync() const
{return in_sync;}
// These are used to feed data to decoder
// and its underlying "msg" buffer
char* AccessBuffer()
{return (char*)(buffer_ptr + buffer_count);}
size_t GetBytesNeeded() const
{return (buffer_size - buffer_count);}
void IncrementBufferCount(size_t count)
{buffer_count += count;}
msg_t* AccessMsg()
{return zmq_decoder->msg();}
// This invokes the decoder "decode" method
// returning 0 if more data is needed,
// 1 if the message is complete, If an error
// occurs the 'sync' is dropped and the
// decoder re-initialized
int Decode();
class List
{
public:
List();
~List();
void Append(NormRxStreamState& item);
void Remove(NormRxStreamState& item);
bool IsEmpty() const
{return (NULL == head);}
void Destroy();
class Iterator
{
public:
Iterator(const List& list);
NormRxStreamState* GetNextItem();
private:
NormRxStreamState* next_item;
};
friend class Iterator;
private:
NormRxStreamState* head;
NormRxStreamState* tail;
}; // end class zmq::norm_engine_t::NormRxStreamState::List
friend class List;
List* AccessList()
{return list;}
private:
NormObjectHandle norm_stream;
int64_t max_msg_size;
bool in_sync;
bool rx_ready;
v2_decoder_t* zmq_decoder;
bool skip_norm_sync;
unsigned char* buffer_ptr;
size_t buffer_size;
size_t buffer_count;
NormRxStreamState* prev;
NormRxStreamState* next;
NormRxStreamState::List* list;
}; // end class zmq::norm_engine_t::NormRxStreamState
session_base_t* zmq_session;
options_t options;
NormInstanceHandle norm_instance;
handle_t norm_descriptor_handle;
NormSessionHandle norm_session;
bool is_sender;
bool is_receiver;
// Sender state
msg_t tx_msg;
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
bool tx_more_bit;
bool zmq_output_ready; // zmq has msg(s) to send
NormObjectHandle norm_tx_stream;
bool norm_tx_ready; // norm has tx queue vacancy
// tbd - maybe don't need buffer if can access encoder buffer directly?
char tx_buffer[BUFFER_SIZE];
unsigned int tx_index;
unsigned int tx_len;
// Receiver state
// Lists of norm rx streams from remote senders
bool zmq_input_ready; // zmq ready to receive msg(s)
NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead()
NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq
}; // end class norm_engine_t
}
#endif // ZMQ_HAVE_NORM
#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__
......@@ -28,6 +28,7 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
#include "norm_engine.hpp"
#include "ctx.hpp"
#include "req.hpp"
......@@ -449,8 +450,9 @@ void zmq::session_base_t::reconnect ()
{
// For delayed connect situations, terminate the pipe
// and reestablish later on
if (pipe && options.immediate == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
if (pipe && 1 == options.immediate == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm"
&& addr->protocol != "norm") {
pipe->hiccup ();
pipe->terminate (false);
terminating_pipes.insert (pipe);
......@@ -549,6 +551,38 @@ void zmq::session_base_t::start_connecting (bool wait_)
return;
}
#endif
#ifdef ZMQ_HAVE_NORM
if (addr->protocol == "norm")
{
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// NORM sender.
norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options);
alloc_assert (norm_sender);
int rc = norm_sender->init (addr->address.c_str (), true, false);
errno_assert (rc == 0);
send_attach (this, norm_sender);
}
else { // ZMQ_SUB or ZMQ_XSUB
// NORM receiver.
norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options);
alloc_assert (norm_receiver);
int rc = norm_receiver->init (addr->address.c_str (), false, true);
errno_assert (rc == 0);
send_attach (this, norm_receiver);
}
return;
}
#endif // ZMQ_HAVE_NORM
zmq_assert (false);
}
......
......@@ -190,11 +190,11 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc") {
protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc" &&
protocol_ != "norm") {
errno = EPROTONOSUPPORT;
return -1;
}
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
// are not avaialble.
#if !defined ZMQ_HAVE_OPENPGM
......@@ -203,6 +203,13 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return -1;
}
#endif
#if !defined ZMQ_HAVE_NORM
if (protocol_ == "norm") {
errno = EPROTONOSUPPORT;
return -1;
}
#endif // !ZMQ_HAVE_NORM
// IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
......@@ -224,7 +231,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
if ((protocol_ == "pgm" || protocol_ == "epgm") &&
if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") &&
options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
errno = ENOCOMPATPROTO;
......@@ -362,9 +369,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return rc;
}
if (protocol == "pgm" || protocol == "epgm") {
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
// connect for PGM, EPGM and NORM transports.
return connect (addr_);
}
......@@ -600,6 +607,9 @@ int zmq::socket_base_t::connect (const char *addr_)
}
}
#endif
// TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM
if (protocol == "pgm" || protocol == "epgm") {
struct pgm_addrinfo_t *res = NULL;
......@@ -630,8 +640,8 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (session);
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe.
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm";
// sent to this pipe. (same for NORM, currently?)
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm";
pipe_t *newpipe = NULL;
if (options.immediate != 1 || subscribe_to_all) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment