Commit 36428134 authored by malosek's avatar malosek

Merge branch 'master' of git@github.com:sustrik/zeromq2

parents bdf22e9c f7ad4a20
......@@ -14,8 +14,8 @@ if BUILD_PERF
DIR_PERF = perf
endif
SUBDIRS = src $(DIR_P) $(DIR_R) $(DIR_J) $(DIR_PERF)
DIST_SUBDIRS = src python ruby java perf
SUBDIRS = src $(DIR_P) $(DIR_R) $(DIR_J) $(DIR_PERF) devices
DIST_SUBDIRS = src python ruby java perf devices
EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm_basename@.tar.bz2
......
......@@ -44,16 +44,15 @@ extern "C" {
#define ZMQ_VSM 32
// Socket options.
#define ZMQ_HWM 1
#define ZMQ_LWM 2
#define ZMQ_SWAP 3
#define ZMQ_MASK 4
#define ZMQ_AFFINITY 5
#define ZMQ_IDENTITY 6
#define ZMQ_SUBSCRIBE 7
#define ZMQ_UNSUBSCRIBE 8
#define ZMQ_RATE 9
#define ZMQ_RECOVERY_IVL 10
#define ZMQ_HWM 1 // int64_t
#define ZMQ_LWM 2 // int64_t
#define ZMQ_SWAP 3 // int64_t
#define ZMQ_AFFINITY 4 // int64_t
#define ZMQ_IDENTITY 5 // string
#define ZMQ_SUBSCRIBE 6 // string
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
......@@ -186,12 +185,12 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
//
// Errors: EAGAIN - message cannot be sent at the moment (applies only to
// non-blocking send).
// ENOTSUP - function isn't supported by particular socket type.
// EFAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags);
// Flush the messages that were send using ZMQ_NOFLUSH flag down the stream.
//
// Errors: ENOTSUP - function isn't supported by particular socket type.
// Errors: FAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_flush (void *s);
// Send a message from the socket 's'. 'flags' argument can be combination
......@@ -200,7 +199,7 @@ ZMQ_EXPORT int zmq_flush (void *s);
//
// Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive).
// ENOTSUP - function isn't supported by particular socket type.
// EFAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags);
// Helper functions used by perf tests so that they don't have to care
......
......@@ -441,6 +441,16 @@ if test "x$with_pgm_ext" != "xno"; then
pgm_ext="yes"
fi
# forwarder device
forwarder="no"
AC_ARG_WITH([forwarder], [AS_HELP_STRING([--with-forwarder],
[build forwarder device [default=no]])], [with_forwarder=yes], [with_forwarder=no])
if test "x$with_forwarder" != "xno"; then
forwarder="yes"
fi
# Perf
perf="no"
AC_ARG_WITH([perf], [AS_HELP_STRING([--with-perf],
......@@ -462,6 +472,7 @@ AM_CONDITIONAL(BUILD_RUBY, test "x$rbzmq" = "xyes")
AM_CONDITIONAL(BUILD_C, test "x$czmq" = "xyes")
AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM, test "x$pgm_ext" = "xyes")
AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AC_SUBST(stdint)
......@@ -479,7 +490,8 @@ AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
AC_OUTPUT(Makefile src/Makefile python/Makefile python/setup.py ruby/Makefile \
java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc)
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
devices/Makefile devices/forwarder/Makefile)
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
......@@ -492,19 +504,26 @@ AC_MSG_RESULT([ license text. ])
AC_MSG_RESULT([ ******************************************************** ])
AC_MSG_RESULT([])
AC_MSG_RESULT([ 0MQ install dir: $prefix])
AC_MSG_RESULT([ C language binding: $czmq])
AC_MSG_RESULT([ C++ language binding: $cppzmq])
AC_MSG_RESULT([ Python language binding: $pyzmq])
AC_MSG_RESULT([ Ruby language binding: $rbzmq])
AC_MSG_RESULT([ Language bindings:])
AC_MSG_RESULT([ C: $czmq])
AC_MSG_RESULT([ C++: $cppzmq])
AC_MSG_RESULT([ Java: $jzmq])
AC_MSG_RESULT([ Python: $pyzmq])
AC_MSG_RESULT([ Ruby: $rbzmq])
if test "x$rbzmq" = "xyes"; then
AC_MSG_RESULT([ Ruby library install dir: $rubydir])
AC_MSG_RESULT([ Ruby library install dir: $rubydir])
fi
AC_MSG_RESULT([ Java language binding: $jzmq])
AC_MSG_RESULT([ Network protocols:])
AC_MSG_RESULT([ TCP: yes])
if test "x$pgm_ext" = "xyes"; then
AC_MSG_RESULT([ PGM extension: $pgm_ext ($pgm_name)])
AC_MSG_RESULT([ PGM: $pgm_ext ($pgm_name)])
else
AC_MSG_RESULT([ PGM extension: $pgm_ext])
AC_MSG_RESULT([ PGM: $pgm_ext])
fi
AC_MSG_RESULT([ performance tests: $perf])
AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ forwarder: $forwarder])
AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
AC_MSG_RESULT([])
if BUILD_FORWARDER
FORWARDER_DIR = forwarder
endif
SUBDIRS = $(FORWARDER_DIR)
DIST_SUBDIRS = forwarder
INCLUDES = -I$(top_builddir)/c -I$(top_builddir)/cpp
bin_PROGRAMS = forwarder
forwarder_LDADD = $(top_builddir)/src/libzmq.la
forwarder_SOURCES = forwarder.cpp
forwarder_CXXFLAGS = -Wall -pedantic -Werror
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../../cpp/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
int main (int argc, char *argv [])
{
if (argc != 2) {
fprintf (stderr, "usage: forwarder <config-file>\n");
return 1;
}
XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) {
fprintf (stderr, "configuration file not found\n");
return 1;
}
if (strcmp (root.getName (), "forwarder") != 0) {
fprintf (stderr, "root element in the configuration file should be "
"named 'forwarder'\n");
return 1;
}
XMLNode in_node = root.getChildNode ("in");
if (in_node.isEmpty ()) {
fprintf (stderr, "'in' node is missing in the configuration file\n");
return 1;
}
XMLNode out_node = root.getChildNode ("out");
if (out_node.isEmpty ()) {
fprintf (stderr, "'out' node is missing in the configuration file\n");
return 1;
}
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
zmq::socket_t in_socket (ctx, ZMQ_P2P);
zmq::socket_t out_socket (ctx, ZMQ_P2P);
int n = 0;
while (true) {
XMLNode bind = in_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
in_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = in_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
in_socket.connect (addr);
n++;
}
n = 0;
while (true) {
XMLNode bind = out_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
out_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = out_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
out_socket.connect (addr);
n++;
}
zmq::message_t msg;
while (true) {
in_socket.recv (&msg);
out_socket.send (msg);
}
return 0;
}
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
......@@ -22,6 +22,8 @@
#include <assert.h>
#include <errno.h>
#include "../src/stdint.hpp"
#include "zmq.h"
#include "org_zmq_Socket.h"
......@@ -86,80 +88,60 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_finalize (JNIEnv *env, jobject obj)
assert (rc == 0);
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_setHwm (JNIEnv *env, jobject obj,
jlong hwm)
JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env,
jobject obj, jint option, jlong optval)
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
int rc = zmq_setsockopt (s, ZMQ_HWM, &hwm, sizeof hwm);
if (rc == -1)
raise_exception (env, errno);
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_setLwm (JNIEnv *env, jobject obj,
jlong lwm)
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
int rc = zmq_setsockopt (s, ZMQ_LWM, &lwm, sizeof lwm);
if (rc == -1)
raise_exception (env, errno);
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_setSwap (JNIEnv *env, jobject obj,
jlong swap_size)
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
int rc = zmq_setsockopt (s, ZMQ_SWAP, &swap_size, sizeof swap_size);
if (rc == -1)
raise_exception (env, errno);
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_setMask (JNIEnv *env, jobject obj,
jlong mask)
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
int rc = zmq_setsockopt (s, ZMQ_MASK, &mask, sizeof mask);
if (rc == -1)
raise_exception (env, errno);
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_setAffinity (JNIEnv *env,
jobject obj, jlong affinity)
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
int rc = zmq_setsockopt (s, ZMQ_AFFINITY, &affinity, sizeof affinity);
if (rc == -1)
raise_exception (env, errno);
switch (option) {
case ZMQ_HWM:
case ZMQ_LWM:
case ZMQ_SWAP:
case ZMQ_AFFINITY:
case ZMQ_RATE:
case ZMQ_RECOVERY_IVL:
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
int64_t value = optval;
int rc = zmq_setsockopt (s, option, &value, sizeof (value));
if (rc != 0)
raise_exception (env, errno);
return;
}
default:
raise_exception (env, EINVAL);
return;
}
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_setIdentity (JNIEnv *env,
jobject obj, jstring identity)
JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__ILjava_lang_String_2 (
JNIEnv *env, jobject obj, jint option, jstring optval)
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
if (identity == NULL) {
switch (option) {
case ZMQ_IDENTITY:
case ZMQ_SUBSCRIBE:
case ZMQ_UNSUBSCRIBE:
{
if (optval == NULL) {
raise_exception (env, EINVAL);
return;
}
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);
const char *value = env->GetStringUTFChars (optval, NULL);
assert (value);
int rc = zmq_setsockopt (s, option, value, strlen (value));
env->ReleaseStringUTFChars (optval, value);
if (rc != 0)
raise_exception (env, errno);
return;
}
default:
raise_exception (env, EINVAL);
return;
}
const char *c_identity = env->GetStringUTFChars (identity, NULL);
if (c_identity == NULL)
return;
int rc = zmq_setsockopt (s, ZMQ_IDENTITY, c_identity, sizeof c_identity);
env->ReleaseStringUTFChars (identity, c_identity);
if (rc == -1)
raise_exception (env, errno);
}
JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env, jobject obj,
......
......@@ -27,19 +27,24 @@ public class Socket
}
public static final int NOBLOCK = 1;
public static final int NOFLUSH = 2;
public static final int P2P = 0;
public static final int PUB = 1;
public static final int SUB = 2;
public static final int REQ = 3;
public static final int REP = 4;
public static final int HWM = 1;
public static final int LWM = 2;
public static final int SWAP = 3;
public static final int AFFINITY = 4;
public static final int IDENTITY = 5;
public static final int SUBSCRIBE = 6;
public static final int UNSUBSCRIBE = 7;
public static final int RATE = 8;
public static final int RECOVERY_IVL = 9;
/**
* Class constructor.
*
......@@ -51,46 +56,13 @@ public class Socket
}
/**
* Set the high watermark on the socket.
*
* @param hwm high watermark.
*/
public native void setHwm (long hwm);
/**
* Set the low watermark on the socket.
*
* @param lwm low watermark.
*/
public native void setLwm (long lwm);
/**
* Set swap size.
*
* @param swap_size swap size.
*/
public native void setSwap (long swap_size);
/**
* Set reception mask.
*
* @param mask mask.
*/
public native void setMask (long mask);
/**
* Set affinity.
*
* @param affinity
*/
public native void setAffinity (long affinity);
/**
* Set identity.
* Set the socket option value.
*
* @param identity
* @param option ID of the option to set
* @param optval value to set the option to
*/
public native void setIdentity (String identity);
public native void setsockopt (int option, long optval);
public native void setsockopt (int option, String optval);
/**
* Bind to network interface. Start listening for new connections.
......
<?xml version="1.0" encoding="windows-1250"?>
<VisualStudioProject
ProjectType="Visual C++"
Version="9.00"
Name="forwarder"
ProjectGUID="{7529DFB2-685C-4534-B40E-0B148509178A}"
RootNamespace="forwarder"
TargetFrameworkVersion="196613"
>
<Platforms>
<Platform
Name="Win32"
/>
</Platforms>
<ToolFiles>
</ToolFiles>
<Configurations>
<Configuration
Name="Debug|Win32"
OutputDirectory="$(SolutionDir)$(ConfigurationName)"
IntermediateDirectory="$(ConfigurationName)"
ConfigurationType="1"
CharacterSet="2"
>
<Tool
Name="VCPreBuildEventTool"
/>
<Tool
Name="VCCustomBuildTool"
/>
<Tool
Name="VCXMLDataGeneratorTool"
/>
<Tool
Name="VCWebServiceProxyGeneratorTool"
/>
<Tool
Name="VCMIDLTool"
/>
<Tool
Name="VCCLCompilerTool"
Optimization="0"
AdditionalIncludeDirectories="../../c;../../cpp"
MinimalRebuild="true"
BasicRuntimeChecks="3"
RuntimeLibrary="3"
WarningLevel="3"
DebugInformationFormat="4"
/>
<Tool
Name="VCManagedResourceCompilerTool"
/>
<Tool
Name="VCResourceCompilerTool"
/>
<Tool
Name="VCPreLinkEventTool"
/>
<Tool
Name="VCLinkerTool"
GenerateDebugInformation="true"
TargetMachine="1"
/>
<Tool
Name="VCALinkTool"
/>
<Tool
Name="VCManifestTool"
/>
<Tool
Name="VCXDCMakeTool"
/>
<Tool
Name="VCBscMakeTool"
/>
<Tool
Name="VCFxCopTool"
/>
<Tool
Name="VCAppVerifierTool"
/>
<Tool
Name="VCPostBuildEventTool"
/>
</Configuration>
<Configuration
Name="Release|Win32"
OutputDirectory="$(SolutionDir)$(ConfigurationName)"
IntermediateDirectory="$(ConfigurationName)"
ConfigurationType="1"
CharacterSet="2"
WholeProgramOptimization="1"
>
<Tool
Name="VCPreBuildEventTool"
/>
<Tool
Name="VCCustomBuildTool"
/>
<Tool
Name="VCXMLDataGeneratorTool"
/>
<Tool
Name="VCWebServiceProxyGeneratorTool"
/>
<Tool
Name="VCMIDLTool"
/>
<Tool
Name="VCCLCompilerTool"
Optimization="2"
EnableIntrinsicFunctions="true"
AdditionalIncludeDirectories="../../c;../../cpp"
RuntimeLibrary="2"
EnableFunctionLevelLinking="true"
WarningLevel="3"
DebugInformationFormat="3"
/>
<Tool
Name="VCManagedResourceCompilerTool"
/>
<Tool
Name="VCResourceCompilerTool"
/>
<Tool
Name="VCPreLinkEventTool"
/>
<Tool
Name="VCLinkerTool"
GenerateDebugInformation="true"
OptimizeReferences="2"
EnableCOMDATFolding="2"
TargetMachine="1"
/>
<Tool
Name="VCALinkTool"
/>
<Tool
Name="VCManifestTool"
/>
<Tool
Name="VCXDCMakeTool"
/>
<Tool
Name="VCBscMakeTool"
/>
<Tool
Name="VCFxCopTool"
/>
<Tool
Name="VCAppVerifierTool"
/>
<Tool
Name="VCPostBuildEventTool"
/>
</Configuration>
</Configurations>
<References>
</References>
<Files>
<Filter
Name="Source Files"
Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
>
<File
RelativePath="..\..\devices\forwarder\forwarder.cpp"
>
</File>
</Filter>
</Files>
<Globals>
</Globals>
</VisualStudioProject>
......@@ -229,6 +229,10 @@
RelativePath="..\..\src\poll.cpp"
>
</File>
<File
RelativePath="..\..\src\pub.cpp"
>
</File>
<File
RelativePath="..\..\src\select.cpp"
>
......
......@@ -73,6 +73,11 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "python", "python\python.vcp
{641C5F36-32EE-4323-B740-992B651CF9D6} = {641C5F36-32EE-4323-B740-992B651CF9D6}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "forwarder", "forwarder\forwarder.vcproj", "{7529DFB2-685C-4534-B40E-0B148509178A}"
ProjectSection(ProjectDependencies) = postProject
{641C5F36-32EE-4323-B740-992B651CF9D6} = {641C5F36-32EE-4323-B740-992B651CF9D6}
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Win32 = Debug|Win32
......@@ -138,6 +143,10 @@ Global
{CB9B47E2-3F9C-45B2-9C30-C27038CDB856}.Debug|Win32.ActiveCfg = Debug|Win32
{CB9B47E2-3F9C-45B2-9C30-C27038CDB856}.Release|Win32.ActiveCfg = Release|Win32
{CB9B47E2-3F9C-45B2-9C30-C27038CDB856}.Release|Win32.Build.0 = Release|Win32
{7529DFB2-685C-4534-B40E-0B148509178A}.Debug|Win32.ActiveCfg = Debug|Win32
{7529DFB2-685C-4534-B40E-0B148509178A}.Debug|Win32.Build.0 = Debug|Win32
{7529DFB2-685C-4534-B40E-0B148509178A}.Release|Win32.ActiveCfg = Release|Win32
{7529DFB2-685C-4534-B40E-0B148509178A}.Release|Win32.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......
......@@ -484,9 +484,6 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_SWAP);
PyDict_SetItemString (dict, "SWAP", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_MASK);
PyDict_SetItemString (dict, "MASK", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_AFFINITY);
PyDict_SetItemString (dict, "AFFINITY", t);
Py_DECREF (t);
......
......@@ -280,11 +280,12 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("HWM", INT2NUM (ZMQ_HWM));
rb_define_global_const ("LWM", INT2NUM (ZMQ_LWM));
rb_define_global_const ("SWAP", INT2NUM (ZMQ_SWAP));
rb_define_global_const ("MASK", INT2NUM (ZMQ_MASK));
rb_define_global_const ("AFFINITY", INT2NUM (ZMQ_AFFINITY));
rb_define_global_const ("IDENTITY", INT2NUM (ZMQ_IDENTITY));
rb_define_global_const ("NOBLOCK", INT2NUM (ZMQ_NOBLOCK));
rb_define_global_const ("NOFLUSH", INT2NUM (ZMQ_NOFLUSH));
rb_define_global_const ("P2P", INT2NUM (ZMQ_P2P));
rb_define_global_const ("SUB", INT2NUM (ZMQ_SUB));
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
......
......@@ -67,6 +67,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \
platform.hpp \
poll.hpp \
pub.hpp \
select.hpp \
session.hpp \
simple_semaphore.hpp \
......@@ -107,6 +108,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pgm_socket.cpp \
pipe.cpp \
poll.cpp \
pub.cpp \
select.cpp \
session.cpp \
socket_base.cpp \
......
......@@ -35,6 +35,7 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
#include "pub.hpp"
#include "sub.hpp"
// If the RDTSC is available we use it to prevent excessive
......@@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
case ZMQ_PUB:
s = new pub_t (this);
break;
case ZMQ_SUB:
s = new sub_t (this);
break;
case ZMQ_P2P:
case ZMQ_PUB:
case ZMQ_REQ:
case ZMQ_REP:
s = new socket_base_t (this, type_);
......
......@@ -23,9 +23,8 @@ zmq::options_t::options_t () :
hwm (0),
lwm (0),
swap (0),
mask (0),
affinity (0),
rate (0),
recovery_ivl (0)
rate (100),
recovery_ivl (10)
{
}
......@@ -34,7 +34,6 @@ namespace zmq
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t mask;
uint64_t affinity;
std::string identity;
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../c/zmq.h"
#include "pub.hpp"
#include "err.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_SUB)
{
}
zmq::pub_t::~pub_t ()
{
}
int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PUB_INCLUDED__
#define __ZMQ_PUB_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class pub_t : public socket_base_t
{
public:
pub_t (class app_thread_t *parent_);
~pub_t ();
// Overloads of API functions from socket_base_t.
int recv (struct zmq_msg_t *msg_, int flags_);
};
}
#endif
......@@ -53,10 +53,10 @@ zmq::select_t::select_t () :
zmq::select_t::~select_t ()
{
worker.stop ();
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
worker.stop ();
}
zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
......
......@@ -123,14 +123,6 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options.swap = *((int64_t*) optval_);
return 0;
case ZMQ_MASK:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.mask = (uint64_t) *((int64_t*) optval_);
return 0;
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
......@@ -149,19 +141,19 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
return -1;
case ZMQ_RATE:
if (optvallen_ != sizeof (uint32_t)) {
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.rate = *((int32_t*) optval_);
options.rate = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (uint32_t)) {
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.recovery_ivl = *((int32_t*) optval_);
options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
default:
......@@ -287,7 +279,7 @@ int zmq::socket_base_t::connect (const char *addr_)
#endif
// Unknown address type.
errno = ENOTSUP;
errno = EFAULT;
return -1;
}
......
......@@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
return socket_base_t::setsockopt (option_, optval_, optvallen_);
}
int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
}
int zmq::sub_t::flush ()
{
errno = EFAULT;
return -1;
}
int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
{
while (true) {
......
......@@ -37,6 +37,8 @@ namespace zmq
// Overloads of API functions from socket_base_t.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int send (struct zmq_msg_t *msg_, int flags_);
int flush ();
int recv (struct zmq_msg_t *msg_, int flags_);
private:
......
......@@ -20,6 +20,7 @@
#include "zmq_decoder.hpp"
#include "i_inout.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq::zmq_decoder_t::zmq_decoder_t () :
destination (NULL)
......@@ -48,7 +49,11 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
if (*tmpbuf == 0xff)
next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready);
else {
zmq_msg_init_size (&in_progress, *tmpbuf);
// TODO: Handle over-sized message decently.
int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), *tmpbuf,
&zmq_decoder_t::message_ready);
}
......@@ -60,7 +65,11 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// 8-byte size is read. Allocate the buffer for message body and
// read the message data into it.
size_t size = (size_t) get_uint64 (tmpbuf);
zmq_msg_init_size (&in_progress, size);
// TODO: Handle over-sized message decently.
int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), size,
&zmq_decoder_t::message_ready);
return true;
......
......@@ -93,8 +93,11 @@ void zmq::zmq_listener_init_t::flush ()
void zmq::zmq_listener_init_t::detach ()
{
// TODO: Engine is closing down. Init object is to be closed as well.
zmq_assert (false);
// This function is called by engine when disconnection occurs.
// The engine will destroy itself, so we just drop the pointer here and
// start termination of the init object.
engine = NULL;
term ();
}
void zmq::zmq_listener_init_t::process_plug ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment