Commit 862cd41c authored by somdoron's avatar somdoron

implement client socket type and drop messages when more flag is set on client and server

parent 5632b57b
...@@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \ ...@@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \
src/atomic_counter.hpp \ src/atomic_counter.hpp \
src/atomic_ptr.hpp \ src/atomic_ptr.hpp \
src/blob.hpp \ src/blob.hpp \
src/client.cpp \
src/client.hpp \
src/clock.cpp \ src/clock.cpp \
src/clock.hpp \ src/clock.hpp \
src/command.hpp \ src/command.hpp \
...@@ -341,7 +343,7 @@ test_apps = \ ...@@ -341,7 +343,7 @@ test_apps = \
tests/test_xpub_manual \ tests/test_xpub_manual \
tests/test_xpub_welcome_msg \ tests/test_xpub_welcome_msg \
tests/test_atomics \ tests/test_atomics \
tests/test_server tests/test_client_server
tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la tests_test_system_LDADD = src/libzmq.la
...@@ -513,8 +515,8 @@ tests_test_xpub_welcome_msg_LDADD = src/libzmq.la ...@@ -513,8 +515,8 @@ tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
tests_test_atomics_SOURCES = tests/test_atomics.cpp tests_test_atomics_SOURCES = tests/test_atomics.cpp
tests_test_atomics_LDADD = src/libzmq.la tests_test_atomics_LDADD = src/libzmq.la
tests_test_server_SOURCES = tests/test_server.cpp tests_test_client_server_SOURCES = tests/test_client_server.cpp
tests_test_server_LDADD = src/libzmq.la tests_test_client_server_LDADD = src/libzmq.la
if !ON_MINGW if !ON_MINGW
if !ON_CYGWIN if !ON_CYGWIN
......
...@@ -239,6 +239,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); ...@@ -239,6 +239,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
#define ZMQ_XSUB 10 #define ZMQ_XSUB 10
#define ZMQ_STREAM 11 #define ZMQ_STREAM 11
#define ZMQ_SERVER 12 #define ZMQ_SERVER 12
#define ZMQ_CLIENT 13
/* Deprecated aliases */ /* Deprecated aliases */
#define ZMQ_XREQ ZMQ_DEALER #define ZMQ_XREQ ZMQ_DEALER
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "client.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = ZMQ_CLIENT;
}
zmq::client_t::~client_t ()
{
}
void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void) subscribe_to_all_;
zmq_assert (pipe_);
fq.attach (pipe_);
lb.attach (pipe_);
}
int zmq::client_t::xsend (msg_t *msg_)
{
zmq_assert(!(msg_->flags () & msg_t::more));
return lb.sendpipe (msg_, NULL);
}
int zmq::client_t::xrecv (msg_t *msg_)
{
int rc = fq.recvpipe (msg_, NULL);
// Drop any messages with more flag
while (rc == 0 && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL);
// get the new message
if (rc == 0)
rc = fq.recvpipe (msg_, NULL);
}
return rc;
}
bool zmq::client_t::xhas_in ()
{
return fq.has_in ();
}
bool zmq::client_t::xhas_out ()
{
return lb.has_out ();
}
zmq::blob_t zmq::client_t::get_credential () const
{
return fq.get_credential ();
}
void zmq::client_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::client_t::xwrite_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
void zmq::client_t::xpipe_terminated (pipe_t *pipe_)
{
fq.pipe_terminated (pipe_);
lb.pipe_terminated (pipe_);
}
\ No newline at end of file
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CLIENT_HPP_INCLUDED__
#define __ZMQ_CLIENT_HPP_INCLUDED__
#include "socket_base.hpp"
#include "session_base.hpp"
#include "fq.hpp"
#include "lb.hpp"
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class socket_base_t;
class client_t :
public socket_base_t
{
public:
client_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~client_t ();
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
blob_t get_credential () const;
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
private:
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
fq_t fq;
lb_t lb;
client_t (const client_t &);
const client_t &operator = (const client_t&);
};
}
#endif
...@@ -64,8 +64,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const ...@@ -64,8 +64,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
{ {
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH", "DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB", "STREAM", "SERVER"}; "XPUB", "XSUB", "STREAM", "SERVER", "CLIENT"};
zmq_assert (socket_type >= 0 && socket_type <= 12); zmq_assert (socket_type >= 0 && socket_type <= 13);
return names [socket_type]; return names [socket_type];
} }
...@@ -160,7 +160,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const ...@@ -160,7 +160,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
case ZMQ_REP: case ZMQ_REP:
return type_ == "REQ" || type_ == "DEALER"; return type_ == "REQ" || type_ == "DEALER";
case ZMQ_DEALER: case ZMQ_DEALER:
return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER" || type_ == "SERVER"; return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER";
case ZMQ_ROUTER: case ZMQ_ROUTER:
return type_ == "REQ" || type_ == "DEALER" || type_ == "ROUTER"; return type_ == "REQ" || type_ == "DEALER" || type_ == "ROUTER";
case ZMQ_PUSH: case ZMQ_PUSH:
...@@ -178,7 +178,9 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const ...@@ -178,7 +178,9 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
case ZMQ_PAIR: case ZMQ_PAIR:
return type_ == "PAIR"; return type_ == "PAIR";
case ZMQ_SERVER: case ZMQ_SERVER:
return type_ == "DEALER"; return type_ == "CLIENT";
case ZMQ_CLIENT:
return type_ == "CLIENT" || type_ == "SERVER";
default: default:
break; break;
} }
......
...@@ -53,14 +53,6 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -53,14 +53,6 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
fq.attach (pipe_); fq.attach (pipe_);
} }
int zmq::server_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
void zmq::server_t::xpipe_terminated (pipe_t *pipe_) void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
{ {
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ()); outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
...@@ -88,12 +80,8 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_) ...@@ -88,12 +80,8 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_)
int zmq::server_t::xsend (msg_t *msg_) int zmq::server_t::xsend (msg_t *msg_)
{ {
// Server doesn't support multipart zmq_assert(!(msg_->flags () & msg_t::more));
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}
// Find the pipe associated with the routing stored in the message. // Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id(); uint32_t routing_id = msg_->get_routing_id();
outpipes_t::iterator it = outpipes.find (routing_id); outpipes_t::iterator it = outpipes.find (routing_id);
...@@ -131,18 +119,24 @@ int zmq::server_t::xrecv (msg_t *msg_) ...@@ -131,18 +119,24 @@ int zmq::server_t::xrecv (msg_t *msg_)
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe); int rc = fq.recvpipe (msg_, &pipe);
if (rc != 0) // Drop any messages with more flag
return -1; while (rc == 0 && msg_->flags () & msg_t::more) {
zmq_assert (pipe != NULL); // drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL);
if (msg_->flags () & msg_t::more) { // get the new message
msg_->close(); if (rc == 0)
msg_->init(); rc = fq.recvpipe (msg_, &pipe);
}
errno = EINVAL; if (rc != 0)
return -1; return rc;
}
zmq_assert (pipe != NULL);
uint32_t routing_id = pipe->get_routing_id(); uint32_t routing_id = pipe->get_routing_id();
msg_->set_routing_id(routing_id); msg_->set_routing_id(routing_id);
......
...@@ -45,8 +45,7 @@ namespace zmq ...@@ -45,8 +45,7 @@ namespace zmq
~server_t (); ~server_t ();
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -56,6 +56,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -56,6 +56,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_PAIR: case ZMQ_PAIR:
case ZMQ_STREAM: case ZMQ_STREAM:
case ZMQ_SERVER: case ZMQ_SERVER:
case ZMQ_CLIENT:
s = new (std::nothrow) session_base_t (io_thread_, active_, s = new (std::nothrow) session_base_t (io_thread_, active_,
socket_, options_, addr_); socket_, options_, addr_);
break; break;
......
...@@ -71,6 +71,7 @@ ...@@ -71,6 +71,7 @@
#include "xsub.hpp" #include "xsub.hpp"
#include "stream.hpp" #include "stream.hpp"
#include "server.hpp" #include "server.hpp"
#include "client.hpp"
bool zmq::socket_base_t::check_tag () bool zmq::socket_base_t::check_tag ()
{ {
...@@ -121,6 +122,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, ...@@ -121,6 +122,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_SERVER: case ZMQ_SERVER:
s = new (std::nothrow) server_t (parent_, tid_, sid_); s = new (std::nothrow) server_t (parent_, tid_, sid_);
break; break;
case ZMQ_CLIENT:
s = new (std::nothrow) client_t (parent_, tid_, sid_);
break;
default: default:
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
......
...@@ -21,14 +21,12 @@ ...@@ -21,14 +21,12 @@
int main (void) int main (void)
{ {
printf("0000");
setup_test_environment(); setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER); void *server = zmq_socket (ctx, ZMQ_SERVER);
void *client = zmq_socket (ctx, ZMQ_DEALER); void *client = zmq_socket (ctx, ZMQ_CLIENT);
int rc; int rc;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment