Commit 47350adc authored by Martin Sustrik's avatar Martin Sustrik

separate class for PUB-style socket added

parent e940878b
...@@ -184,12 +184,12 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr); ...@@ -184,12 +184,12 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
// //
// Errors: EAGAIN - message cannot be sent at the moment (applies only to // Errors: EAGAIN - message cannot be sent at the moment (applies only to
// non-blocking send). // non-blocking send).
// ENOTSUP - function isn't supported by particular socket type. // EFAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags);
// Flush the messages that were send using ZMQ_NOFLUSH flag down the stream. // Flush the messages that were send using ZMQ_NOFLUSH flag down the stream.
// //
// Errors: ENOTSUP - function isn't supported by particular socket type. // Errors: FAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_flush (void *s); ZMQ_EXPORT int zmq_flush (void *s);
// Send a message from the socket 's'. 'flags' argument can be combination // Send a message from the socket 's'. 'flags' argument can be combination
...@@ -198,7 +198,7 @@ ZMQ_EXPORT int zmq_flush (void *s); ...@@ -198,7 +198,7 @@ ZMQ_EXPORT int zmq_flush (void *s);
// //
// Errors: EAGAIN - message cannot be received at the moment (applies only to // Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive). // non-blocking receive).
// ENOTSUP - function isn't supported by particular socket type. // EFAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags);
// Helper functions used by perf tests so that they don't have to care // Helper functions used by perf tests so that they don't have to care
......
...@@ -65,6 +65,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -65,6 +65,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \ pipe.hpp \
platform.hpp \ platform.hpp \
poll.hpp \ poll.hpp \
pub.hpp \
select.hpp \ select.hpp \
session.hpp \ session.hpp \
simple_semaphore.hpp \ simple_semaphore.hpp \
...@@ -103,6 +104,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -103,6 +104,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
owned.cpp \ owned.cpp \
pipe.cpp \ pipe.cpp \
poll.cpp \ poll.cpp \
pub.cpp \
select.cpp \ select.cpp \
session.cpp \ session.cpp \
socket_base.cpp \ socket_base.cpp \
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "config.hpp" #include "config.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pub.hpp"
#include "sub.hpp" #include "sub.hpp"
// If the RDTSC is available we use it to prevent excessive // If the RDTSC is available we use it to prevent excessive
...@@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) ...@@ -138,11 +139,13 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{ {
socket_base_t *s = NULL; socket_base_t *s = NULL;
switch (type_) { switch (type_) {
case ZMQ_PUB:
s = new pub_t (this);
break;
case ZMQ_SUB: case ZMQ_SUB:
s = new sub_t (this); s = new sub_t (this);
break; break;
case ZMQ_P2P: case ZMQ_P2P:
case ZMQ_PUB:
case ZMQ_REQ: case ZMQ_REQ:
case ZMQ_REP: case ZMQ_REP:
s = new socket_base_t (this); s = new socket_base_t (this);
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../c/zmq.h"
#include "pub.hpp"
#include "err.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
}
zmq::pub_t::~pub_t ()
{
}
int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PUB_INCLUDED__
#define __ZMQ_PUB_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class pub_t : public socket_base_t
{
public:
pub_t (class app_thread_t *parent_);
~pub_t ();
// Overloads of API functions from socket_base_t.
int recv (struct zmq_msg_t *msg_, int flags_);
};
}
#endif
...@@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, ...@@ -78,6 +78,18 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
return socket_base_t::setsockopt (option_, optval_, optvallen_); return socket_base_t::setsockopt (option_, optval_, optvallen_);
} }
int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
}
int zmq::sub_t::flush ()
{
errno = EFAULT;
return -1;
}
int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
{ {
while (true) { while (true) {
......
...@@ -37,6 +37,8 @@ namespace zmq ...@@ -37,6 +37,8 @@ namespace zmq
// Overloads of API functions from socket_base_t. // Overloads of API functions from socket_base_t.
int setsockopt (int option_, const void *optval_, size_t optvallen_); int setsockopt (int option_, const void *optval_, size_t optvallen_);
int send (struct zmq_msg_t *msg_, int flags_);
int flush ();
int recv (struct zmq_msg_t *msg_, int flags_); int recv (struct zmq_msg_t *msg_, int flags_);
private: private:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment