Unverified Commit a56d36b8 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3587 from somdoron/stream_engine_base

problem: ws_engine duplicate code from stream_engine
parents 184e7a55 157b2a2e
...@@ -741,6 +741,7 @@ set(cxx-sources ...@@ -741,6 +741,7 @@ set(cxx-sources
random.cpp random.cpp
raw_encoder.cpp raw_encoder.cpp
raw_decoder.cpp raw_decoder.cpp
raw_engine.cpp
reaper.cpp reaper.cpp
rep.cpp rep.cpp
req.cpp req.cpp
...@@ -753,7 +754,7 @@ set(cxx-sources ...@@ -753,7 +754,7 @@ set(cxx-sources
socks.cpp socks.cpp
socks_connecter.cpp socks_connecter.cpp
stream.cpp stream.cpp
stream_engine.cpp stream_engine_base.cpp
sub.cpp sub.cpp
tcp.cpp tcp.cpp
tcp_address.cpp tcp_address.cpp
...@@ -787,6 +788,7 @@ set(cxx-sources ...@@ -787,6 +788,7 @@ set(cxx-sources
ws_encoder.cpp ws_encoder.cpp
ws_engine.cpp ws_engine.cpp
ws_listener.cpp ws_listener.cpp
zmtp_engine.cpp
# at least for VS, the header files must also be listed # at least for VS, the header files must also be listed
address.hpp address.hpp
array.hpp array.hpp
...@@ -873,6 +875,7 @@ set(cxx-sources ...@@ -873,6 +875,7 @@ set(cxx-sources
random.hpp random.hpp
raw_decoder.hpp raw_decoder.hpp
raw_encoder.hpp raw_encoder.hpp
raw_engine.hpp
reaper.hpp reaper.hpp
rep.hpp rep.hpp
req.hpp req.hpp
...@@ -889,7 +892,7 @@ set(cxx-sources ...@@ -889,7 +892,7 @@ set(cxx-sources
socks_connecter.hpp socks_connecter.hpp
stdint.hpp stdint.hpp
stream.hpp stream.hpp
stream_engine.hpp stream_engine_base.hpp
stream_connecter_base.hpp stream_connecter_base.hpp
stream_connecter_base.cpp stream_connecter_base.cpp
stream_listener_base.hpp stream_listener_base.hpp
...@@ -931,6 +934,7 @@ set(cxx-sources ...@@ -931,6 +934,7 @@ set(cxx-sources
ypipe_conflate.hpp ypipe_conflate.hpp
yqueue.hpp yqueue.hpp
zap_client.hpp zap_client.hpp
zmtp_engine.hpp
) )
if(MINGW) if(MINGW)
......
...@@ -169,6 +169,8 @@ src_libzmq_la_SOURCES = \ ...@@ -169,6 +169,8 @@ src_libzmq_la_SOURCES = \
src/raw_decoder.hpp \ src/raw_decoder.hpp \
src/raw_encoder.cpp \ src/raw_encoder.cpp \
src/raw_encoder.hpp \ src/raw_encoder.hpp \
src/raw_engine.cpp \
src/raw_engine.hpp \
src/reaper.cpp \ src/reaper.cpp \
src/reaper.hpp \ src/reaper.hpp \
src/rep.cpp \ src/rep.cpp \
...@@ -201,8 +203,8 @@ src_libzmq_la_SOURCES = \ ...@@ -201,8 +203,8 @@ src_libzmq_la_SOURCES = \
src/stream_connecter_base.hpp \ src/stream_connecter_base.hpp \
src/stream_listener_base.cpp \ src/stream_listener_base.cpp \
src/stream_listener_base.hpp \ src/stream_listener_base.hpp \
src/stream_engine.cpp \ src/stream_engine_base.cpp \
src/stream_engine.hpp \ src/stream_engine_base.hpp \
src/sub.cpp \ src/sub.cpp \
src/sub.hpp \ src/sub.hpp \
src/tcp.cpp \ src/tcp.cpp \
...@@ -275,6 +277,8 @@ src_libzmq_la_SOURCES = \ ...@@ -275,6 +277,8 @@ src_libzmq_la_SOURCES = \
src/socket_poller.hpp \ src/socket_poller.hpp \
src/zap_client.cpp \ src/zap_client.cpp \
src/zap_client.hpp \ src/zap_client.hpp \
src/zmtp_engine.cpp \
src/zmtp_engine.hpp \
src/zmq_draft.h \ src/zmq_draft.h \
external/sha1/sha1.c \ external/sha1/sha1.c \
external/sha1/sha1.h external/sha1/sha1.h
......
...@@ -40,6 +40,13 @@ class io_thread_t; ...@@ -40,6 +40,13 @@ class io_thread_t;
struct i_engine struct i_engine
{ {
enum error_reason_t
{
protocol_error,
connection_error,
timeout_error
};
virtual ~i_engine () {} virtual ~i_engine () {}
// Plug the engine to the session. // Plug the engine to the session.
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include <new> #include <new>
#include <string> #include <string>
#include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "random.hpp" #include "random.hpp"
#include "err.hpp" #include "err.hpp"
......
/*
Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "macros.hpp"
#include <limits.h>
#include <string.h>
#ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
#include <new>
#include <sstream>
#include "raw_engine.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
#include "null_mechanism.hpp"
#include "plain_client.hpp"
#include "plain_server.hpp"
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
#include "curve_client.hpp"
#include "curve_server.hpp"
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "likely.hpp"
#include "wire.hpp"
zmq::raw_engine_t::raw_engine_t (
fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_) :
stream_engine_base_t (fd_, options_, endpoint_uri_pair_)
{
}
zmq::raw_engine_t::~raw_engine_t ()
{
}
void zmq::raw_engine_t::plug_internal ()
{
// no handshaking for raw sock, instantiate raw encoder and decoders
_encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size);
alloc_assert (_decoder);
_next_msg = &raw_engine_t::pull_msg_from_session;
_process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&raw_engine_t::push_raw_msg_to_session);
properties_t properties;
if (init_properties (properties)) {
// Compile metadata.
zmq_assert (_metadata == NULL);
_metadata = new (std::nothrow) metadata_t (properties);
alloc_assert (_metadata);
}
if (_options.raw_notify) {
// For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected.
msg_t connector;
connector.init ();
push_raw_msg_to_session (&connector);
connector.close ();
session ()->flush ();
}
set_pollin ();
set_pollout ();
// Flush all the data that may have been already received downstream.
in_event ();
}
bool zmq::raw_engine_t::handshake ()
{
return true;
}
void zmq::raw_engine_t::error (error_reason_t reason_)
{
if (_options.raw_socket && _options.raw_notify) {
// For raw sockets, send a final 0-length message to the application
// so that it knows the peer has been disconnected.
msg_t terminator;
terminator.init ();
(this->*_process_msg) (&terminator);
terminator.close ();
}
stream_engine_base_t::error (reason_);
}
int zmq::raw_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
if (_metadata && _metadata != msg_->metadata ())
msg_->set_metadata (_metadata);
return push_msg_to_session (msg_);
}
/*
Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_RAW_ENGINE_HPP_INCLUDED__
#define __ZMQ_RAW_ENGINE_HPP_INCLUDED__
#include <stddef.h>
#include "fd.hpp"
#include "i_engine.hpp"
#include "io_object.hpp"
#include "i_encoder.hpp"
#include "i_decoder.hpp"
#include "options.hpp"
#include "socket_base.hpp"
#include "metadata.hpp"
#include "msg.hpp"
#include "stream_engine_base.hpp"
namespace zmq
{
// Protocol revisions
class io_thread_t;
class session_base_t;
class mechanism_t;
// This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket.
class raw_engine_t : public stream_engine_base_t
{
public:
raw_engine_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_);
~raw_engine_t ();
protected:
void error (error_reason_t reason_);
void plug_internal ();
bool handshake ();
private:
int push_raw_msg_to_session (msg_t *msg_);
raw_engine_t (const raw_engine_t &);
const raw_engine_t &operator= (const raw_engine_t &);
};
}
#endif
...@@ -425,8 +425,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -425,8 +425,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
_engine->plug (_io_thread, this); _engine->plug (_io_thread, this);
} }
void zmq::session_base_t::engine_error ( void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
zmq::stream_engine_t::error_reason_t reason_)
{ {
// Engine is dead. Let's forget about it. // Engine is dead. Let's forget about it.
_engine = NULL; _engine = NULL;
...@@ -435,20 +434,20 @@ void zmq::session_base_t::engine_error ( ...@@ -435,20 +434,20 @@ void zmq::session_base_t::engine_error (
if (_pipe) if (_pipe)
clean_pipes (); clean_pipes ();
zmq_assert (reason_ == stream_engine_t::connection_error zmq_assert (reason_ == i_engine::connection_error
|| reason_ == stream_engine_t::timeout_error || reason_ == i_engine::timeout_error
|| reason_ == stream_engine_t::protocol_error); || reason_ == i_engine::protocol_error);
switch (reason_) { switch (reason_) {
case stream_engine_t::timeout_error: case i_engine::timeout_error:
/* FALLTHROUGH */ /* FALLTHROUGH */
case stream_engine_t::connection_error: case i_engine::connection_error:
if (_active) { if (_active) {
reconnect (); reconnect ();
break; break;
} }
/* FALLTHROUGH */ /* FALLTHROUGH */
case stream_engine_t::protocol_error: case i_engine::protocol_error:
if (_pending) { if (_pending) {
if (_pipe) if (_pipe)
_pipe->terminate (false); _pipe->terminate (false);
......
...@@ -36,7 +36,8 @@ ...@@ -36,7 +36,8 @@
#include "io_object.hpp" #include "io_object.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "stream_engine.hpp" #include "i_engine.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -61,7 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events ...@@ -61,7 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
virtual void reset (); virtual void reset ();
void flush (); void flush ();
void rollback (); void rollback ();
void engine_error (zmq::stream_engine_t::error_reason_t reason_); void engine_error (zmq::i_engine::error_reason_t reason_);
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_); void read_activated (zmq::pipe_t *pipe_);
......
...@@ -33,7 +33,6 @@ ...@@ -33,7 +33,6 @@
#include "macros.hpp" #include "macros.hpp"
#include "socks_connecter.hpp" #include "socks_connecter.hpp"
#include "stream_engine.hpp"
#include "random.hpp" #include "random.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
......
...@@ -32,6 +32,8 @@ ...@@ -32,6 +32,8 @@
#include "session_base.hpp" #include "session_base.hpp"
#include "address.hpp" #include "address.hpp"
#include "random.hpp" #include "random.hpp"
#include "zmtp_engine.hpp"
#include "raw_engine.hpp"
#ifndef ZMQ_HAVE_WINDOWS #ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h> #include <unistd.h>
...@@ -173,8 +175,11 @@ void zmq::stream_connecter_base_t::create_engine ( ...@@ -173,8 +175,11 @@ void zmq::stream_connecter_base_t::create_engine (
endpoint_type_connect); endpoint_type_connect);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = i_engine *engine;
new (std::nothrow) stream_engine_t (fd, options, endpoint_pair); if (options.raw_socket)
engine = new (std::nothrow) raw_engine_t (fd, options, endpoint_pair);
else
engine = new (std::nothrow) zmtp_engine_t (fd, options, endpoint_pair);
alloc_assert (engine); alloc_assert (engine);
// Attach the engine to the corresponding session object. // Attach the engine to the corresponding session object.
......
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_STREAM_ENGINE_HPP_INCLUDED__ #ifndef __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__
#define __ZMQ_STREAM_ENGINE_HPP_INCLUDED__ #define __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__
#include <stddef.h> #include <stddef.h>
...@@ -41,16 +41,10 @@ ...@@ -41,16 +41,10 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "metadata.hpp" #include "metadata.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "tcp.hpp"
namespace zmq namespace zmq
{ {
// Protocol revisions
enum
{
ZMTP_1_0 = 0,
ZMTP_2_0 = 1
};
class io_thread_t; class io_thread_t;
class session_base_t; class session_base_t;
class mechanism_t; class mechanism_t;
...@@ -58,20 +52,13 @@ class mechanism_t; ...@@ -58,20 +52,13 @@ class mechanism_t;
// This engine handles any socket with SOCK_STREAM semantics, // This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket. // e.g. TCP socket or an UNIX domain socket.
class stream_engine_t : public io_object_t, public i_engine class stream_engine_base_t : public io_object_t, public i_engine
{ {
public: public:
enum error_reason_t stream_engine_base_t (fd_t fd_,
{ const options_t &options_,
protocol_error, const endpoint_uri_pair_t &endpoint_uri_pair_);
connection_error, ~stream_engine_base_t ();
timeout_error
};
stream_engine_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_);
~stream_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
...@@ -86,33 +73,12 @@ class stream_engine_t : public io_object_t, public i_engine ...@@ -86,33 +73,12 @@ class stream_engine_t : public io_object_t, public i_engine
void out_event (); void out_event ();
void timer_event (int id_); void timer_event (int id_);
private: protected:
bool in_event_internal (); typedef metadata_t::dict_t properties_t;
bool init_properties (properties_t &properties_);
// Unplug the engine from the session.
void unplug ();
// Function to handle network disconnections. // Function to handle network disconnections.
void error (error_reason_t reason_); virtual void error (error_reason_t reason_);
// Detects the protocol used by the peer.
bool handshake ();
// Receive the greeting from the peer.
int receive_greeting ();
void receive_greeting_versioned ();
typedef bool (stream_engine_t::*handshake_fun_t) ();
static handshake_fun_t select_handshake_fun (bool unversioned,
unsigned char revision);
bool handshake_v1_0_unversioned ();
bool handshake_v1_0 ();
bool handshake_v2_0 ();
bool handshake_v3_0 ();
int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_);
int next_handshake_command (msg_t *msg_); int next_handshake_command (msg_t *msg_);
int process_handshake_command (msg_t *msg_); int process_handshake_command (msg_t *msg_);
...@@ -120,38 +86,26 @@ class stream_engine_t : public io_object_t, public i_engine ...@@ -120,38 +86,26 @@ class stream_engine_t : public io_object_t, public i_engine
int pull_msg_from_session (msg_t *msg_); int pull_msg_from_session (msg_t *msg_);
int push_msg_to_session (msg_t *msg_); int push_msg_to_session (msg_t *msg_);
int push_raw_msg_to_session (msg_t *msg_);
int write_credential (msg_t *msg_);
int pull_and_encode (msg_t *msg_); int pull_and_encode (msg_t *msg_);
int decode_and_push (msg_t *msg_); int decode_and_push (msg_t *msg_);
int push_one_then_decode_and_push (msg_t *msg_);
void mechanism_ready ();
size_t add_property (unsigned char *ptr_,
const char *name_,
const void *value_,
size_t value_len_);
void set_handshake_timer (); void set_handshake_timer ();
int tcp_read (void *data_, size_t size_);
typedef metadata_t::dict_t properties_t; virtual bool handshake () { return true; };
bool init_properties (properties_t &properties_); virtual void plug_internal (){};
int process_command_message (msg_t *msg_); virtual int process_command_message (msg_t *msg_) { return -1; };
int produce_ping_message (msg_t *msg_); virtual int produce_ping_message (msg_t *msg_) { return -1; };
int process_heartbeat_message (msg_t *msg_); virtual int process_heartbeat_message (msg_t *msg_) { return -1; };
int produce_pong_message (msg_t *msg_); virtual int produce_pong_message (msg_t *msg_) { return -1; };
// Underlying socket. void set_pollout () { io_object_t::set_pollout (_handle); }
fd_t _s; void set_pollin () { io_object_t::set_pollin (_handle); }
session_base_t *session () { return _session; }
msg_t _tx_msg; socket_base_t *socket () { return _socket; }
// Need to store PING payload for PONG
msg_t _pong_msg;
handle_t _handle; const options_t _options;
unsigned char *_inpos; unsigned char *_inpos;
size_t _insize; size_t _insize;
...@@ -161,54 +115,13 @@ class stream_engine_t : public io_object_t, public i_engine ...@@ -161,54 +115,13 @@ class stream_engine_t : public io_object_t, public i_engine
size_t _outsize; size_t _outsize;
i_encoder *_encoder; i_encoder *_encoder;
// Metadata to be attached to received messages. May be NULL. mechanism_t *_mechanism;
metadata_t *_metadata;
// When true, we are still trying to determine whether
// the peer is using versioned protocol, and if so, which
// version. When false, normal message flow has started.
bool _handshaking;
static const size_t signature_size = 10;
// Size of ZMTP/1.0 and ZMTP/2.0 greeting message
static const size_t v2_greeting_size = 12;
// Size of ZMTP/3.0 greeting message
static const size_t v3_greeting_size = 64;
// Expected greeting size.
size_t _greeting_size;
// Greeting received from, and sent to peer
unsigned char _greeting_recv[v3_greeting_size];
unsigned char _greeting_send[v3_greeting_size];
// Size of greeting received so far
unsigned int _greeting_bytes_read;
// The session this engine is attached to.
zmq::session_base_t *_session;
const options_t _options;
// Representation of the connected endpoints.
const endpoint_uri_pair_t _endpoint_uri_pair;
bool _plugged;
int (stream_engine_t::*_next_msg) (msg_t *msg_);
int (stream_engine_t::*_process_msg) (msg_t *msg_);
bool _io_error;
// Indicates whether the engine is to inject a phantom int (stream_engine_base_t::*_next_msg) (msg_t *msg_);
// subscription message into the incoming stream. int (stream_engine_base_t::*_process_msg) (msg_t *msg_);
// Needed to support old peers.
bool _subscription_required;
mechanism_t *_mechanism; // Metadata to be attached to received messages. May be NULL.
metadata_t *_metadata;
// True iff the engine couldn't consume the last decoded message. // True iff the engine couldn't consume the last decoded message.
bool _input_stopped; bool _input_stopped;
...@@ -216,6 +129,9 @@ class stream_engine_t : public io_object_t, public i_engine ...@@ -216,6 +129,9 @@ class stream_engine_t : public io_object_t, public i_engine
// True iff the engine doesn't have any message to encode. // True iff the engine doesn't have any message to encode.
bool _output_stopped; bool _output_stopped;
// Representation of the connected endpoints.
const endpoint_uri_pair_t _endpoint_uri_pair;
// ID of the handshake timer // ID of the handshake timer
enum enum
{ {
...@@ -235,15 +151,45 @@ class stream_engine_t : public io_object_t, public i_engine ...@@ -235,15 +151,45 @@ class stream_engine_t : public io_object_t, public i_engine
bool _has_ttl_timer; bool _has_ttl_timer;
bool _has_timeout_timer; bool _has_timeout_timer;
bool _has_heartbeat_timer; bool _has_heartbeat_timer;
int _heartbeat_timeout;
// Socket
zmq::socket_base_t *_socket;
const std::string _peer_address; const std::string _peer_address;
stream_engine_t (const stream_engine_t &); private:
const stream_engine_t &operator= (const stream_engine_t &); bool in_event_internal ();
// Unplug the engine from the session.
void unplug ();
int write_credential (msg_t *msg_);
int push_one_then_decode_and_push (msg_t *msg_);
void mechanism_ready ();
// Underlying socket.
fd_t _s;
handle_t _handle;
bool _plugged;
// When true, we are still trying to determine whether
// the peer is using versioned protocol, and if so, which
// version. When false, normal message flow has started.
bool _handshaking;
msg_t _tx_msg;
bool _io_error;
// The session this engine is attached to.
zmq::session_base_t *_session;
// Socket
zmq::socket_base_t *_socket;
stream_engine_base_t (const stream_engine_base_t &);
const stream_engine_base_t &operator= (const stream_engine_base_t &);
}; };
} }
......
...@@ -31,7 +31,8 @@ ...@@ -31,7 +31,8 @@
#include "stream_listener_base.hpp" #include "stream_listener_base.hpp"
#include "session_base.hpp" #include "session_base.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "stream_engine.hpp" #include "zmtp_engine.hpp"
#include "raw_engine.hpp"
#ifndef ZMQ_HAVE_WINDOWS #ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h> #include <unistd.h>
...@@ -102,8 +103,11 @@ void zmq::stream_listener_base_t::create_engine (fd_t fd) ...@@ -102,8 +103,11 @@ void zmq::stream_listener_base_t::create_engine (fd_t fd)
get_socket_name (fd, socket_end_local), get_socket_name (fd, socket_end_local),
get_socket_name (fd, socket_end_remote), endpoint_type_bind); get_socket_name (fd, socket_end_remote), endpoint_type_bind);
stream_engine_t *engine = i_engine *engine;
new (std::nothrow) stream_engine_t (fd, options, endpoint_pair); if (options.raw_socket)
engine = new (std::nothrow) raw_engine_t (fd, options, endpoint_pair);
else
engine = new (std::nothrow) zmtp_engine_t (fd, options, endpoint_pair);
alloc_assert (engine); alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already // Choose I/O thread to run connecter in. Given that we are already
......
...@@ -33,7 +33,6 @@ ...@@ -33,7 +33,6 @@
#include "macros.hpp" #include "macros.hpp"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include <new> #include <new>
#include <string> #include <string>
#include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "random.hpp" #include "random.hpp"
......
...@@ -33,7 +33,6 @@ ...@@ -33,7 +33,6 @@
#include "macros.hpp" #include "macros.hpp"
#include "ws_connecter.hpp" #include "ws_connecter.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
......
This diff is collapsed.
...@@ -31,10 +31,10 @@ ...@@ -31,10 +31,10 @@
#define __ZMQ_WS_ENGINE_HPP_INCLUDED__ #define __ZMQ_WS_ENGINE_HPP_INCLUDED__
#include "io_object.hpp" #include "io_object.hpp"
#include "i_engine.hpp"
#include "address.hpp" #include "address.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "stream_engine.hpp" #include "stream_engine_base.hpp"
#define WS_BUFFER_SIZE 8192 #define WS_BUFFER_SIZE 8192
#define MAX_HEADER_NAME_LENGTH 1024 #define MAX_HEADER_NAME_LENGTH 1024
...@@ -124,7 +124,7 @@ typedef enum ...@@ -124,7 +124,7 @@ typedef enum
client_handshake_error = -1 client_handshake_error = -1
} ws_client_handshake_state_t; } ws_client_handshake_state_t;
class ws_engine_t : public io_object_t, public i_engine class ws_engine_t : public stream_engine_base_t
{ {
public: public:
ws_engine_t (fd_t fd_, ws_engine_t (fd_t fd_,
...@@ -133,49 +133,19 @@ class ws_engine_t : public io_object_t, public i_engine ...@@ -133,49 +133,19 @@ class ws_engine_t : public io_object_t, public i_engine
bool client_); bool client_);
~ws_engine_t (); ~ws_engine_t ();
// i_engine interface implementation. protected:
// Plug the engine to the session. bool handshake ();
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_); void plug_internal ();
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
void terminate ();
// This method is called by the session to signalise that more
// messages can be written to the pipe.
bool restart_input ();
// This method is called by the session to signalise that there
// are messages to send available.
void restart_output ();
void zap_msg_available (){};
void in_event ();
void out_event ();
const endpoint_uri_pair_t &get_endpoint () const;
private: private:
int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_);
bool client_handshake (); bool client_handshake ();
bool server_handshake (); bool server_handshake ();
void error (zmq::stream_engine_t::error_reason_t reason_);
void unplug ();
bool _client; bool _client;
bool _plugged;
socket_base_t *_socket;
fd_t _fd;
session_base_t *_session;
handle_t _handle;
options_t _options;
// Representation of the connected endpoints.
const endpoint_uri_pair_t _endpoint_uri_pair;
bool _handshaking;
ws_client_handshake_state_t _client_handshake_state; ws_client_handshake_state_t _client_handshake_state;
ws_server_handshake_state_t _server_handshake_state; ws_server_handshake_state_t _server_handshake_state;
...@@ -191,21 +161,6 @@ class ws_engine_t : public io_object_t, public i_engine ...@@ -191,21 +161,6 @@ class ws_engine_t : public io_object_t, public i_engine
bool _websocket_protocol; bool _websocket_protocol;
char _websocket_key[MAX_HEADER_VALUE_LENGTH + 1]; char _websocket_key[MAX_HEADER_VALUE_LENGTH + 1];
char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1]; char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1];
bool _input_stopped;
i_decoder *_decoder;
unsigned char *_inpos;
size_t _insize;
bool _output_stopped;
unsigned char *_outpos;
size_t _outsize;
i_encoder *_encoder;
bool _sent_routing_id;
bool _received_routing_id;
msg_t _tx_msg;
}; };
} }
......
This diff is collapsed.
/*
Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMTP_ENGINE_HPP_INCLUDED__
#define __ZMQ_ZMTP_ENGINE_HPP_INCLUDED__
#include <stddef.h>
#include "fd.hpp"
#include "i_engine.hpp"
#include "io_object.hpp"
#include "i_encoder.hpp"
#include "i_decoder.hpp"
#include "options.hpp"
#include "socket_base.hpp"
#include "metadata.hpp"
#include "msg.hpp"
#include "stream_engine_base.hpp"
namespace zmq
{
// Protocol revisions
enum
{
ZMTP_1_0 = 0,
ZMTP_2_0 = 1
};
class io_thread_t;
class session_base_t;
class mechanism_t;
// This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket.
class zmtp_engine_t : public stream_engine_base_t
{
public:
zmtp_engine_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_);
~zmtp_engine_t ();
protected:
// Detects the protocol used by the peer.
bool handshake ();
void plug_internal ();
int process_command_message (msg_t *msg_);
int produce_ping_message (msg_t *msg_);
int process_heartbeat_message (msg_t *msg_);
int produce_pong_message (msg_t *msg_);
private:
// Receive the greeting from the peer.
int receive_greeting ();
void receive_greeting_versioned ();
typedef bool (zmtp_engine_t::*handshake_fun_t) ();
static handshake_fun_t select_handshake_fun (bool unversioned,
unsigned char revision);
bool handshake_v1_0_unversioned ();
bool handshake_v1_0 ();
bool handshake_v2_0 ();
bool handshake_v3_0 ();
int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_);
msg_t _routing_id_msg;
// Need to store PING payload for PONG
msg_t _pong_msg;
static const size_t signature_size = 10;
// Size of ZMTP/1.0 and ZMTP/2.0 greeting message
static const size_t v2_greeting_size = 12;
// Size of ZMTP/3.0 greeting message
static const size_t v3_greeting_size = 64;
// Expected greeting size.
size_t _greeting_size;
// Greeting received from, and sent to peer
unsigned char _greeting_recv[v3_greeting_size];
unsigned char _greeting_send[v3_greeting_size];
// Size of greeting received so far
unsigned int _greeting_bytes_read;
// Indicates whether the engine is to inject a phantom
// subscription message into the incoming stream.
// Needed to support old peers.
bool _subscription_required;
int _heartbeat_timeout;
zmtp_engine_t (const zmtp_engine_t &);
const zmtp_engine_t &operator= (const zmtp_engine_t &);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment