Commit 4ed70a93 authored by Martin Sustrik's avatar Martin Sustrik

initial commit

parents
include_HEADERS = include/zs.h include/zs.hpp
SUBDIRS = src examples
DIST_SUBDIRS = src examples
#!/bin/sh
# Copyright (c) 2007 FastMQ Inc.
#
# This file is part of 0MQ.
#
# 0MQ is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# 0MQ is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Script to generate all required files from fresh svn checkout.
autoreconf --install --force --verbose -I config
if [ $? -ne 0 ]; then
echo
echo "Could not run autoreconf, check autotools installation."
echo
fi
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
AC_INIT([zsock],[dev])
AC_CONFIG_AUX_DIR(config)
AM_CONFIG_HEADER(src/platform.hpp)
AM_INIT_AUTOMAKE(AC_PACKAGE_NAME, AC_PACKAGE_VERSION)
AM_PROG_CC_C_O
# Checks for programs.
AC_PROG_CXX
AC_PROG_LIBTOOL
# Checks for libraries.
AC_CHECK_LIB(pthread, pthread_create)
# Host speciffic checks
AC_CANONICAL_HOST
case "${host_os}" in
*linux*)
AC_DEFINE(ZS_HAVE_LINUX, 1, [Have Linux OS])
CPPFLAGS="-D_REENTRANT $CPPFLAGS"
sed < libtool > libtool-2 \
's/^hardcode_libdir_flag_spec.*$'/'hardcode_libdir_flag_spec=" "/'
mv libtool-2 libtool
chmod 755 libtool
AC_CHECK_LIB(uuid, uuid_generate)
;;
*solaris*)
AC_DEFINE(ZS_HAVE_SOLARIS, 1, [Have Solaris OS])
AC_CHECK_LIB(socket, main)
AC_CHECK_LIB(nsl, main)
AC_CHECK_LIB(rt, main)
CPPFLAGS="-D_REENTRANT -D_PTHREADS $CPPFLAGS"
AC_MSG_CHECKING([wheter atomic operations can be used])
AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
[[#include <atomic.h>]],
[[uint32_t value;
atomic_cas_32 (&value, 0, 0);
return 0;]])],
[solaris_has_atomic=yes],
[solaris_has_atomic=no])
AC_MSG_RESULT([$solaris_has_atomic])
# Solaris 8 does not have atomic operations exported to user space.
if test "x$solaris_has_atomic" = "xno"; then
AC_DEFINE(ZS_FORCE_MUTEXES, 1, [Force to use mutexes])
fi
;;
*freebsd*)
AC_DEFINE(ZS_HAVE_FREEBSD, 1, [Have FreeBSD OS])
CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS"
LIBS="-pthread"
;;
*darwin*)
AC_DEFINE(ZS_HAVE_OSX, 1, [Have DarwinOSX OS])
LIBS="-pthread"
ZS_EXTRA_CXXFLAGS+="-Wno-uninitialized"
;;
*openbsd*)
AC_DEFINE(ZS_HAVE_OPENBSD, 1, [Have OpenBSD OS])
CPPFLAGS="-pthread $CPPFLAGS"
LIBS="-pthread"
;;
*nto-qnx*)
AC_DEFINE(ZS_HAVE_QNXNTO, 1, [Have QNX Neutrino OS])
CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS"
AC_CHECK_LIB(socket,main)
;;
*aix*)
AC_DEFINE(ZS_HAVE_AIX, 1, [Have AIX OS])
if test "x$GXX" = "xyes"; then
CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS"
fi
;;
*hpux*)
AC_DEFINE(ZS_HAVE_HPUX, 1, [Have HPUX OS])
if test "x$GXX" = "xyes"; then
CPPFLAGS="-D_THREAD_SAFE $CPPFLAGS"
fi
AC_CHECK_LIB(rt, main)
sed < libtool > libtool-2 \
's/^hardcode_libdir_flag_spec.*$'/'hardcode_libdir_flag_spec=" "/'
mv libtool-2 libtool
chmod 755 libtool
;;
*mingw32*)
AC_DEFINE(ZS_HAVE_WINDOWS, 1, [Have Windows OS])
AC_DEFINE(ZS_HAVE_MINGW32, 1, [Have MinGW32])
AC_CHECK_HEADERS(windows.h)
LIBS="-lwsock32 -lws2_32 -no-undefined"
CFLAGS="-std=c99"
install_man="no"
;;
*)
AC_MSG_ERROR([Not supported os: $host.])
;;
esac
# Check if we are running at sparc harware
AC_MSG_CHECKING([wheter __sparc__ is defined])
AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
[[#if defined __sparc__
//OK we are on sparc
#else
error: we are not on sparc
#endif
]])],
[sparc=yes],
[sparc=no])
AC_MSG_RESULT([$sparc])
if test "x$sparc" = "xyes"; then
CPPFLAGS="$CPPFLAGS -mcpu=v9"
fi
# Checks for header files.
AC_HEADER_STDC
AC_CHECK_HEADERS(errno.h arpa/inet.h netinet/tcp.h netinet/in.h stddef.h \
stdlib.h string.h sys/socket.h sys/time.h unistd.h limits.h)
# Check if we have eventfd.h header file.
AC_CHECK_HEADERS(sys/eventfd.h, [AC_DEFINE(ZS_HAVE_EVENTFD, 1, [Have eventfd extension.])])
# Check if we have ifaddrs.h header file.
AC_CHECK_HEADERS(ifaddrs.h, [AC_DEFINE(ZS_HAVE_IFADDRS, 1, [Have ifaddrs.h header.])])
# Use c++ in subsequent tests
AC_LANG(C++)
# Optional stuff
AC_CHECK_PROG(have_pkg_config, pkg-config, yes, no)
if test "x$have_pkg_config" != "xno"; then
# First instance of PKG_CHECK_ has to be executed
PKG_CHECK_EXISTS([dummy_pkg], [], [])
fi
# Checks for typedefs, structures, and compiler characteristics.
AC_HEADER_STDBOOL
AC_C_CONST
AC_C_INLINE
AC_TYPE_SIZE_T
AC_TYPE_SSIZE_T
AC_HEADER_TIME
AC_TYPE_UINT32_T
AC_C_VOLATILE
# Substs
stdint="0"
if test "x$HAVE_STDINT_H" = "xyes"; then
stdint="1"
fi
inttypes="0"
if test "x$HAVE_INTTYPES_H" = "xyes"; then
inttypes="1"
fi
AC_SUBST(stdint)
AC_SUBST(inttypes)
# Subst ZS_EXTRA_CXXFLAGS
AC_SUBST(ZS_EXTRA_CXXFLAGS)
# Checks for library functions.
AC_FUNC_MALLOC
AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
AC_OUTPUT(Makefile src/Makefile examples/Makefile examples/chat/Makefile)
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
AC_MSG_RESULT([ 0SOCKETS ])
AC_MSG_RESULT([ ******************************************************** ])
AC_MSG_RESULT([ This software is distributed under the terms and ])
AC_MSG_RESULT([ conditions of the LESSER GNU GENERAL PUBLIC LICENSE. ])
AC_MSG_RESULT([ See the file COPYING and COPYING.LESSER for the full ])
AC_MSG_RESULT([ license text. ])
AC_MSG_RESULT([ ******************************************************** ])
AC_MSG_RESULT([])
AC_MSG_RESULT([ zsock install dir: $prefix])
AC_MSG_RESULT([])
SUBDIRS = chat
DIST_SUBDIRS = chat
INCLUDES = -I$(top_builddir) -I$(top_builddir)/include
noinst_PROGRAMS = chatroom display prompt
chatroom_SOURCES = chatroom.cpp
chatroom_LDADD = $(top_builddir)/src/libzs.la
chatroom_CXXFLAGS = -Wall -pedantic -Werror
display_SOURCES = display.cpp
display_LDADD = $(top_builddir)/src/libzs.la
display_CXXFLAGS = -Wall -pedantic -Werror
prompt_SOURCES = prompt.cpp
prompt_LDADD = $(top_builddir)/src/libzs.la
prompt_CXXFLAGS = -Wall -pedantic -Werror
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <time.h>
#include <string.h>
#include <iostream>
using namespace std;
#include <zs.hpp>
int main (int argc, const char *argv [])
{
// Check the command line syntax
if (argc != 3) {
cerr << "usage: chatroom <in-interface> <out-interface>" << endl;
return 1;
}
// Retrieve command line arguments
const char *in_interface = argv [1];
const char *out_interface = argv [2];
// Initialise 0MQ infrastructure
zs::context_t ctx (1, 1);
// Create two sockets. One for receiving messages from 'propmt'
// applications, one for sending messages to 'display' applications
zs::socket_t in_socket (ctx, ZS_SUB);
in_socket.bind (in_interface);
zs::socket_t out_socket (ctx, ZS_PUB);
out_socket.bind (out_interface);
while (true) {
// Get a message
zs::message_t in_message;
in_socket.recv (&in_message);
// Get the current time. Replace the newline character at the end
// by space character.
char timebuf [256];
time_t current_time;
time (&current_time);
snprintf (timebuf, 256, "%s", ctime (&current_time));
timebuf [strlen (timebuf) - 1] = ' ';
// Create and fill in the message
zs::message_t out_message (strlen (timebuf) + in_message.size ());
char *data = (char*) out_message.data ();
memcpy (data, timebuf, strlen (timebuf));
data += strlen (timebuf);
memcpy (data, in_message.data (), in_message.size ());
// Send the message
out_socket.send (out_message);
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <string>
#include <iostream>
using namespace std;
#include <zs.hpp>
int main (int argc, const char *argv [])
{
// Check the command line syntax.
if (argc != 2) {
cerr << "usage: display <chatroom-out-address>" << endl;
return 1;
}
// Retrieve command line arguments
const char *chatroom_out_address = argv [1];
// Initialise 0MQ infrastructure, connect to the chatroom and ask for all
// messages and gap notifications.
zs::context_t ctx (1, 1);
zs::socket_t s (ctx, ZS_SUB);
s.connect (chatroom_out_address);
s.subscribe ("*");
while (true) {
// Get a message and print it to the console.
zs::message_t message;
s.recv (&message);
if (message.type () == zs::message_gap)
cout << "Problems connecting to the chatroom..." << endl;
else
cout << (char*) message.data () << flush;
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <string>
#include <iostream>
using namespace std;
#include <zs.hpp>
int main (int argc, const char *argv [])
{
// Check the command line syntax.
if (argc != 3) {
cerr << "usage: prompt <chatroom-in-address> <user name>" << endl;
return 1;
}
// Retrieve command line arguments
const char *chatroom_in_address = argv [1];
const char *user_name = argv [2];
// Initialise 0MQ infrastructure and connect to the chatroom.
zs::context_t ctx (1, 1);
zs::socket_t s (ctx, ZS_PUB);
s.connect (chatroom_in_address);
while (true) {
// Allow user to input the message text. Prepend it by user name.
char textbuf [1024];
char *rcc = fgets (textbuf, sizeof (textbuf), stdin);
assert (rcc);
string text (user_name);
text = text + ": " + textbuf;
// Create the message (terminating zero is part of the message)
zs::message_t message (text.size () + 1);
memcpy (message.data (), text.c_str (), text.size () + 1);
// Send the message
s.send (message);
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZSOCKETS_H_INCLUDED__
#define __ZSOCKETS_H_INCLUDED__
#ifdef __cplusplus
extern "C" {
#endif
#include <stddef.h>
#include <stdint.h>
#if defined MSC_VER && defined ZS_BUILDING_LIBZS
#define ZS_EXPORT __declspec(dllexport)
#else
#define ZS_EXPORT
#endif
// Maximal size of "Very Small Message". VSMs are passed by value
// to avoid excessive memory allocation/deallocation.
#define ZS_MAX_VSM_SIZE 30
// Message & notification types.
#define ZS_GAP 1
#define ZS_DELIMITER 31
#define ZS_VSM 32
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
#define ZS_NOBLOCK 1
// zs_send should not flush the message downstream immediately. Instead, it
// should batch ZS_NOFLUSH messages and send them downstream only when zs_flush
// is invoked. This is an optimisation for cases where several messages are
// sent in a single business transaction. However, the effect is measurable
// only in extremely high-perf scenarios (million messages a second or so).
// If that's not your case, use standard flushing send instead. See exchange
// example for illustration of ZS_NOFLUSH functionality.
#define ZS_NOFLUSH 2
// Socket to communicate with a single peer. Allows for a singe connect or a
// single accept. There's no message routing or message filtering involved.
#define ZS_P2P 0
// Socket to distribute data. Recv fuction is not implemeted for this socket
// type. Messages are distributed in fanout fashion to all peers.
#define ZS_PUB 1
// Socket to subscribe to distributed data. Send function is not implemented
// for this socket type. However, subscribe function can be used to modify the
// message filter.
#define ZS_SUB 2
// Socket to send requests on and receive replies from. Requests are
// load-balanced among all the peers. This socket type doesn't allow for more
// recv's that there were send's.
#define ZS_REQ 3
// Socket to receive requests from and send replies to. This socket type allows
// only an alternated sequence of recv's and send's. Each send is routed to
// the peer that the previous recv delivered message from.
#define ZS_REP 4
// Prototype for the message body deallocation functions.
// It is deliberately defined in the way to comply with standard C free.
typedef void (zs_free_fn) (void *data);
// A message. If 'shared' is true, message content pointed to by 'content'
// is shared, i.e. reference counting is used to manage its lifetime
// rather than straighforward malloc/free. struct zs_msg_content is
// not declared in the API.
struct zs_msg
{
struct zs_msg_content *content;
unsigned char shared;
uint16_t vsm_size;
unsigned char vsm_data [ZS_MAX_VSM_SIZE];
};
// TODO: Different options...
struct zs_opts
{
uint64_t hwm;
uint64_t lwm;
uint64_t swap;
uint64_t mask;
uint64_t taskset;
const char *identity;
const char *args;
};
// Initialise an empty message (zero bytes long).
ZS_EXPORT int zs_msg_init (zs_msg *msg);
// Initialise a message 'size' bytes long.
//
// Errors: ENOMEM - the size is too large to allocate.
ZS_EXPORT int zs_msg_init_size (zs_msg *msg, size_t size);
// Initialise a message from an existing buffer. Message isn't copied,
// instead 0SOCKETS infrastructure take ownership of the buffer and call
// deallocation functio (ffn) once it's not needed anymore.
ZS_EXPORT int zs_msg_init_data (zs_msg *msg, void *data, size_t size,
zs_free_fn *ffn);
// Deallocate the message.
ZS_EXPORT int zs_msg_close (zs_msg *msg);
// Move the content of the message from 'src' to 'dest'. The content isn't
// copied, just moved. 'src' is an empty message after the call. Original
// content of 'dest' message is deallocated.
ZS_EXPORT int zs_msg_move (zs_msg *dest, zs_msg *src);
// Copy the 'src' message to 'dest'. The content isn't copied, instead
// reference count is increased. Don't modify the message data after the
// call as they are shared between two messages. Original content of 'dest'
// message is deallocated.
ZS_EXPORT int zs_msg_copy (zs_msg *dest, zs_msg *src);
// Returns pointer to message data.
ZS_EXPORT void *zs_msg_data (zs_msg *msg);
// Return size of message data (in bytes).
ZS_EXPORT size_t zs_msg_size (zs_msg *msg);
// Returns type of the message.
ZS_EXPORT int zs_msg_type (zs_msg *msg);
// Initialise 0SOCKETS context. 'app_threads' specifies maximal number
// of application threads that can have open sockets at the same time.
// 'io_threads' specifies the size of thread pool to handle I/O operations.
//
// Errors: EINVAL - one of the arguments is less than zero or there are no
// threads declared at all.
ZS_EXPORT void *zs_init (int app_threads, int io_threads);
// Deinitialise 0SOCKETS context including all the open sockets. Closing
// sockets after zs_term has been called will result in undefined behaviour.
ZS_EXPORT int zs_term (void *context);
// Open a socket.
//
// Errors: EINVAL - invalid socket type.
// EMFILE - the number of application threads entitled to hold open
// sockets at the same time was exceeded.
ZS_EXPORT void *zs_socket (void *context, int type);
// Close the socket.
ZS_EXPORT int zs_close (void *s);
// Bind the socket to a particular address.
ZS_EXPORT int zs_bind (void *s, const char *addr, zs_opts *opts);
// Connect the socket to a particular address.
ZS_EXPORT int zs_connect (void *s, const char *addr, zs_opts *opts);
// Subscribe for the subset of messages identified by 'criteria' argument.
ZS_EXPORT int zs_subscribe (void *s, const char *criteria);
// Send the message 'msg' to the socket 's'. 'flags' argument can be
// combination of following values:
// ZS_NOBLOCK - if message cannot be sent, return immediately.
// ZS_NOFLUSH - message won't be sent immediately. It'll be sent with either
// subsequent flushing send or explicit call to zs_flush function.
//
// Errors: EAGAIN - message cannot be sent at the moment (applies only to
// non-blocking send).
// ENOTSUP - function isn't supported by particular socket type.
ZS_EXPORT int zs_send (void *s, zs_msg *msg, int flags);
// Flush the messages that were send using ZS_NOFLUSH flag down the stream.
//
// Errors: ENOTSUP - function isn't supported by particular socket type.
ZS_EXPORT int zs_flush (void *s);
// Send a message from the socket 's'. 'flags' argument can be combination
// of following values:
// ZS_NOBLOCK - if message cannot be received, return immediately.
//
// Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive).
// ENOTSUP - function isn't supported by particular socket type.
ZS_EXPORT int zs_recv (void *s, zs_msg *msg, int flags);
#ifdef __cplusplus
}
#endif
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZSOCKETS_HPP_INCLUDED__
#define __ZSOCKETS_HPP_INCLUDED__
#include "zs.h"
#include <assert.h>
namespace zs
{
typedef zs_free_fn free_fn;
enum message_type_t
{
message_data = 1 << 0,
message_gap = 1 << ZS_GAP,
message_delimiter = 1 << ZS_DELIMITER
};
// A message. Caution: Don't change the body of the message once you've
// copied it - the behaviour is undefined. Don't change the body of the
// received message either - other threads may be accessing it in parallel.
class message_t : private zs_msg
{
friend class socket_t;
public:
// Creates message size_ bytes long.
inline message_t (size_t size_ = 0)
{
int rc = zs_msg_init_size (this, size_);
assert (rc == 0);
}
// Creates message from the supplied buffer. 0MQ takes care of
// deallocating the buffer once it is not needed. The deallocation
// function is supplied in ffn_ parameter. If ffn_ is NULL, no
// deallocation happens - this is useful for sending static buffers.
inline message_t (void *data_, size_t size_,
free_fn *ffn_)
{
int rc = zs_msg_init_data (this, data_, size_, ffn_);
assert (rc == 0);
}
// Destroys the message.
inline ~message_t ()
{
int rc = zs_msg_close (this);
assert (rc == 0);
}
// Destroys old content of the message and allocates buffer for the
// new message body. Having this as a separate function allows user
// to reuse once-allocated message for multiple times.
inline void rebuild (size_t size_)
{
int rc = zs_msg_close (this);
assert (rc == 0);
rc = zs_msg_init_size (this, size_);
assert (rc == 0);
}
// Same as above, however, the message is rebuilt from the supplied
// buffer. See appropriate constructor for discussion of buffer
// deallocation mechanism.
inline void rebuild (void *data_, size_t size_, free_fn *ffn_)
{
int rc = zs_msg_close (this);
assert (rc == 0);
rc = zs_msg_init_data (this, data_, size_, ffn_);
assert (rc == 0);
}
// Moves the message content from one message to the another. If the
// destination message have contained data prior to the operation
// these get deallocated. The source message will contain 0 bytes
// of data after the operation.
inline void move_to (message_t *msg_)
{
int rc = zs_msg_move (this, (zs_msg*) msg_);
assert (rc == 0);
}
// Copies the message content from one message to the another. If the
// destination message have contained data prior to the operation
// these get deallocated.
inline void copy_to (message_t *msg_)
{
int rc = zs_msg_copy (this, (zs_msg*) msg_);
assert (rc == 0);
}
// Returns message type.
inline message_type_t type ()
{
return (message_type_t) (1 << zs_msg_type (this));
}
// Returns pointer to message's data buffer.
inline void *data ()
{
return zs_msg_data (this);
}
// Returns the size of message data buffer.
inline size_t size ()
{
return zs_msg_size (this);
}
private:
// Disable implicit message copying, so that users won't use shared
// messages (less efficient) without being aware of the fact.
message_t (const message_t&);
void operator = (const message_t&);
};
class context_t
{
friend class socket_t;
public:
inline context_t (int app_threads_, int io_threads_)
{
ptr = zs_init (app_threads_, io_threads_);
assert (ptr);
}
inline ~context_t ()
{
int rc = zs_term (ptr);
assert (rc == 0);
}
private:
void *ptr;
// Disable copying.
context_t (const context_t&);
void operator = (const context_t&);
};
class socket_t
{
public:
inline socket_t (context_t &context_, int type_ = 0)
{
ptr = zs_socket (context_.ptr, type_);
assert (ptr);
}
inline ~socket_t ()
{
int rc = zs_close (ptr);
assert (rc == 0);
}
inline void bind (const char *addr_, zs_opts *opts_ = NULL)
{
int rc = zs_bind (ptr, addr_, opts_);
assert (rc == 0);
}
inline void connect (const char *addr_, zs_opts *opts_ = NULL)
{
int rc = zs_connect (ptr, addr_, opts_);
assert (rc == 0);
}
inline void subscribe (const char *criteria_)
{
int rc = zs_subscribe (ptr, criteria_);
assert (rc == 0);
}
inline void send (message_t &msg_, int flags_ = 0)
{
int rc = zs_send (ptr, &msg_, flags_);
assert (rc == 0);
}
inline void flush ()
{
int rc = zs_flush (ptr);
assert (rc == 0);
}
inline void recv (message_t *msg_, int flags_ = 0)
{
int rc = zs_recv (ptr, msg_, flags_);
assert (rc == 0);
}
private:
void *ptr;
// Disable copying.
socket_t (const socket_t&);
void operator = (const socket_t&);
};
}
#endif
lib_LTLIBRARIES = libzs.la
libzs_la_SOURCES = \
app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
command.hpp \
config.hpp \
connecter.hpp \
data_distributor.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
dummy_aggregator.hpp \
dummy_distributor.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
fair_aggregator.hpp \
fd.hpp \
fd_signaler.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
i_api.hpp \
i_demux.hpp \
i_mux.hpp \
i_poller.hpp \
i_poll_events.hpp \
i_session.hpp \
i_signaler.hpp \
i_engine.hpp \
i_thread.hpp \
listener.hpp \
kqueue.hpp \
load_balancer.hpp \
msg.hpp \
mutex.hpp \
object.hpp \
p2p.hpp \
pipe.hpp \
pipe_reader.hpp \
pipe_writer.hpp \
platform.hpp \
poll.hpp \
pub.hpp \
rep.hpp \
req.hpp \
safe_object.hpp \
select.hpp \
session.hpp \
session_stub.hpp \
simple_semaphore.hpp \
socket_base.hpp \
sub.hpp \
stdint.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_tcp_engine.hpp \
app_thread.cpp \
connecter.cpp \
data_distributor.cpp \
devpoll.hpp \
dispatcher.cpp \
dummy_aggregator.cpp \
dummy_distributor.cpp \
epoll.cpp \
err.cpp \
fair_aggregator.cpp \
fd_signaler.cpp \
io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
listener.cpp \
load_balancer.cpp \
object.cpp \
p2p.cpp \
pipe.cpp \
pipe_reader.cpp \
pipe_writer.cpp \
poll.cpp \
pub.cpp \
rep.cpp \
req.cpp \
safe_object.cpp \
select.cpp \
session.cpp \
session_stub.cpp \
socket_base.cpp \
sub.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
uuid.cpp \
ypollset.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_tcp_engine.cpp \
zs.cpp
libzs_la_LDFLAGS = -version-info 0:0:0
libzs_la_CXXFLAGS = -Wall -pedantic -Werror @ZS_EXTRA_CXXFLAGS@
dist-hook:
-rm $(distdir)/src/platform.hpp
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zs.h"
#if defined ZS_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
#include "session.hpp"
#include "pipe.hpp"
#include "config.hpp"
#include "i_api.hpp"
#include "dummy_aggregator.hpp"
#include "fair_aggregator.hpp"
#include "dummy_distributor.hpp"
#include "data_distributor.hpp"
#include "load_balancer.hpp"
#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
// system with x86 architecture and gcc or MSVC compiler.
#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
(defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
#define ZS_DELAY_COMMANDS
#endif
zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (dispatcher_, thread_slot_),
tid (0),
last_processing_time (0)
{
}
void zs::app_thread_t::shutdown ()
{
// Deallocate all the sessions associated with the thread.
while (!sessions.empty ())
sessions [0]->shutdown ();
delete this;
}
zs::app_thread_t::~app_thread_t ()
{
}
void zs::app_thread_t::attach_session (session_t *session_)
{
session_->set_index (sessions.size ());
sessions.push_back (session_);
}
void zs::app_thread_t::detach_session (session_t *session_)
{
// O(1) removal of the session from the list.
sessions_t::size_type i = session_->get_index ();
sessions [i] = sessions [sessions.size () - 1];
sessions [i]->set_index (i);
sessions.pop_back ();
}
zs::i_poller *zs::app_thread_t::get_poller ()
{
zs_assert (false);
}
zs::i_signaler *zs::app_thread_t::get_signaler ()
{
return &pollset;
}
bool zs::app_thread_t::is_current ()
{
return !sessions.empty () && tid == getpid ();
}
bool zs::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
if (!sessions.empty ())
return false;
tid = getpid ();
return true;
}
zs::i_api *zs::app_thread_t::create_socket (int type_)
{
i_mux *mux = NULL;
i_demux *demux = NULL;
session_t *session = NULL;
i_api *api = NULL;
switch (type_) {
case ZS_P2P:
mux = new dummy_aggregator_t;
zs_assert (mux);
demux = new dummy_distributor_t;
zs_assert (demux);
session = new session_t (this, this, mux, demux, true, false);
zs_assert (session);
api = new p2p_t (this, session);
zs_assert (api);
break;
case ZS_PUB:
demux = new data_distributor_t;
zs_assert (demux);
session = new session_t (this, this, mux, demux, true, false);
zs_assert (session);
api = new pub_t (this, session);
zs_assert (api);
break;
case ZS_SUB:
mux = new fair_aggregator_t;
zs_assert (mux);
session = new session_t (this, this, mux, demux, true, false);
zs_assert (session);
api = new sub_t (this, session);
zs_assert (api);
break;
case ZS_REQ:
// TODO
zs_assert (false);
api = new req_t (this, session);
zs_assert (api);
break;
case ZS_REP:
// TODO
zs_assert (false);
api = new rep_t (this, session);
zs_assert (api);
break;
default:
errno = EINVAL;
return NULL;
}
attach_session (session);
return api;
}
void zs::app_thread_t::process_commands (bool block_)
{
ypollset_t::signals_t signals;
if (block_)
signals = pollset.poll ();
else {
#if defined ZS_DELAY_COMMANDS
// Optimised version of command processing - it doesn't have to check
// for incoming commands each time. It does so only if certain time
// elapsed since last command processing. Command delay varies
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
// etc. The optimisation makes sense only on platforms where getting
// a timestamp is a very cheap operation (tens of nanoseconds).
// Get timestamp counter.
#if defined __GNUC__
uint32_t low;
uint32_t high;
__asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
uint64_t current_time = (uint64_t) high << 32 | low;
#elif defined _MSC_VER
uint64_t current_time = __rdtsc ();
#else
#error
#endif
// Check whether certain time have elapsed since last command
// processing.
if (current_time - last_processing_time <= max_command_delay)
return;
last_processing_time = current_time;
#endif
// Check whether there are any commands pending for this thread.
signals = pollset.check ();
}
if (signals) {
// Traverse all the possible sources of commands and process
// all the commands from all of them.
for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd;
while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
}
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_APP_THREAD_HPP_INCLUDED__
#define __ZS_APP_THREAD_HPP_INCLUDED__
#include <vector>
#include "i_thread.hpp"
#include "stdint.hpp"
#include "object.hpp"
#include "ypollset.hpp"
namespace zs
{
class app_thread_t : public object_t, public i_thread
{
public:
app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
// To be called when the whole infrastrucure is being closed (zs_term).
void shutdown ();
// Returns signaler associated with this application thread.
i_signaler *get_signaler ();
// Create socket engine in this thread. Return false if the calling
// thread doesn't match the thread handled by this app thread object.
struct i_api *create_socket (int type_);
// Nota bene: The following two functions are accessed from different
// threads. The caller (dispatcher) is responsible for synchronisation
// of accesses.
// Returns true is current thread is associated with the app thread.
bool is_current ();
// Tries to associate current thread with the app thread object.
// Returns true is successfull, false otherwise.
bool make_current ();
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
// i_thread implementation.
void attach_session (class session_t *session_);
void detach_session (class session_t *session_);
struct i_poller *get_poller ();
private:
// Clean-up.
~app_thread_t ();
// Thread ID associated with this slot.
// TODO: Virtualise pid_t!
// TODO: Check whether getpid returns unique ID for each thread.
int tid;
// Vector of all sessionss associated with this app thread.
typedef std::vector <class session_t*> sessions_t;
sessions_t sessions;
// App thread's signaler object.
ypollset_t pollset;
// Timestamp of when commands were processed the last time.
uint64_t last_processing_time;
app_thread_t (const app_thread_t&);
void operator = (const app_thread_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_ATOMIC_HPP_INCLUDED__
#define __ZS_ATOMIC_HPP_INCLUDED__
#include "stdint.hpp"
#if defined ZS_FORCE_MUTEXES
#define ZS_ATOMIC_MUTEX
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZS_ATOMIC_X86
#elif defined ZMQ_HAVE_WINDOWS
#define ZS_ATOMIC_WINDOWS
#elif defined ZMQ_HAVE_SOLARIS
#define ZS_ATOMIC_SOLARIS
#else
#define ZS_ATOMIC_MUTEX
#endif
namespace zs
{
// Atomic assignement.
inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_)
{
*p_ = value_;
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
}
// Atomic retrieval of an integer.
inline uint32_t atomic_uint32_get (volatile uint32_t *p_)
{
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
return *p_;
}
// Atomic addition. Returns the old value.
inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_)
{
#if defined ZS_ATOMIC_WINDOWS
return InterlockedExchangeAdd ((LONG*) &value, increment_);
#elif defined ZS_ATOMIC_SOLARIS
return atomic_add_32_nv (&value, increment_) - delta_;
#elif defined ZS_ATOMIC_X86
uint32_t old;
__asm__ volatile (
"lock; xadd %0, %1\n\t"
: "=r" (old), "=m" (*p_)
: "0" (delta_), "m" (*p_)
: "cc", "memory");
return old;
#else
#error // TODO:
sync.lock ();
uint32_t old = *p_;
*p_ += delta_;
sync.unlock ();
#endif
}
// Atomic subtraction. Returns the old value.
inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_)
{
#if defined ZS_ATOMIC_WINDOWS
LONG delta = - ((LONG) delta_);
return InterlockedExchangeAdd ((LONG*) &value, delta);
#elif defined ZS_ATOMIC_SOLARIS
int32_t delta = - ((int32_t) delta_);
return atomic_add_32_nv (&value, delta) + delta_;
#elif defined ZS_ATOMIC_X86
uint32_t old = -delta_;
__asm__ volatile ("lock; xaddl %0,%1"
: "=r" (old), "=m" (*p_)
: "0" (old), "m" (*p_)
: "cc");
return old;
#else
#error // TODO:
sync.lock ();
uint32_t old = *p_;
*p_ -= delta_;
sync.unlock ();
return old;
#endif
}
// Atomic assignement.
template <typename T>
inline void atomic_ptr_set (volatile T **p_, T *value_)
{
*p_ = value_;
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
}
// Perform atomic 'exchange pointers' operation. Old value is returned.
template <typename T>
inline void *atomic_ptr_xchg (volatile T **p_, T *value_)
{
#if defined ZS_ATOMIC_WINDOWS
return InterlockedExchangePointer (p_, value_);
#elif defined ZS_ATOMIC_SOLARIS
return atomic_swap_ptr (p_, value_);
#elif defined ZS_ATOMIC_X86
void *old;
__asm__ volatile (
"lock; xchg %0, %2"
: "=r" (old), "=m" (*p_)
: "m" (*p_), "0" (value_));
return old;
#else
#error // TODO:
sync.lock ();
void *old = *p_;
*p_ = value_;
sync.unlock ();
return old;
#endif
}
// Perform atomic 'compare and swap' operation on the pointer.
// The pointer is compared to 'cmp' argument and if they are
// equal, its value is set to 'value'. Old value of the pointer
// is returned.
template <typename T>
inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_)
{
#if defined ZS_ATOMIC_WINDOWS
return InterlockedCompareExchangePointer (p_, value_, cmp_);
#elif defined ZS_ATOMIC_SOLARIS
return atomic_cas_ptr (p_, cmp_, value_);
#elif defined ZS_ATOMIC_X86
void *old;
__asm__ volatile (
"lock; cmpxchg %2, %3"
: "=a" (old), "=m" (*p_)
: "r" (value_), "m" (*p_), "0" (cmp_)
: "cc");
return old;
#else
#error // TODO:
sync.lock ();
void *old = *p_;
if (old == cmp_)
*p_ = value_;
sync.unlock ();
return old;
#endif
}
#if defined ZS_ATOMIC_X86 && defined __x86_64__
typedef uint64_t atomic_bitmap_t;
#else
typedef uint32_t atomic_bitmap_t;
#endif
// Atomic assignement.
inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_,
atomic_bitmap_t value_)
{
*p_ = value_;
// StoreLoad memory barrier should go here on platforms with
// memory models that require it.
}
// Bit-test-set-and-reset. Sets one bit of the value and resets
// another one. Returns the original value of the reset bit.
inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_,
int set_index_, int reset_index_)
{
#if defined ZS_ATOMIC_WINDOWS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
set_index_)) & ~(integer_t (1) << reset_index_);
if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
oldval) == (LONG) oldval)
return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
true : false;
}
#elif defined ZS_ATOMIC_SOLARIS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) <<
set_index_)) & ~(integer_t (1) << reset_index_);
if (atomic_cas_32 (p_, oldval, newval) == oldval)
return (oldval & (atomic_bitmap_t (1) << reset_index_)) ?
true : false;
}
#elif defined ZS_ATOMIC_X86
atomic_bitmap_t oldval, dummy;
__asm__ volatile (
"mov %0, %1\n\t"
"1:"
"mov %1, %2\n\t"
"bts %3, %2\n\t"
"btr %4, %2\n\t"
"lock cmpxchg %2, %0\n\t"
"jnz 1b\n\t"
: "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
: "r" (atomic_bitmap_t (set_index_)),
"r" (atomic_bitmap_t (reset_index_))
: "cc");
return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_));
#else
#error // TODO:
sync.lock ();
atomic_bitmap_t oldval = *p_;
*p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) &
~(atomic_bitmap_t (1) << reset_index_);
sync.unlock ();
return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false;
#endif
}
// Sets value to newval. Returns the original value.
inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_,
atomic_bitmap_t newval_)
{
#if defined ZS_ATOMIC_WINDOWS
return InterlockedExchange ((volatile LONG*) p_, newval_);
#elif defined ZS_ATOMIC_SOLARIS
return atomic_swap_32 (p_, newval_);
#elif defined ZS_ATOMIC_X86
atomic_bitmap_t oldval = newval_;
__asm__ volatile (
"lock; xchg %0, %1"
: "=r" (oldval)
: "m" (*p_), "0" (oldval)
: "memory");
return oldval;
#else
#error // TODO:
sync.lock ();
atomic_bitmap_t oldval = *p_;
*p_ = newval_;
sync.unlock ();
#endif
}
// izte is "if-zero-then-else" atomic operation - if the value is zero
// it substitutes it by 'thenval' else it rewrites it by 'elseval'.
// Original value of the integer is returned from this function.
inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_,
atomic_bitmap_t thenval_, atomic_bitmap_t elseval_)
{
#if defined ZS_ATOMIC_WINDOWS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
if (InterlockedCompareExchange ((volatile LONG*) p_, newval,
oldval) == (LONG) oldval)
return oldval;
}
#elif defined ZS_ATOMIC_SOLARIS
while (true) {
atomic_bitmap_t oldval = *p_;
atomic_bitmap_t newval = (oldval ? elseval_ : thenval_);
if (atomic_cas_32 (p_, oldval, newval) == oldval)
return oldval;
}
#elif defined ZS_ATOMIC_X86
atomic_bitmap_t oldval;
atomic_bitmap_t dummy;
__asm__ volatile (
"mov %0, %1\n\t"
"1:"
"mov %3, %2\n\t"
"test %1, %1\n\t"
"jz 2f\n\t"
"mov %4, %2\n\t"
"2:"
"lock cmpxchg %2, %0\n\t"
"jnz 1b\n\t"
: "+m" (*p_), "=&a" (oldval), "=&r" (dummy)
: "r" (thenval_), "r" (elseval_)
: "cc");
return oldval;
#else
#error // TODO:
sync.lock ();
atomic_bitmap_t oldval = *p_;
*p_ = oldval ? elseval_ : thenval_;
sync.unlock ();
return oldval;
#endif
}
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_ATOMIC_BITMAP_HPP_INCLUDED__
#define __ZS_ATOMIC_BITMAP_HPP_INCLUDED__
#include "stdint.hpp"
#include "platform.hpp"
// These are the conditions to choose between different implementations
// of atomic_bitmap.
#if defined ZS_FORCE_MUTEXES
#define ZS_ATOMIC_BITMAP_MUTEX
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZS_ATOMIC_BITMAP_X86
#elif 0 && defined __sparc__ && defined __GNUC__
#define ZS_ATOMIC_BITMAP_SPARC
#elif defined ZS_HAVE_WINDOWS
#define ZS_ATOMIC_BITMAP_WINDOWS
#elif defined ZS_HAVE_SOLARIS
#define ZS_ATOMIC_BITMAP_SOLARIS
#else
#define ZS_ATOMIC_BITMAP_MUTEX
#endif
#if defined ZS_ATOMIC_BITMAP_MUTEX
#include "mutex.hpp"
#elif defined ZS_ATOMIC_BITMAP_WINDOWS
#include "windows.hpp"
#elif defined ZS_ATOMIC_BITMAP_SOLARIS
#include <atomic.h>
#endif
namespace zs
{
// This class encapuslates several bitwise atomic operations on unsigned
// integer. Selection of operations is driven specifically by the needs
// of ypollset implementation.
class atomic_bitmap_t
{
public:
#if (defined ZMQ_ATOMIC_BITMAP_X86 || defined ZMQ_FORCE_MUTEXES) \
&& defined __x86_64__
typedef uint64_t bitmap_t;
#else
typedef uint32_t bitmap_t;
#endif
inline atomic_bitmap_t (bitmap_t value_ = 0) :
value (value_)
{
}
inline ~atomic_bitmap_t ()
{
}
// Bit-test-set-and-reset. Sets one bit of the value and resets
// another one. Returns the original value of the reset bit.
inline bool btsr (int set_index_, int reset_index_)
{
#if defined ZS_ATOMIC_BITMAP_WINDOWS
while (true) {
bitmap_t oldval = value;
bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) &
~(bitmap_t (1) << reset_index_);
if (InterlockedCompareExchange ((volatile LONG*) &value, newval,
oldval) == (LONG) oldval)
return (oldval & (bitmap_t (1) << reset_index_)) ?
true : false;
}
#elif defined ZS_ATOMIC_BITMAP_SOLARIS
while (true) {
bitmap_t oldval = value;
bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) &
~(bitmap_t (1) << reset_index_);
if (atomic_cas_32 (&value, oldval, newval) == oldval)
return (oldval & (bitmap_t (1) << reset_index_)) ?
true : false;
}
#elif defined ZS_ATOMIC_BITMAP_X86
bitmap_t oldval, dummy;
__asm__ volatile (
"mov %0, %1\n\t"
"1:"
"mov %1, %2\n\t"
"bts %3, %2\n\t"
"btr %4, %2\n\t"
"lock cmpxchg %2, %0\n\t"
"jnz 1b\n\t"
: "+m" (value), "=&a" (oldval), "=&r" (dummy)
: "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_))
: "cc");
return (bool) (oldval & (bitmap_t(1) << reset_index_));
#elif defined ZS_ATOMIC_BITMAP_SPARC
volatile bitmap_t* valptr = &value;
bitmap_t set_val = bitmap_t(1) << set_index_;
bitmap_t reset_val = ~(bitmap_t(1) << reset_index_);
bitmap_t tmp;
bitmap_t oldval;
__asm__ volatile(
"ld [%5], %2 \n\t"
"1: \n\t"
"or %2, %0, %3 \n\t"
"and %3, %1, %3 \n\t"
"cas [%5], %2, %3 \n\t"
"cmp %2, %3 \n\t"
"bne,a,pn %%icc, 1b \n\t"
"mov %3, %2 \n\t"
: "+r" (set_val), "+r" (reset_val), "=&r" (tmp),
"=&r" (oldval), "+m" (*valptr)
: "r" (valptr)
: "cc");
return oldval;
#elif defined ZS_ATOMIC_BITMAP_MUTEX
sync.lock ();
bitmap_t oldval = value;
value = (oldval | (bitmap_t (1) << set_index_)) &
~(bitmap_t (1) << reset_index_);
sync.unlock ();
return (oldval & (bitmap_t (1) << reset_index_)) ? true : false;
#else
#error
#endif
}
// Sets value to newval. Returns the original value.
inline bitmap_t xchg (bitmap_t newval_)
{
bitmap_t oldval;
#if defined ZS_ATOMIC_BITMAP_WINDOWS
oldval = InterlockedExchange ((volatile LONG*) &value, newval_);
#elif defined ZS_ATOMIC_BITMAP_SOLARIS
oldval = atomic_swap_32 (&value, newval_);
#elif defined ZS_ATOMIC_BITMAP_X86
oldval = newval_;
__asm__ volatile (
"lock; xchg %0, %1"
: "=r" (oldval)
: "m" (value), "0" (oldval)
: "memory");
#elif defined ZS_ATOMIC_BITMAP_SPARC
oldval = value;
volatile bitmap_t* ptrin = &value;
bitmap_t tmp;
bitmap_t prev;
__asm__ __volatile__(
"ld [%4], %1\n\t"
"1:\n\t"
"mov %0, %2\n\t"
"cas [%4], %1, %2\n\t"
"cmp %1, %2\n\t"
"bne,a,pn %%icc, 1b\n\t"
"mov %2, %1\n\t"
: "+r" (newval_), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin)
: "r" (ptrin)
: "cc");
return prev;
#elif defined ZS_ATOMIC_BITMAP_MUTEX
sync.lock ();
oldval = value;
value = newval_;
sync.unlock ();
#else
#error
#endif
return oldval;
}
// izte is "if-zero-then-else" atomic operation - if the value is zero
// it substitutes it by 'thenval' else it rewrites it by 'elseval'.
// Original value of the integer is returned from this function.
inline bitmap_t izte (bitmap_t thenval_,
bitmap_t elseval_)
{
#if defined ZS_ATOMIC_BITMAP_WINDOWS
while (true) {
bitmap_t oldval = value;
bitmap_t newval = oldval == 0 ? thenval_ : elseval_;
if (InterlockedCompareExchange ((volatile LONG*) &value,
newval, oldval) == (LONG) oldval)
return oldval;
}
#elif defined ZS_ATOMIC_BITMAP_SOLARIS
while (true) {
bitmap_t oldval = value;
bitmap_t newval = oldval == 0 ? thenval_ : elseval_;
if (atomic_cas_32 (&value, oldval, newval) == oldval)
return oldval;
}
#elif defined ZS_ATOMIC_BITMAP_X86
bitmap_t oldval;
bitmap_t dummy;
__asm__ volatile (
"mov %0, %1\n\t"
"1:"
"mov %3, %2\n\t"
"test %1, %1\n\t"
"jz 2f\n\t"
"mov %4, %2\n\t"
"2:"
"lock cmpxchg %2, %0\n\t"
"jnz 1b\n\t"
: "+m" (value), "=&a" (oldval), "=&r" (dummy)
: "r" (thenval_), "r" (elseval_)
: "cc");
return oldval;
#elif defined ZS_ATOMIC_BITMAP_SPARC
volatile bitmap_t* ptrin = &value;
bitmap_t tmp;
bitmap_t prev;
__asm__ __volatile__(
"ld [%3], %0 \n\t"
"mov 0, %1 \n\t"
"cas [%3], %1, %4 \n\t"
"cmp %0, %1 \n\t"
"be,a,pn %%icc,1f \n\t"
"ld [%3], %0 \n\t"
"cas [%3], %0, %5 \n\t"
"1: \n\t"
: "=&r" (tmp), "=&r" (prev), "+m" (*ptrin)
: "r" (ptrin), "r" (thenval_), "r" (elseval_)
: "cc");
return prev;
#elif defined ZS_ATOMIC_BITMAP_MUTEX
sync.lock ();
bitmap_t oldval = value;
value = oldval ? elseval_ : thenval_;
sync.unlock ();
return oldval;
#else
#error
#endif
}
private:
volatile bitmap_t value;
#if defined ZS_ATOMIC_BITMAP_MUTEX
mutex_t sync;
#endif
atomic_bitmap_t (const atomic_bitmap_t&);
void operator = (const atomic_bitmap_t&);
};
}
// Remove macros local to this file.
#if defined ZS_ATOMIC_BITMAP_WINDOWS
#undef ZS_ATOMIC_BITMAP_WINDOWS
#endif
#if defined ZS_ATOMIC_BITMAP_SOLARIS
#undef ZS_ATOMIC_BITMAP_SOLARIS
#endif
#if defined ZS_ATOMIC_BITMAP_X86
#undef ZS_ATOMIC_BITMAP_X86
#endif
#if defined ZS_ATOMIC_BITMAP_SPARC
#undef ZS_ATOMIC_BITMAP_SPARC
#endif
#if defined ZS_ATOMIC_BITMAP_MUTEX
#undef ZS_ATOMIC_BITMAP_MUTEX
#endif
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_ATOMIC_COUNTER_HPP_INCLUDED__
#define __ZS_ATOMIC_COUNTER_HPP_INCLUDED__
#include "stdint.hpp"
#include "platform.hpp"
#if defined ZS_FORCE_MUTEXES
#define ZS_ATOMIC_COUNTER_MUTEX
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZS_ATOMIC_COUNTER_X86
#elif 0 && defined __sparc__ && defined __GNUC__
#define ZS_ATOMIC_COUNTER_SPARC
#elif defined ZS_HAVE_WINDOWS
#define ZS_ATOMIC_COUNTER_WINDOWS
#elif defined ZS_HAVE_SOLARIS
#define ZS_ATOMIC_COUNTER_SOLARIS
#else
#define ZS_ATOMIC_COUNTER_MUTEX
#endif
#if defined ZS_ATOMIC_COUNTER_MUTEX
#include "mutex.hpp"
#elif defined ZS_ATOMIC_COUNTER_WINDOWS
#include "windows.hpp"
#elif defined ZS_ATOMIC_COUNTER_SOLARIS
#include <atomic.h>
#endif
namespace zs
{
// This class represents an integer that can be incremented/decremented
// in atomic fashion.
class atomic_counter_t
{
public:
typedef uint32_t integer_t;
inline atomic_counter_t (integer_t value_ = 0) :
value (value_)
{
}
inline ~atomic_counter_t ()
{
}
// Set counter value (not thread-safe).
inline void set (integer_t value_)
{
value = value_;
}
// Atomic addition. Returns the old value.
inline integer_t add (integer_t increment_)
{
integer_t old_value;
#if defined ZS_ATOMIC_COUNTER_WINDOWS
old_value = InterlockedExchangeAdd ((LONG*) &value, increment_);
#elif defined ZS_ATOMIC_COUNTER_SOLARIS
integer_t new_value = atomic_add_32_nv (&value, increment_);
old_value = new_value - increment_;
#elif defined ZS_ATOMIC_COUNTER_X86
__asm__ volatile (
"lock; xadd %0, %1 \n\t"
: "=r" (old_value), "=m" (value)
: "0" (increment_), "m" (value)
: "cc", "memory");
#elif defined ZS_ATOMIC_COUNTER_SPARC
integer_t tmp;
__asm__ volatile (
"ld [%4], %0 \n\t"
"1: \n\t"
"add %0, %3, %1 \n\t"
"cas [%4], %0, %1 \n\t"
"cmp %0, %1 \n\t"
"bne,a,pn %%icc, 1b \n\t"
"mov %1, %0 \n\t"
: "=&r" (old_value), "=&r" (tmp), "=m" (value)
: "r" (increment_), "r" (&value)
: "cc", "memory");
#elif defined ZS_ATOMIC_COUNTER_MUTEX
sync.lock ();
old_value = value;
value += increment_;
sync.unlock ();
#else
#error
#endif
return old_value;
}
// Atomic subtraction. Returns false if the counter drops to zero.
inline bool sub (integer_t decrement)
{
#if defined ZS_ATOMIC_COUNTER_WINDOWS
LONG delta = - ((LONG) decrement);
integer_t old = InterlockedExchangeAdd ((LONG*) &value, delta);
return old - decrement != 0;
#elif defined ZS_ATOMIC_COUNTER_SOLARIS
int32_t delta = - ((int32_t) decrement);
integer_t nv = atomic_add_32_nv (&value, delta);
return nv != 0;
#elif defined ZS_ATOMIC_COUNTER_X86
integer_t oldval = -decrement;
volatile integer_t *val = &value;
__asm__ volatile ("lock; xaddl %0,%1"
: "=r" (oldval), "=m" (*val)
: "0" (oldval), "m" (*val)
: "cc");
return oldval != decrement;
#elif defined ZS_ATOMIC_COUNTER_SPARC
volatile integer_t *val = &value;
integer_t tmp;
integer_t result;
__asm__ volatile(
"ld [%4], %1\n\t"
"1:\n\t"
"add %1, %0, %2\n\t"
"cas [%4], %1, %2\n\t"
"cmp %1, %2\n\t"
"bne,a,pn %%icc, 1b\n\t"
"mov %2, %1\n\t"
: "+r" (-decrement), "=&r" (tmp), "=&r" (result), "+m" (*val)
: "r" (val)
: "cc");
return result <= decrement;
#elif defined ZS_ATOMIC_COUNTER_MUTEX
sync.lock ();
value -= decrement;
bool result = value ? true : false;
sync.unlock ();
return result;
#else
#error
#endif
}
inline integer_t get ()
{
return value;
}
private:
volatile integer_t value;
#if defined ZS_ATOMIC_COUNTER_MUTEX
mutex_t sync;
#endif
atomic_counter_t (const atomic_counter_t&);
void operator = (const atomic_counter_t&);
};
}
// Remove macros local to this file.
#if defined ZS_ATOMIC_COUNTER_WINDOWS
#undef ZS_ATOMIC_COUNTER_WINDOWS
#endif
#if defined ZS_ATOMIC_COUNTER_SOLARIS
#undef ZS_ATOMIC_COUNTER_SOLARIS
#endif
#if defined ZS_ATOMIC_COUNTER_X86
#undef ZS_ATOMIC_COUNTER_X86
#endif
#if defined ZS_ATOMIC_COUNTER_SPARC
#undef ZS_ATOMIC_COUNTER_SPARC
#endif
#if defined ZS_ATOMIC_COUNTER_MUTEX
#undef ZS_ATOMIC_COUNTER_MUTEX
#endif
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_ATOMIC_PTR_HPP_INCLUDED__
#define __ZS_ATOMIC_PTR_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZS_FORCE_MUTEXES
#define ZS_ATOMIC_PTR_MUTEX
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZS_ATOMIC_PTR_X86
#elif 0 && defined __sparc__ && defined __GNUC__
#define ZS_ATOMIC_PTR_SPARC
#elif defined ZS_HAVE_WINDOWS
#define ZS_ATOMIC_PTR_WINDOWS
#elif defined ZS_HAVE_SOLARIS
#define ZS_ATOMIC_PTR_SOLARIS
#else
#define ZS_ATOMIC_PTR_MUTEX
#endif
#if defined ZS_ATOMIC_PTR_MUTEX
#include "mutex.hpp"
#elif defined ZS_ATOMIC_PTR_WINDOWS
#include "windows.hpp"
#elif defined ZS_ATOMIC_PTR_SOLARIS
#include <atomic.h>
#endif
namespace zs
{
// This class encapsulates several atomic operations on pointers.
template <typename T> class atomic_ptr_t
{
public:
// Initialise atomic pointer
inline atomic_ptr_t ()
{
ptr = NULL;
}
// Destroy atomic pointer
inline ~atomic_ptr_t ()
{
}
// Set value of atomic pointer in a non-threadsafe way
// Use this function only when you are sure that at most one
// thread is accessing the pointer at the moment.
inline void set (T *ptr_)
{
this->ptr = ptr_;
}
// Perform atomic 'exchange pointers' operation. Pointer is set
// to the 'val' value. Old value is returned.
inline T *xchg (T *val_)
{
#if defined ZS_ATOMIC_PTR_WINDOWS
return (T*) InterlockedExchangePointer (&ptr, val_);
#elif defined ZS_ATOMIC_PTR_SOLARIS
return (T*) atomic_swap_ptr (&ptr, val_);
#elif defined ZS_ATOMIC_PTR_X86
T *old;
__asm__ volatile (
"lock; xchg %0, %2"
: "=r" (old), "=m" (ptr)
: "m" (ptr), "0" (val_));
return old;
#elif defined ZS_ATOMIC_PTR_SPARC
T* newptr = val_;
volatile T** ptrin = &ptr;
T* tmp;
T* prev;
__asm__ __volatile__(
"ld [%4], %1\n\t"
"1:\n\t"
"mov %0, %2\n\t"
"cas [%4], %1, %2\n\t"
"cmp %1, %2\n\t"
"bne,a,pn %%icc, 1b\n\t"
"mov %2, %1\n\t"
: "+r" (newptr), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin)
: "r" (ptrin)
: "cc");
return prev;
#elif defined ZS_ATOMIC_PTR_MUTEX
sync.lock ();
T *old = (T*) ptr;
ptr = val_;
sync.unlock ();
return old;
#else
#error
#endif
}
// Perform atomic 'compare and swap' operation on the pointer.
// The pointer is compared to 'cmp' argument and if they are
// equal, its value is set to 'val'. Old value of the pointer
// is returned.
inline T *cas (T *cmp_, T *val_)
{
#if defined ZS_ATOMIC_PTR_WINDOWS
return (T*) InterlockedCompareExchangePointer (
(volatile PVOID*) &ptr, val_, cmp_);
#elif defined ZS_ATOMIC_PTR_SOLARIS
return (T*) atomic_cas_ptr (&ptr, cmp_, val_);
#elif defined ZS_ATOMIC_PTR_X86
T *old;
__asm__ volatile (
"lock; cmpxchg %2, %3"
: "=a" (old), "=m" (ptr)
: "r" (val_), "m" (ptr), "0" (cmp_)
: "cc");
return old;
#elif defined ZS_ATOMIC_PTR_SPARC
volatile T** ptrin = &ptr;
volatile T* prev = ptr;
__asm__ __volatile__(
"cas [%3], %1, %2\n\t"
: "+m" (*ptrin)
: "r" (cmp_), "r" (val_), "r" (ptrin)
: "cc");
return prev;
#elif defined ZS_ATOMIC_PTR_MUTEX
sync.lock ();
T *old = (T*) ptr;
if (ptr == cmp_)
ptr = val_;
sync.unlock ();
return old;
#else
#error
#endif
}
private:
volatile T *ptr;
#if defined ZS_ATOMIC_PTR_MUTEX
mutex_t sync;
#endif
atomic_ptr_t (const atomic_ptr_t&);
void operator = (const atomic_ptr_t&);
};
}
// Remove macros local to this file.
#if defined ZS_ATOMIC_PTR_WINDOWS
#undef ZS_ATOMIC_PTR_WINDOWS
#endif
#if defined ZS_ATOMIC_PTR_SOLARIS
#undef ZS_ATOMIC_PTR_SOLARIS
#endif
#if defined ZS_ATOMIC_PTR_X86
#undef ZS_ATOMIC_PTR_X86
#endif
#if defined ZS_ATOMIC_PTR_SPARC
#undef ZS_ATOMIC_PTR_SPARC
#endif
#if defined ZS_ATOMIC_PTR_MUTEX
#undef ZS_ATOMIC_PTR_MUTEX
#endif
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_COMMAND_HPP_INCLUDED__
#define __ZS_COMMAND_HPP_INCLUDED__
#include "stdint.hpp"
namespace zs
{
// This structure defines the commands that can be sent between threads.
struct command_t
{
// Object to process the command.
class object_t *destination;
enum type_t
{
stop,
bind,
head,
tail,
reg,
reg_and_bind,
unreg,
engine,
terminate,
terminate_ack
} type;
union {
struct {
} stop;
struct {
class pipe_reader_t *reader;
class session_t *peer;
} bind;
struct {
uint64_t bytes;
} tail;
struct {
uint64_t bytes;
} head;
struct {
class simple_semaphore_t *smph;
} reg;
struct {
class session_t *peer;
bool flow_in;
bool flow_out;
} reg_and_bind;
struct {
class simple_semaphore_t *smph;
} unreg;
// TODO: Engine object won't be deallocated on terminal shutdown
// while the command is still on the fly!
struct {
class i_engine *engine;
} engine;
struct {
} terminate;
struct {
} terminate_ack;
} args;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_CONFIG_HPP_INCLUDED__
#define __ZS_CONFIG_HPP_INCLUDED__
namespace zs
{
// Compile-time settings.
enum
{
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
// memory allocation by approximately 99.6%
message_pipe_granularity = 256,
// Number of new commands in command pipe needed to trigger new memory
// allocation. The number should be kept low to decrease the memory
// footprint of dispatcher.
command_pipe_granularity = 4,
// Maximal batching size for engines with receiving functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be read by a single 'recv' system call, thus avoiding
// unnecessary network stack traversals.
in_batch_size = 8192,
// Maximal batching size for engines with sending functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be written by a single 'send' system call, thus avoiding
// unnecessary network stack traversals.
out_batch_size = 8192,
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
// Maximal wait time for a timer (milliseconds).
max_timer_period = 100,
// Maximal delay to process command in API thread (in CPU ticks).
// 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs.
max_command_delay = 3000000,
// Maximal number of non-accepted connections that can be held by
// TCP listener object.
tcp_connection_backlog = 10
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "connecter.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
#include "simple_semaphore.hpp"
#include "zmq_tcp_engine.hpp"
zs::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_,
session_t *session_) :
io_object_t (thread_),
state (idle),
poller (NULL),
session (session_),
addr (addr_),
identity ("abcde"),
engine (NULL)
{
}
void zs::connecter_t::terminate ()
{
delete this;
}
void zs::connecter_t::shutdown ()
{
delete this;
}
zs::connecter_t::~connecter_t ()
{
}
void zs::connecter_t::process_reg (simple_semaphore_t *smph_)
{
// Fet poller pointer for further use.
zs_assert (!poller);
poller = get_poller ();
// Ask the session to register itself with the I/O thread. Note that
// the session is living in the same I/O thread, thus this results
// in a synchronous call.
session->inc_seqnum ();
send_reg (session, NULL);
// Unlock the application thread that created the connecter.
if (smph_)
smph_->post ();
// Manually trigger timer event which will launch asynchronous connect.
state = waiting;
timer_event ();
}
void zs::connecter_t::process_unreg (simple_semaphore_t *smph_)
{
// Unregister connecter/engine from the poller.
zs_assert (poller);
if (state == connecting)
poller->rm_fd (handle);
else if (state == waiting)
poller->cancel_timer (this);
else if (state == sending)
engine->terminate ();
// Unlock the application thread closing the connecter.
if (smph_)
smph_->post ();
}
void zs::connecter_t::in_event ()
{
// Error occured in asynchronous connect. Retry to connect after a while.
if (state == connecting) {
fd_t fd = tcp_connecter.connect ();
zs_assert (fd == retired_fd);
poller->rm_fd (handle);
poller->add_timer (this);
state = waiting;
return;
}
zs_assert (false);
}
void zs::connecter_t::out_event ()
{
if (state == connecting) {
fd_t fd = tcp_connecter.connect ();
if (fd == retired_fd) {
poller->rm_fd (handle);
poller->add_timer (this);
state = waiting;
return;
}
poller->rm_fd (handle);
engine = new zmq_tcp_engine_t (fd);
zs_assert (engine);
engine->attach (poller, this);
state = sending;
return;
}
zs_assert (false);
}
void zs::connecter_t::timer_event ()
{
zs_assert (state == waiting);
// Initiate async connect and start polling for its completion. If async
// connect fails instantly, try to reconnect after a while.
int rc = tcp_connecter.open (addr.c_str ());
if (rc == 0) {
state = connecting;
in_event ();
}
else if (rc == 1) {
handle = poller->add_fd (tcp_connecter.get_fd (), this);
poller->set_pollout (handle);
state = connecting;
}
else {
poller->add_timer (this);
state = waiting;
}
}
void zs::connecter_t::set_engine (struct i_engine *engine_)
{
engine = engine_;
}
bool zs::connecter_t::read (zs_msg *msg_)
{
zs_assert (state == sending);
// Deallocate old content of the message just in case.
zs_msg_close (msg_);
// Send the identity.
zs_msg_init_size (msg_, identity.size ());
memcpy (zs_msg_data (msg_), identity.c_str (), identity.size ());
// Ask engine to unregister from the poller.
i_engine *e = engine;
engine->detach ();
// Attach the engine to the session. (Note that this is actually
// a synchronous call.
session->inc_seqnum ();
send_engine (session, e);
state = idle;
return true;
}
bool zs::connecter_t::write (struct zs_msg *msg_)
{
// No incoming messages are accepted till identity is sent.
return false;
}
void zs::connecter_t::flush ()
{
// No incoming messages are accepted till identity is sent.
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_CONNECTER_HPP_INCLUDED__
#define __ZS_CONNECTER_HPP_INCLUDED__
#include <string>
#include "../include/zs.h"
#include "i_poller.hpp"
#include "io_object.hpp"
#include "i_poll_events.hpp"
#include "i_session.hpp"
#include "tcp_connecter.hpp"
namespace zs
{
class connecter_t : public io_object_t, public i_poll_events,
public i_session
{
public:
connecter_t (class io_thread_t *thread_, const char *addr_,
class session_t *session_);
void terminate ();
void shutdown ();
void process_reg (class simple_semaphore_t *smph_);
void process_unreg (class simple_semaphore_t *smph_);
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event ();
// i_session implementation
void set_engine (struct i_engine *engine_);
// void shutdown ();
bool read (struct zs_msg *msg_);
bool write (struct zs_msg *msg_);
void flush ();
private:
// Clean-up.
~connecter_t ();
enum {
idle,
waiting,
connecting,
sending
} state;
// Cached pointer to the poller.
struct i_poller *poller;
// Handle of the connecting socket.
handle_t handle;
// Associated session. It lives in the same I/O thread.
class session_t *session;
// Address to connect to.
std::string addr;
// Identity of the connection.
std::string identity;
tcp_connecter_t tcp_connecter;
struct i_engine *engine;
connecter_t (const connecter_t&);
void operator = (const connecter_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zs.h"
#include "data_distributor.hpp"
#include "pipe_writer.hpp"
#include "err.hpp"
#include "session.hpp"
#include "msg.hpp"
zs::data_distributor_t::data_distributor_t () :
session (NULL)
{
}
void zs::data_distributor_t::set_session (session_t *session_)
{
zs_assert (!session);
session = session_;
}
void zs::data_distributor_t::shutdown ()
{
// No need to deallocate pipes here. They'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zs::data_distributor_t::terminate ()
{
// Pipe unregisters itself during the call to terminate, so the pipes
// list shinks by one in each iteration.
while (!pipes.empty ())
pipes [0]->terminate ();
delete this;
}
zs::data_distributor_t::~data_distributor_t ()
{
}
void zs::data_distributor_t::attach_pipe (pipe_writer_t *pipe_)
{
// Associate demux with a new pipe.
pipe_->set_demux (this);
pipe_->set_index (pipes.size ());
pipes.push_back (pipe_);
}
void zs::data_distributor_t::detach_pipe (pipe_writer_t *pipe_)
{
// Release the reference to the pipe.
int index = pipe_->get_index ();
pipe_->set_index (-1);
pipes [index] = pipes.back ();
pipes [index]->set_index (index);
pipes.pop_back ();
}
bool zs::data_distributor_t::empty ()
{
return pipes.empty ();
}
bool zs::data_distributor_t::send (zs_msg *msg_)
{
int pipes_count = pipes.size ();
// If there are no pipes available, simply drop the message.
if (pipes_count == 0) {
zs_msg_close (msg_);
zs_msg_init (msg_);
return true;
}
// TODO: ???
// First check whether all pipes are available for writing.
// for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
// if (!(*it)->check_write (msg_))
// return false;
// For VSMs the copying is straighforward.
if (msg_->content == (zs_msg_content*) ZS_VSM) {
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
write_to_pipe (*it, msg_);
zs_msg_init (msg_);
return true;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment (i.e. atomic
// operations) needed.
if (pipes_count == 1) {
write_to_pipe (*pipes.begin (), msg_);
zs_msg_init (msg_);
return true;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why the -1).
if (msg_->shared)
msg_->content->refcnt.add (pipes_count - 1);
else {
msg_->shared = true;
// TODO: Add memory barrier here.
msg_->content->refcnt.set (pipes_count);
}
// Push the message to all destinations.
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
write_to_pipe (*it, msg_);
// Detach the original message from the data buffer.
zs_msg_init (msg_);
return true;
}
void zs::data_distributor_t::flush ()
{
// Flush all pipes. If there's large number of pipes, it can be pretty
// inefficient (especially if there's new message only in a single pipe).
// Can it be improved?
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
(*it)->flush ();
}
void zs::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_,
struct zs_msg *msg_)
{
if (!pipe_->write (msg_)) {
// TODO: Push gap notification to the pipe.
zs_assert (false);
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_DATA_DISTRIBUTOR_HPP_INCLUDED__
#define __ZS_DATA_DISTRIBUTOR_HPP_INCLUDED__
#include <vector>
#include <i_demux.hpp>
namespace zs
{
// Object to distribute messages to outbound pipes.
class data_distributor_t : public i_demux
{
public:
data_distributor_t ();
// i_demux implementation.
void set_session (class session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_writer_t *pipe_);
void detach_pipe (class pipe_writer_t *pipe_);
bool empty ();
bool send (struct zs_msg *msg_);
void flush ();
private:
// Clean-up.
~data_distributor_t ();
// Reference to the owner session object.
class session_t *session;
// Writes the message to the pipe if possible. If it isn't, writes
// a gap notification to the pipe.
void write_to_pipe (class pipe_writer_t *pipe_, struct zs_msg *msg_);
// The list of outbound pipes.
typedef std::vector <class pipe_writer_t*> pipes_t;
pipes_t pipes;
data_distributor_t (const data_distributor_t&);
void operator = (const data_distributor_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_DECODER_HPP_INCLUDED__
#define __ZS_DECODER_HPP_INCLUDED__
#include <stddef.h>
#include <string.h>
#include <algorithm>
namespace zs
{
// Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. Both AMQP and backend protocol are based on
// size-prefixed paradigm, therefore they are using decoder_t to parse
// the messages. On the other hand, XML-based transports (like XMPP or
// SOAP) don't allow for knowing the size of data to read in advance and
// should use different decoding algorithms.
//
// Decoder implements the state machine that parses the incoming buffer.
// Derived class should implement individual state machine actions.
template <typename T> class decoder_t
{
public:
inline decoder_t () :
read_ptr (NULL),
to_read (0),
next (NULL)
{
}
// Push the binary data to the decoder. Returns number of bytes
// actually parsed.
inline size_t write (unsigned char *data_, size_t size_)
{
size_t pos = 0;
while (true) {
size_t to_copy = std::min (to_read, size_ - pos);
if (read_ptr) {
memcpy (read_ptr, data_ + pos, to_copy);
read_ptr += to_copy;
}
pos += to_copy;
to_read -= to_copy;
while (!to_read)
if (!(static_cast <T*> (this)->*next) ())
return pos;
if (pos == size_)
return pos;
}
}
protected:
// Prototype of state machine action. Action should return false if
// it is unable to push the data to the system.
typedef bool (T::*step_t) ();
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
inline void next_step (void *read_ptr_, size_t to_read_,
step_t next_)
{
read_ptr = (unsigned char*) read_ptr_;
to_read = to_read_;
next = next_;
}
private:
unsigned char *read_ptr;
size_t to_read;
step_t next;
decoder_t (const decoder_t&);
void operator = (const decoder_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "platform.hpp"
#if defined ZS_HAVE_SOLARIS || defined ZS_HAVE_HPUX
#include <sys/devpoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <unistd.h>
#include <limits.h>
#include "devpoll.hpp"
#include "err.hpp"
#include "config.hpp"
zs::devpoll_t::devpoll_t ()
{
// Get limit on open files
struct rlimit rl;
int rc = getrlimit (RLIMIT_NOFILE, &rl);
errno_assert (rc != -1);
fd_table.resize (rl.rlim_cur);
for (rlim_t i = 0; i < rl.rlim_cur; i ++)
fd_table [i].valid = false;
devpoll_fd = open ("/dev/poll", O_RDWR);
errno_assert (devpoll_fd != -1);
}
zs::devpoll_t::~devpoll_t ()
{
close (devpoll_fd);
}
void zs::devpoll_t::devpoll_ctl (fd_t fd_, short events_)
{
struct pollfd pfd = {fd_, events_, 0};
ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd);
zs_assert (rc == sizeof pfd);
}
zs::handle_t zs::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_)
{
assert (!fd_table [fd_].valid);
fd_table [fd_].events = 0;
fd_table [fd_].reactor = reactor_;
fd_table [fd_].valid = true;
fd_table [fd_].accepted = false;
devpoll_ctl (fd_, 0);
pending_list.push_back (fd_);
// Increase the load metric of the thread.
load.add (1);
handle_t handle;
handle.fd = fd_;
return handle;
}
void zs::devpoll_t::rm_fd (handle_t handle_)
{
assert (fd_table [handle_.fd].valid);
devpoll_ctl (handle_.fd, POLLREMOVE);
fd_table [handle_.fd].valid = false;
// Decrease the load metric of the thread.
load.sub (1);
}
void zs::devpoll_t::set_pollin (handle_t handle_)
{
fd_t fd = handle_.fd;
devpoll_ctl (fd, POLLREMOVE);
fd_table [fd].events |= POLLIN;
devpoll_ctl (fd, fd_table [fd].events);
}
void zs::devpoll_t::reset_pollin (handle_t handle_)
{
fd_t fd = handle_.fd;
devpoll_ctl (fd, POLLREMOVE);
fd_table [fd].events &= ~((short) POLLIN);
devpoll_ctl (fd, fd_table [fd].events);
}
void zs::devpoll_t::set_pollout (handle_t handle_)
{
fd_t fd = handle_.fd;
devpoll_ctl (fd, POLLREMOVE);
fd_table [fd].events |= POLLOUT;
devpoll_ctl (fd, fd_table [fd].events);
}
void zs::devpoll_t::reset_pollout (handle_t handle_)
{
fd_t fd = handle_.fd;
devpoll_ctl (fd, POLLREMOVE);
fd_table [fd].events &= ~((short) POLLOUT);
devpoll_ctl (fd, fd_table [fd].events);
}
void zs::devpoll_t::add_timer (i_poll_events *events_)
{
timers.push_back (events_);
}
void zs::devpoll_t::cancel_timer (i_poll_events *events_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it != timers.end ())
timers.erase (it);
}
int zs::devpoll_t::get_load ()
{
return load.get ();
}
void zs::devpoll_t::start ()
{
worker.start (worker_routine, this);
}
void zs::devpoll_t::stop ()
{
stopping = true;
}
void zs::devpoll_t::join ()
{
worker.stop ();
}
bool zs::devpoll_t::loop ()
{
// According to the poll(7d) man page, we can retrieve
// no more then (OPEN_MAX - 1) events.
int nfds = std::min (max_io_events, OPEN_MAX - 1);
while (!stopping) {
struct pollfd ev_buf [max_io_events];
struct dvpoll poll_req;
for (pending_list_t::size_type i = 0; i < pending_list.size (); i ++)
fd_table [pending_list [i]].accepted = true;
pending_list.clear ();
poll_req.dp_fds = &ev_buf [0];
poll_req.dp_nfds = nfds;
poll_req.dp_timeout = timers.empty () ? -1 : max_timer_period;
// Wait for events.
int n = ioctl (devpoll_fd, DP_POLL, &poll_req);
if (n == -1 && errno == EINTR)
continue;
errno_assert (n != -1);
// Handle timer.
if (!n) {
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t t;
std::swap (timers, t);
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event ();
continue;
}
for (int i = 0; i < n; i ++) {
fd_entry_t *fd_ptr = &fd_table [ev_buf [i].fd];
if (!fd_ptr->valid || !fd_ptr->accepted)
continue;
if (ev_buf [i].revents & (POLLERR | POLLHUP))
fd_ptr->reactor->in_event ();
if (!fd_ptr->valid || !fd_ptr->accepted)
continue;
if (ev_buf [i].revents & POLLOUT)
fd_ptr->reactor->out_event ();
if (!fd_ptr->valid || !fd_ptr->accepted)
continue;
if (ev_buf [i].revents & POLLIN)
fd_ptr->reactor->in_event ();
}
}
}
void zs::devpoll_t::worker_routine (void *arg_)
{
((devpoll_t*) arg_)->loop ();
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_DEVPOLL_HPP_INCLUDED__
#define __ZS_DEVPOLL_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZS_HAVE_SOLARIS || ZS_HAVE_HPUX
#include <vector>
#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
namespace zs
{
// Implements socket polling mechanism using the Solaris-specific
// "/dev/poll" interface.
class devpoll_t : public i_poller
{
public:
devpoll_t ();
virtual ~devpoll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_);
void cancel_timer (i_poll_events *events_);
int get_load ();
void start ();
void stop ();
void join ();
private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop.
void loop ();
// File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd;
struct fd_entry_t
{
short events;
struct i_poll_events *reactor;
bool valid;
bool accepted;
};
std::vector <fd_entry_t> fd_table;
typedef std::vector <fd_t> pending_list_t;
pending_list_t pending_list;
// Pollset manipulation function.
void devpoll_ctl (fd_t fd_, short events_);
// List of all the engines waiting for the timer event.
typedef std::vector <struct i_poll_events*> timers_t;
timers_t timers;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
devpoll_t (const devpoll_t&);
void operator = (const devpoll_t&);
};
}
#endif
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zs.h"
#include "dispatcher.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "pipe_reader.hpp"
#include "pipe_writer.hpp"
#include "session.hpp"
#include "i_api.hpp"
#if defined ZS_HAVE_WINDOWS
#include "windows.h"
#endif
zs::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
{
#ifdef ZS_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
WORD version_requested = MAKEWORD (2, 2);
WSADATA wsa_data;
int rc = WSAStartup (version_requested, &wsa_data);
zs_assert (rc == 0);
zs_assert (LOBYTE (wsa_data.wVersion) == 2 &&
HIBYTE (wsa_data.wVersion) == 2);
#endif
// Create application thread proxies.
for (int i = 0; i != app_threads_; i++) {
app_thread_t *app_thread = new app_thread_t (this, i);
zs_assert (app_thread);
app_threads.push_back (app_thread);
signalers.push_back (app_thread->get_signaler ());
}
// Create I/O thread objects.
for (int i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new io_thread_t (this, i + app_threads_);
zs_assert (io_thread);
io_threads.push_back (io_thread);
signalers.push_back (io_thread->get_signaler ());
}
// Create command pipe matrix.
command_pipes = new command_pipe_t [signalers.size () * signalers.size ()];
zs_assert (command_pipes);
// Launch I/O threads.
for (int i = 0; i != io_threads_; i++)
io_threads [i]->start ();
}
void zs::dispatcher_t::shutdown ()
{
delete this;
}
zs::dispatcher_t::~dispatcher_t ()
{
// Ask I/O threads to terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->stop ();
// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->join ();
// At this point the current thread is the only thread with access to
// our internal data. Deallocation will be done exclusively in this thread.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
app_threads [i]->shutdown ();
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->shutdown ();
delete [] command_pipes;
// Deallocate all the pipes, pipe readers and pipe writers.
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) {
delete it->pipe;
delete it->reader;
delete it->writer;
}
#ifdef ZS_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
}
int zs::dispatcher_t::thread_slot_count ()
{
return signalers.size ();
}
zs::i_api *zs::dispatcher_t::create_socket (int type_)
{
threads_sync.lock ();
app_thread_t *thread = choose_app_thread ();
if (!thread) {
threads_sync.unlock ();
return NULL;
}
i_api *s = thread->create_socket (type_);
threads_sync.unlock ();
return s;
}
zs::app_thread_t *zs::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->is_current ())
return app_threads [i];
// Check whether there's an unused thread slot in the dispatcher.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->make_current ())
return app_threads [i];
// Thread limit was exceeded.
errno = EMFILE;
return NULL;
}
zs::io_thread_t *zs::dispatcher_t::choose_io_thread (uint64_t taskset_)
{
zs_assert (io_threads.size () > 0);
// Find the I/O thread with minimum load.
int min_load = io_threads [0]->get_load ();
io_threads_t::size_type result = 0;
for (io_threads_t::size_type i = 1; i != io_threads.size (); i++) {
if (!taskset_ || (taskset_ & (uint64_t (1) << i))) {
int load = io_threads [i]->get_load ();
if (load < min_load) {
min_load = load;
result = i;
}
}
}
return io_threads [result];
}
void zs::dispatcher_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_)
{
// Create the pipe, reader & writer triple.
pipe_t *pipe = new pipe_t;
zs_assert (pipe);
pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe,
hwm_, lwm_);
zs_assert (reader);
pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader,
hwm_, lwm_);
zs_assert (writer);
reader->set_peer (writer);
// Store the pipe in the repository.
pipe_info_t info = {pipe, reader, writer};
pipes_sync.lock ();
pipe->set_index (pipes.size ());
pipes.push_back (info);
pipes_sync.unlock ();
*reader_ = reader;
*writer_ = writer;
}
void zs::dispatcher_t::destroy_pipe (pipe_t *pipe_)
{
// Remove the pipe from the repository.
pipe_info_t info;
pipes_sync.lock ();
pipes_t::size_type i = pipe_->get_index ();
info = pipes [i];
pipes [i] = pipes.back ();
pipes.pop_back ();
pipes_sync.unlock ();
// Deallocate the pipe and associated pipe reader & pipe writer.
zs_assert (info.pipe == pipe_);
delete info.pipe;
delete info.reader;
delete info.writer;
}
int zs::dispatcher_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_)
{
inproc_endpoint_sync.lock ();
inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
if (it != inproc_endpoints.end ()) {
inproc_endpoint_sync.unlock ();
errno = EADDRINUSE;
return -1;
}
inproc_endpoints.insert (std::make_pair (endpoint_, session_));
inproc_endpoint_sync.unlock ();
return 0;
}
zs::object_t *zs::dispatcher_t::get_inproc_endpoint (const char *endpoint_)
{
inproc_endpoint_sync.lock ();
inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
if (it == inproc_endpoints.end ()) {
inproc_endpoint_sync.unlock ();
errno = EADDRNOTAVAIL;
return NULL;
}
it->second->inc_seqnum ();
object_t *session = it->second;
inproc_endpoint_sync.unlock ();
return session;
}
void zs::dispatcher_t::unregister_inproc_endpoints (session_t *session_)
{
inproc_endpoint_sync.lock ();
// Remove the connection from the repository.
// TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n).
for (inproc_endpoints_t::iterator it = inproc_endpoints.begin ();
it != inproc_endpoints.end ();) {
if (it->second == session_) {
inproc_endpoints.erase (it);
it = inproc_endpoints.begin ();
}
else
it++;
}
inproc_endpoint_sync.unlock ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_DISPATCHER_HPP_INCLUDED__
#define __ZS_DISPATCHER_HPP_INCLUDED__
#include <vector>
#include <map>
#include <string>
#include "i_signaler.hpp"
#include "ypipe.hpp"
#include "command.hpp"
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
namespace zs
{
// Dispatcher implements bidirectional thread-safe passing of commands
// between N threads. It consists of a ypipes to pass commands and
// signalers to wake up the receiver thread when new commands are
// available. Note that dispatcher is inefficient for passing messages
// within a thread (sender thread = receiver thread). The optimisation is
// not part of the class and should be implemented by individual threads
// (presumably by calling the command handling function directly).
class dispatcher_t
{
public:
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
dispatcher_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zs_term).
void shutdown ();
// Create a socket engine.
struct i_api *create_socket (int type_);
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
int thread_slot_count ();
// Write command to the dispatcher.
inline void write (int source_, int destination_,
const command_t &command_)
{
command_pipe_t &pipe =
command_pipes [source_ * signalers.size () + destination_];
pipe.write (command_);
if (!pipe.flush ())
signalers [destination_]->signal (source_);
}
// Read command from the dispatcher. Returns false if there is no
// command available.
inline bool read (int source_, int destination_, command_t *command_)
{
return command_pipes [source_ * signalers.size () +
destination_].read (command_);
}
// Creates new pipe.
void create_pipe (class object_t *reader_parent_,
class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
class pipe_reader_t **reader_, class pipe_writer_t **writer_);
// Deallocates the pipe.
void destroy_pipe (class pipe_t *pipe_);
// Registers existing session object as an inproc endpoint.
int register_inproc_endpoint (const char *endpoint_,
class session_t *session_);
// Retrieves an inproc endpoint. Increments the command sequence number
// of the object by one. Caller is thus bound to send the command
// to the connection after invoking this function. Returns NULL if
// the endpoint doesn't exist.
class object_t *get_inproc_endpoint (const char *endpoint_);
// Removes all the inproc endpoints associated with the given session
// object from the global repository.
void unregister_inproc_endpoints (class session_t *session_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
private:
// Clean-up.
~dispatcher_t ();
// Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots.
class app_thread_t *choose_app_thread ();
// Application threads.
typedef std::vector <class app_thread_t*> app_threads_t;
app_threads_t app_threads;
// I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads;
// Signalers for both application and I/O threads.
std::vector <i_signaler*> signalers;
// Pipe to hold the commands.
typedef ypipe_t <command_t, true,
command_pipe_granularity> command_pipe_t;
// NxN matrix of command pipes.
command_pipe_t *command_pipes;
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
// Global repository of pipes. It's used only on terminal shutdown
// to deallocate all the pipes irrespective of whether they are
// referenced from pipe_reader, pipe_writer or both.
struct pipe_info_t
{
class pipe_t *pipe;
class pipe_reader_t *reader;
class pipe_writer_t *writer;
};
typedef std::vector <pipe_info_t> pipes_t;
pipes_t pipes;
// Synchronisation of access to global repository of pipes.
mutex_t pipes_sync;
// Global repository of available inproc endpoints.
typedef std::map <std::string, class session_t*> inproc_endpoints_t;
inproc_endpoints_t inproc_endpoints;
// Synchronisation of access to the global repository
// of inproc endpoints.
mutex_t inproc_endpoint_sync;
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zs.h"
#include "dummy_aggregator.hpp"
#include "err.hpp"
#include "pipe_reader.hpp"
#include "session.hpp"
// Swaps pipes at specified indices.
#define swap_pipes(i1_, i2_) \
std::swap (pipes [i1_], pipes [i2_]);\
pipes [i1_]->set_index (i1_);\
pipes [i2_]->set_index (i2_);
zs::dummy_aggregator_t::dummy_aggregator_t () :
session (NULL),
pipe (NULL),
active (false)
{
}
void zs::dummy_aggregator_t::set_session (session_t *session_)
{
zs_assert (!session);
session = session_;
}
void zs::dummy_aggregator_t::shutdown ()
{
// No need to deallocate the pipe here. It'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zs::dummy_aggregator_t::terminate ()
{
if (pipe)
pipe->terminate ();
delete this;
}
zs::dummy_aggregator_t::~dummy_aggregator_t ()
{
}
void zs::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
{
zs_assert (!pipe);
pipe = pipe_;
active = true;
// Associate new pipe with the mux object.
pipe_->set_mux (this);
session->revive ();
}
void zs::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
{
zs_assert (pipe == pipe_);
deactivate (pipe_);
pipe = NULL;
}
bool zs::dummy_aggregator_t::empty ()
{
return pipe == NULL;
}
bool zs::dummy_aggregator_t::recv (zs_msg *msg_)
{
// Deallocate old content of the message.
zs_msg_close (msg_);
// Try to read from the pipe.
if (pipe && pipe->read (msg_))
return true;
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zs_msg_init (msg_);
return false;
}
void zs::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_)
{
active = false;
}
void zs::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_)
{
active = true;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_DUMMY_AGGREGATOR_HPP_INCLUDED__
#define __ZS_DUMMY_AGGREGATOR_HPP_INCLUDED__
#include <vector>
#include "i_mux.hpp"
namespace zs
{
// Fake message aggregator. There can be at most one pipe bound to it,
// so there's no real aggregation going on. However, it is more efficient
// than a real aggregator. It's intended to be used in the contexts
// where business logic ensures there'll be at most one pipe bound.
class dummy_aggregator_t : public i_mux
{
public:
dummy_aggregator_t ();
// i_mux interface implementation.
void set_session (session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_reader_t *pipe_);
void detach_pipe (class pipe_reader_t *pipe_);
bool empty ();
void deactivate (class pipe_reader_t *pipe_);
void reactivate (class pipe_reader_t *pipe_);
bool recv (struct zs_msg *msg_);
private:
// Clean-up.
~dummy_aggregator_t ();
// Reference to the owner session object.
class session_t *session;
// The single pipe bound.
class pipe_reader_t *pipe;
// If true, the pipe is active.
bool active;
dummy_aggregator_t (const dummy_aggregator_t&);
void operator = (const dummy_aggregator_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zs.h"
#include "dummy_distributor.hpp"
#include "pipe_writer.hpp"
#include "err.hpp"
#include "session.hpp"
#include "msg.hpp"
zs::dummy_distributor_t::dummy_distributor_t () :
session (NULL)
{
}
void zs::dummy_distributor_t::set_session (session_t *session_)
{
zs_assert (!session);
session = session_;
}
void zs::dummy_distributor_t::shutdown ()
{
// No need to deallocate pipe here. It'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zs::dummy_distributor_t::terminate ()
{
if (pipe)
pipe->terminate ();
delete this;
}
zs::dummy_distributor_t::~dummy_distributor_t ()
{
}
void zs::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_)
{
zs_assert (!pipe);
pipe = pipe_;
}
void zs::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_)
{
zs_assert (pipe == pipe_);
pipe = NULL;
}
bool zs::dummy_distributor_t::empty ()
{
return pipe == NULL;
}
bool zs::dummy_distributor_t::send (zs_msg *msg_)
{
return pipe && pipe->write (msg_);
}
void zs::dummy_distributor_t::flush ()
{
if (pipe)
pipe->flush ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_DUMMY_DISTRIBUTOR_HPP_INCLUDED__
#define __ZS_DUMMY_DISTRIBUTOR_HPP_INCLUDED__
#include <vector>
#include <i_demux.hpp>
namespace zs
{
// Fake message distributor. There can be only one pipe bound to it
// so there no real distribution going on. However, it is more efficient
// than a real distributor and should be used where business logic
// ensures there'll be at most one pipe bound.
class dummy_distributor_t : public i_demux
{
public:
dummy_distributor_t ();
// i_demux implementation.
void set_session (class session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_writer_t *pipe_);
void detach_pipe (class pipe_writer_t *pipe_);
bool empty ();
bool send (struct zs_msg *msg_);
void flush ();
private:
// Clean-up.
~dummy_distributor_t ();
// Reference to the owner session object.
class session_t *session;
// The bound pipe.
class pipe_writer_t *pipe;
dummy_distributor_t (const dummy_distributor_t&);
void operator = (const dummy_distributor_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_ENCODER_HPP_INCLUDED__
#define __ZS_ENCODER_HPP_INCLUDED__
#include <stddef.h>
#include <string.h>
#include <algorithm>
namespace zs
{
// Helper base class for encoders. It implements the state machine that
// fills the outgoing buffer. Derived classes should implement individual
// state machine actions.
template <typename T> class encoder_t
{
public:
inline encoder_t ()
{
}
// The function tries to fill the supplied chunk by binary data.
// Returns the size of data actually filled in. If offset is not
// NULL, it is filled by offset of the first message in the batch.
// If there's no beginning of a message in the batch, offset is
// set to -1.
inline size_t read (unsigned char *data_, size_t size_,
int *offset_ = NULL)
{
int offset = -1;
size_t pos = 0;
while (pos < size_) {
if (to_write) {
size_t to_copy = std::min (to_write, size_ - pos);
memcpy (data_ + pos, write_pos, to_copy);
pos += to_copy;
write_pos += to_copy;
to_write -= to_copy;
}
else {
bool more = (static_cast <T*> (this)->*next) ();
if (beginning && offset == -1) {
offset = pos;
beginning = false;
}
if (!more)
break;
}
}
if (offset_)
*offset_ = offset;
return pos;
}
protected:
// Prototype of state machine action.
typedef bool (T::*step_t) ();
// This function should be called from derived class to write the data
// to the buffer and schedule next state machine action. Set beginning
// to true when you are writing first byte of a message.
inline void next_step (void *write_pos_, size_t to_write_,
step_t next_, bool beginning_)
{
write_pos = (unsigned char*) write_pos_;
to_write = to_write_;
next = next_;
beginning = beginning_;
}
private:
unsigned char *write_pos;
size_t to_write;
step_t next;
bool beginning;
encoder_t (const encoder_t&);
void operator = (const encoder_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "platform.hpp"
#ifdef ZS_HAVE_LINUX
#include <sys/epoll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include "epoll.hpp"
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"
zs::epoll_t::epoll_t () :
stopping (false)
{
epoll_fd = epoll_create (1);
errno_assert (epoll_fd != -1);
}
zs::epoll_t::~epoll_t ()
{
close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++)
delete *it;
}
zs::handle_t zs::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
poll_entry_t *pe = new poll_entry_t;
zs_assert (pe != NULL);
// The memset is not actually needed. It's here to prevent debugging
// tools to complain about using uninitialised memory.
memset (pe, 0, sizeof (poll_entry_t));
pe->fd = fd_;
pe->ev.events = 0;
pe->ev.data.ptr = pe;
pe->events = events_;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev);
errno_assert (rc != -1);
// Increase the load metric of the thread.
load.add (1);
handle_t handle;
handle.ptr = pe;
return handle;
}
void zs::epoll_t::rm_fd (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
errno_assert (rc != -1);
pe->fd = retired_fd;
retired.push_back (pe);
// Decrease the load metric of the thread.
load.sub (1);
}
void zs::epoll_t::set_pollin (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
pe->ev.events |= EPOLLIN;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
void zs::epoll_t::reset_pollin (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
pe->ev.events &= ~((short) EPOLLIN);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
void zs::epoll_t::set_pollout (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
pe->ev.events |= EPOLLOUT;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
void zs::epoll_t::reset_pollout (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
pe->ev.events &= ~((short) EPOLLOUT);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
}
void zs::epoll_t::add_timer (i_poll_events *events_)
{
timers.push_back (events_);
}
void zs::epoll_t::cancel_timer (i_poll_events *events_)
{
timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
if (it == timers.end ())
return;
timers.erase (it);
}
int zs::epoll_t::get_load ()
{
return load.get ();
}
void zs::epoll_t::start ()
{
worker.start (worker_routine, this);
}
void zs::epoll_t::stop ()
{
stopping = true;
}
void zs::epoll_t::join ()
{
worker.stop ();
}
void zs::epoll_t::loop ()
{
epoll_event ev_buf [max_io_events];
while (!stopping) {
// Wait for events.
int n;
while (true) {
n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events,
timers.empty () ? -1 : max_timer_period);
if (!(n == -1 && errno == EINTR)) {
errno_assert (n != -1);
break;
}
}
// Handle timer.
if (!n) {
// Use local list of timers as timer handlers may fill new timers
// into the original array.
timers_t t;
std::swap (timers, t);
// Trigger all the timers.
for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
(*it)->timer_event ();
continue;
}
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
pe->events->in_event ();
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].events & EPOLLOUT)
pe->events->out_event ();
if (pe->fd == retired_fd)
continue;
if (ev_buf [i].events & EPOLLIN)
pe->events->in_event ();
}
// Destroy retired event sources.
for (retired_t::iterator it = retired.begin (); it != retired.end ();
it ++)
delete *it;
retired.clear ();
}
}
void zs::epoll_t::worker_routine (void *arg_)
{
((epoll_t*) arg_)->loop ();
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_EPOLL_HPP_INCLUDED__
#define __ZS_EPOLL_HPP_INCLUDED__
#include "platform.hpp"
#ifdef ZS_HAVE_LINUX
#include <vector>
#include <sys/epoll.h>
#include "i_poller.hpp"
//#include "i_poll_events.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
namespace zs
{
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
class epoll_t : public i_poller
{
public:
epoll_t ();
virtual ~epoll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void add_timer (i_poll_events *events_);
void cancel_timer (i_poll_events *events_);
int get_load ();
void start ();
void stop ();
void join ();
private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop.
void loop ();
// Main epoll file descriptor
fd_t epoll_fd;
struct poll_entry_t
{
fd_t fd;
epoll_event ev;
struct i_poll_events *events;
};
// List of retired event sources.
typedef std::vector <poll_entry_t*> retired_t;
retired_t retired;
// List of all the engines waiting for the timer event.
typedef std::vector <struct i_poll_events*> timers_t;
timers_t timers;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
// Load of the poller. Currently number of file descriptors
// registered with the poller.
atomic_counter_t load;
epoll_t (const epoll_t&);
void operator = (const epoll_t&);
};
}
#endif
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "err.hpp"
#include "platform.hpp"
#ifdef ZS_HAVE_WINDOWS
const char *zs::wsa_error()
{
int errcode = WSAGetLastError ();
// TODO: This is not a generic way to handle this...
if (errcode == WSAEWOULDBLOCK)
return NULL;
return
(errcode == WSABASEERR) ?
"No Error" :
(errcode == WSAEINTR) ?
"Interrupted system call" :
(errcode == WSAEBADF) ?
"Bad file number" :
(errcode == WSAEACCES) ?
"Permission denied" :
(errcode == WSAEFAULT) ?
"Bad address" :
(errcode == WSAEINVAL) ?
"Invalid argument" :
(errcode == WSAEMFILE) ?
"Too many open files" :
(errcode == WSAEWOULDBLOCK) ?
"Operation would block" :
(errcode == WSAEINPROGRESS) ?
"Operation now in progress" :
(errcode == WSAEALREADY) ?
"Operation already in progress" :
(errcode == WSAENOTSOCK) ?
"Socket operation on non-socket" :
(errcode == WSAEDESTADDRREQ) ?
"Destination address required" :
(errcode == WSAEMSGSIZE) ?
"Message too long" :
(errcode == WSAEPROTOTYPE) ?
"Protocol wrong type for socket" :
(errcode == WSAENOPROTOOPT) ?
"Bad protocol option" :
(errcode == WSAEPROTONOSUPPORT) ?
"Protocol not supported" :
(errcode == WSAESOCKTNOSUPPORT) ?
"Socket type not supported" :
(errcode == WSAEOPNOTSUPP) ?
"Operation not supported on socket" :
(errcode == WSAEPFNOSUPPORT) ?
"Protocol family not supported" :
(errcode == WSAEAFNOSUPPORT) ?
"Address family not supported by protocol family" :
(errcode == WSAEADDRINUSE) ?
"Address already in use" :
(errcode == WSAEADDRNOTAVAIL) ?
"Can't assign requested address" :
(errcode == WSAENETDOWN) ?
"Network is down" :
(errcode == WSAENETUNREACH) ?
"Network is unreachable" :
(errcode == WSAENETRESET) ?
"Net dropped connection or reset" :
(errcode == WSAECONNABORTED) ?
"Software caused connection abort" :
(errcode == WSAECONNRESET) ?
"Connection reset by peer" :
(errcode == WSAENOBUFS) ?
"No buffer space available" :
(errcode == WSAEISCONN) ?
"Socket is already connected" :
(errcode == WSAENOTCONN) ?
"Socket is not connected" :
(errcode == WSAESHUTDOWN) ?
"Can't send after socket shutdown" :
(errcode == WSAETOOMANYREFS) ?
"Too many references can't splice" :
(errcode == WSAETIMEDOUT) ?
"Connection timed out" :
(errcode == WSAECONNREFUSED) ?
"Connection refused" :
(errcode == WSAELOOP) ?
"Too many levels of symbolic links" :
(errcode == WSAENAMETOOLONG) ?
"File name too long" :
(errcode == WSAEHOSTDOWN) ?
"Host is down" :
(errcode == WSAEHOSTUNREACH) ?
"No Route to Host" :
(errcode == WSAENOTEMPTY) ?
"Directory not empty" :
(errcode == WSAEPROCLIM) ?
"Too many processes" :
(errcode == WSAEUSERS) ?
"Too many users" :
(errcode == WSAEDQUOT) ?
"Disc Quota Exceeded" :
(errcode == WSAESTALE) ?
"Stale NFS file handle" :
(errcode == WSAEREMOTE) ?
"Too many levels of remote in path" :
(errcode == WSASYSNOTREADY) ?
"Network SubSystem is unavailable" :
(errcode == WSAVERNOTSUPPORTED) ?
"WINSOCK DLL Version out of range" :
(errcode == WSANOTINITIALISED) ?
"Successful WSASTARTUP not yet performed" :
(errcode == WSAHOST_NOT_FOUND) ?
"Host not found" :
(errcode == WSATRY_AGAIN) ?
"Non-Authoritative Host not found" :
(errcode == WSANO_RECOVERY) ?
"Non-Recoverable errors: FORMERR REFUSED NOTIMP" :
(errcode == WSANO_DATA) ?
"Valid name no data record of requested" :
"error not defined";
}
void zs::win_error (char *buffer_, size_t buffer_size_)
{
DWORD errcode = GetLastError ();
DWORD rc = FormatMessageA (FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS, NULL, errcode, MAKELANGID(LANG_NEUTRAL,
SUBLANG_DEFAULT), buffer_, buffer_size_, NULL );
zs_assert (rc);
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZS_ERR_HPP_INCLUDED__
#define __ZS_ERR_HPP_INCLUDED__
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include "platform.hpp"
#ifdef ZS_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <netdb.h>
#endif
#ifdef ZS_HAVE_WINDOWS
namespace zs
{
const char *wsa_error ();
void win_error (char *buffer_, size_t buffer_size_);
}
// Provides convenient way to check WSA-style errors on Windows.
#define wsa_assert(x) do { if (!(x)){\
const char *errstr = zs::wsa_error ();\
if (errstr != NULL) {\
fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \
__FILE__, __LINE__);\
abort ();\
}\
}} while (false)
// Provides convenient way to check GetLastError-style errors on Windows.
#define win_assert(x) do { if (!(x)) {\
char errstr [256];\
zs::win_error (errstr, 256);\
fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \
__FILE__, __LINE__);\
abort ();\
}} while (false)
#endif
// This macro works in exactly the same way as the normal assert. It is used
// in its stead because standard assert on Win32 in broken - it prints nothing
// when used within the scope of JNI library.
#define zs_assert(x) do { if (!(x)){\
fprintf (stderr, "Assertion failed: %s (%s:%d)\n", #x, \
__FILE__, __LINE__);\
abort ();\
}} while (false)
// Provides convenient way to check for errno-style errors.
#define errno_assert(x) do { if (!(x)) {\
perror (NULL);\
fprintf (stderr, "%s (%s:%d)\n", #x, __FILE__, __LINE__);\
abort ();\
}} while (false)
// Provides convenient way to check for errors from getaddrinfo.
#define gai_assert(x) do { if (x) {\
const char *errstr = gai_strerror (x);\
fprintf (stderr, "%s (%s:%d)\n", errstr, __FILE__, __LINE__);\
abort ();\
}} while (false)
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zs.h"
#include "fair_aggregator.hpp"
#include "err.hpp"
#include "pipe_reader.hpp"
#include "session.hpp"
// Swaps pipes at specified indices.
#define swap_pipes(i1_, i2_) \
std::swap (pipes [i1_], pipes [i2_]);\
pipes [i1_]->set_index (i1_);\
pipes [i2_]->set_index (i2_);
zs::fair_aggregator_t::fair_aggregator_t () :
session (NULL),
active (0),
current (0)
{
}
void zs::fair_aggregator_t::set_session (session_t *session_)
{
zs_assert (!session);
session = session_;
}
void zs::fair_aggregator_t::shutdown ()
{
// No need to deallocate pipes here. They'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zs::fair_aggregator_t::terminate ()
{
// Pipe unregisters itself during the call to terminate, so the pipes
// list shinks by one in each iteration.
while (!pipes.empty ())
pipes [0]->terminate ();
delete this;
}
zs::fair_aggregator_t::~fair_aggregator_t ()
{
}
void zs::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
{
// Associate new pipe with the mux object.
pipe_->set_mux (this);
pipes.push_back (pipe_);
active++;
if (pipes.size () > active)
swap_pipes (pipes.size () - 1, active - 1);
if (active == 1)
session->revive ();
}
void zs::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
{
// Move the pipe from the list of active pipes to the list of idle pipes.
deactivate (pipe_);
// Move the pipe to the end of the idle list and remove it.
swap_pipes (pipe_->get_index (), pipes.size () - 1);
pipes.pop_back ();
}
bool zs::fair_aggregator_t::empty ()
{
return pipes.empty ();
}
bool zs::fair_aggregator_t::recv (zs_msg *msg_)
{
// Deallocate old content of the message.
zs_msg_close (msg_);
// O(1) fair queueing. Round-robin over the active pipes to get
// next message.
for (pipes_t::size_type i = active; i != 0; i--) {
// Update current.
current = (current + 1) % active;
// Try to read from current.
if (pipes [current]->read (msg_))
return true;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zs_msg_init (msg_);
return false;
}
void zs::fair_aggregator_t::deactivate (pipe_reader_t *pipe_)
{
int index = pipe_->get_index ();
// Suspend an active pipe.
swap_pipes (index, active - 1);
active--;
// If the deactiveted pipe is the current one, shift the current one pipe
// backwards so that the pipe that replaced the deactiveted one will be
// processed immediately rather than skipped.
if (index == (int) current) {
index--;
if (index == -1)
index = active - 1;
current = index;
}
}
void zs::fair_aggregator_t::reactivate (pipe_reader_t *pipe_)
{
// Revive an idle pipe.
swap_pipes (pipe_->get_index (), active);
active++;
if (active == 1)
session->revive ();
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment