Unverified Commit 3692b266 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3579 from somdoron/zws2

problem: browser cannot communicate with zeromq directly
parents feeed3f2 39941a0c
...@@ -152,6 +152,7 @@ test_socket_null ...@@ -152,6 +152,7 @@ test_socket_null
test_xpub_verbose test_xpub_verbose
test_mock_pub_sub test_mock_pub_sub
test_proxy_hwm test_proxy_hwm
test_ws_transport
unittest_ip_resolver unittest_ip_resolver
unittest_mtrie unittest_mtrie
unittest_poller unittest_poller
......
...@@ -221,6 +221,8 @@ else() ...@@ -221,6 +221,8 @@ else()
message(FATAL_ERROR "Invalid polling method") message(FATAL_ERROR "Invalid polling method")
endif() endif()
list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/external/sha1/sha1.c ${CMAKE_CURRENT_SOURCE_DIR}/external/sha1/sha1.h)
if(POLLER STREQUAL "epoll" AND WIN32) if(POLLER STREQUAL "epoll" AND WIN32)
message(STATUS "Including wepoll") message(STATUS "Including wepoll")
list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/external/wepoll/wepoll.c ${CMAKE_CURRENT_SOURCE_DIR}/external/wepoll/wepoll.h) list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/external/wepoll/wepoll.c ${CMAKE_CURRENT_SOURCE_DIR}/external/wepoll/wepoll.h)
...@@ -780,6 +782,11 @@ set(cxx-sources ...@@ -780,6 +782,11 @@ set(cxx-sources
gather.cpp gather.cpp
ip_resolver.cpp ip_resolver.cpp
zap_client.cpp zap_client.cpp
ws_connecter.cpp
ws_decoder.cpp
ws_encoder.cpp
ws_engine.cpp
ws_listener.cpp
# at least for VS, the header files must also be listed # at least for VS, the header files must also be listed
address.hpp address.hpp
array.hpp array.hpp
...@@ -911,6 +918,12 @@ set(cxx-sources ...@@ -911,6 +918,12 @@ set(cxx-sources
vmci_listener.hpp vmci_listener.hpp
windows.hpp windows.hpp
wire.hpp wire.hpp
ws_connecter.hpp
ws_decoder.hpp
ws_encoder.hpp
ws_engine.hpp
ws_listener.hpp
ws_protocol.hpp
xpub.hpp xpub.hpp
xsub.hpp xsub.hpp
ypipe.hpp ypipe.hpp
......
...@@ -248,6 +248,17 @@ src_libzmq_la_SOURCES = \ ...@@ -248,6 +248,17 @@ src_libzmq_la_SOURCES = \
src/vmci_listener.hpp \ src/vmci_listener.hpp \
src/windows.hpp \ src/windows.hpp \
src/wire.hpp \ src/wire.hpp \
src/ws_connecter.cpp \
src/ws_connecter.hpp \
src/ws_decoder.cpp \
src/ws_decoder.hpp \
src/ws_encoder.cpp \
src/ws_encoder.hpp \
src/ws_engine.cpp \
src/ws_engine.hpp \
src/ws_listener.cpp \
src/ws_listener.hpp \
src/ws_protocol.hpp \
src/xpub.cpp \ src/xpub.cpp \
src/xpub.hpp \ src/xpub.hpp \
src/xsub.cpp \ src/xsub.cpp \
...@@ -264,7 +275,9 @@ src_libzmq_la_SOURCES = \ ...@@ -264,7 +275,9 @@ src_libzmq_la_SOURCES = \
src/socket_poller.hpp \ src/socket_poller.hpp \
src/zap_client.cpp \ src/zap_client.cpp \
src/zap_client.hpp \ src/zap_client.hpp \
src/zmq_draft.h src/zmq_draft.h \
external/sha1/sha1.c \
external/sha1/sha1.h
if USE_WEPOLL if USE_WEPOLL
src_libzmq_la_SOURCES += \ src_libzmq_la_SOURCES += \
...@@ -460,7 +473,8 @@ test_apps = \ ...@@ -460,7 +473,8 @@ test_apps = \
tests/test_sodium \ tests/test_sodium \
tests/test_reconnect_ivl \ tests/test_reconnect_ivl \
tests/test_mock_pub_sub \ tests/test_mock_pub_sub \
tests/test_socket_null tests/test_socket_null \
tests/test_ws_transport
UNITY_CPPFLAGS = -I$(top_srcdir)/external/unity -DUNITY_USE_COMMAND_LINE_ARGS -DUNITY_EXCLUDE_FLOAT UNITY_CPPFLAGS = -I$(top_srcdir)/external/unity -DUNITY_USE_COMMAND_LINE_ARGS -DUNITY_EXCLUDE_FLOAT
UNITY_LIBS = $(top_builddir)/external/unity/libunity.a UNITY_LIBS = $(top_builddir)/external/unity/libunity.a
...@@ -775,6 +789,10 @@ tests_test_mock_pub_sub_SOURCES = tests/test_mock_pub_sub.cpp ...@@ -775,6 +789,10 @@ tests_test_mock_pub_sub_SOURCES = tests/test_mock_pub_sub.cpp
tests_test_mock_pub_sub_LDADD = src/libzmq.la ${TESTUTIL_LIBS} tests_test_mock_pub_sub_LDADD = src/libzmq.la ${TESTUTIL_LIBS}
tests_test_mock_pub_sub_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_mock_pub_sub_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_ws_transport_SOURCES = tests/test_ws_transport.cpp
tests_test_ws_transport_LDADD = src/libzmq.la ${TESTUTIL_LIBS}
tests_test_ws_transport_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
if HAVE_CURVE if HAVE_CURVE
test_apps += \ test_apps += \
......
/*
* Copyright (C) 1995, 1996, 1997, and 1998 WIDE Project.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the project nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
* FIPS pub 180-1: Secure Hash Algorithm (SHA-1)
* based on: http://www.itl.nist.gov/fipspubs/fip180-1.htm
* implemented by Jun-ichiro itojun Itoh <itojun@itojun.org>
*/
#include "sha1.h"
#include <string.h>
/* constant table */
static uint32_t _K[] = {0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xca62c1d6};
#define K(t) _K[(t) / 20]
#define F0(b, c, d) (((b) & (c)) | ((~(b)) & (d)))
#define F1(b, c, d) (((b) ^ (c)) ^ (d))
#define F2(b, c, d) (((b) & (c)) | ((b) & (d)) | ((c) & (d)))
#define F3(b, c, d) (((b) ^ (c)) ^ (d))
#define S(n, x) (((x) << (n)) | ((x) >> (32 - (n))))
#define H(n) (ctxt->h.b32[(n)])
#define COUNT (ctxt->count)
#define BCOUNT (ctxt->c.b64[0] / 8)
#define W(n) (ctxt->m.b32[(n)])
#define PUTBYTE(x) \
do { \
ctxt->m.b8[(COUNT % 64)] = (x); \
COUNT++; \
COUNT %= 64; \
ctxt->c.b64[0] += 8; \
if (COUNT % 64 == 0) \
sha1_step(ctxt); \
} while (0)
#define PUTPAD(x) \
do { \
ctxt->m.b8[(COUNT % 64)] = (x); \
COUNT++; \
COUNT %= 64; \
if (COUNT % 64 == 0) \
sha1_step(ctxt); \
} while (0)
static void sha1_step(struct sha1_ctxt *);
static void
sha1_step(struct sha1_ctxt * ctxt)
{
uint32_t a,
b,
c,
d,
e;
size_t t,
s;
uint32_t tmp;
#ifndef WORDS_BIGENDIAN
struct sha1_ctxt tctxt;
memmove(&tctxt.m.b8[0], &ctxt->m.b8[0], 64);
ctxt->m.b8[0] = tctxt.m.b8[3];
ctxt->m.b8[1] = tctxt.m.b8[2];
ctxt->m.b8[2] = tctxt.m.b8[1];
ctxt->m.b8[3] = tctxt.m.b8[0];
ctxt->m.b8[4] = tctxt.m.b8[7];
ctxt->m.b8[5] = tctxt.m.b8[6];
ctxt->m.b8[6] = tctxt.m.b8[5];
ctxt->m.b8[7] = tctxt.m.b8[4];
ctxt->m.b8[8] = tctxt.m.b8[11];
ctxt->m.b8[9] = tctxt.m.b8[10];
ctxt->m.b8[10] = tctxt.m.b8[9];
ctxt->m.b8[11] = tctxt.m.b8[8];
ctxt->m.b8[12] = tctxt.m.b8[15];
ctxt->m.b8[13] = tctxt.m.b8[14];
ctxt->m.b8[14] = tctxt.m.b8[13];
ctxt->m.b8[15] = tctxt.m.b8[12];
ctxt->m.b8[16] = tctxt.m.b8[19];
ctxt->m.b8[17] = tctxt.m.b8[18];
ctxt->m.b8[18] = tctxt.m.b8[17];
ctxt->m.b8[19] = tctxt.m.b8[16];
ctxt->m.b8[20] = tctxt.m.b8[23];
ctxt->m.b8[21] = tctxt.m.b8[22];
ctxt->m.b8[22] = tctxt.m.b8[21];
ctxt->m.b8[23] = tctxt.m.b8[20];
ctxt->m.b8[24] = tctxt.m.b8[27];
ctxt->m.b8[25] = tctxt.m.b8[26];
ctxt->m.b8[26] = tctxt.m.b8[25];
ctxt->m.b8[27] = tctxt.m.b8[24];
ctxt->m.b8[28] = tctxt.m.b8[31];
ctxt->m.b8[29] = tctxt.m.b8[30];
ctxt->m.b8[30] = tctxt.m.b8[29];
ctxt->m.b8[31] = tctxt.m.b8[28];
ctxt->m.b8[32] = tctxt.m.b8[35];
ctxt->m.b8[33] = tctxt.m.b8[34];
ctxt->m.b8[34] = tctxt.m.b8[33];
ctxt->m.b8[35] = tctxt.m.b8[32];
ctxt->m.b8[36] = tctxt.m.b8[39];
ctxt->m.b8[37] = tctxt.m.b8[38];
ctxt->m.b8[38] = tctxt.m.b8[37];
ctxt->m.b8[39] = tctxt.m.b8[36];
ctxt->m.b8[40] = tctxt.m.b8[43];
ctxt->m.b8[41] = tctxt.m.b8[42];
ctxt->m.b8[42] = tctxt.m.b8[41];
ctxt->m.b8[43] = tctxt.m.b8[40];
ctxt->m.b8[44] = tctxt.m.b8[47];
ctxt->m.b8[45] = tctxt.m.b8[46];
ctxt->m.b8[46] = tctxt.m.b8[45];
ctxt->m.b8[47] = tctxt.m.b8[44];
ctxt->m.b8[48] = tctxt.m.b8[51];
ctxt->m.b8[49] = tctxt.m.b8[50];
ctxt->m.b8[50] = tctxt.m.b8[49];
ctxt->m.b8[51] = tctxt.m.b8[48];
ctxt->m.b8[52] = tctxt.m.b8[55];
ctxt->m.b8[53] = tctxt.m.b8[54];
ctxt->m.b8[54] = tctxt.m.b8[53];
ctxt->m.b8[55] = tctxt.m.b8[52];
ctxt->m.b8[56] = tctxt.m.b8[59];
ctxt->m.b8[57] = tctxt.m.b8[58];
ctxt->m.b8[58] = tctxt.m.b8[57];
ctxt->m.b8[59] = tctxt.m.b8[56];
ctxt->m.b8[60] = tctxt.m.b8[63];
ctxt->m.b8[61] = tctxt.m.b8[62];
ctxt->m.b8[62] = tctxt.m.b8[61];
ctxt->m.b8[63] = tctxt.m.b8[60];
#endif
a = H(0);
b = H(1);
c = H(2);
d = H(3);
e = H(4);
for (t = 0; t < 20; t++)
{
s = t & 0x0f;
if (t >= 16)
W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s));
tmp = S(5, a) + F0(b, c, d) + e + W(s) + K(t);
e = d;
d = c;
c = S(30, b);
b = a;
a = tmp;
}
for (t = 20; t < 40; t++)
{
s = t & 0x0f;
W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s));
tmp = S(5, a) + F1(b, c, d) + e + W(s) + K(t);
e = d;
d = c;
c = S(30, b);
b = a;
a = tmp;
}
for (t = 40; t < 60; t++)
{
s = t & 0x0f;
W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s));
tmp = S(5, a) + F2(b, c, d) + e + W(s) + K(t);
e = d;
d = c;
c = S(30, b);
b = a;
a = tmp;
}
for (t = 60; t < 80; t++)
{
s = t & 0x0f;
W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s));
tmp = S(5, a) + F3(b, c, d) + e + W(s) + K(t);
e = d;
d = c;
c = S(30, b);
b = a;
a = tmp;
}
H(0) = H(0) + a;
H(1) = H(1) + b;
H(2) = H(2) + c;
H(3) = H(3) + d;
H(4) = H(4) + e;
memset(&ctxt->m.b8[0], 0, 64);
}
/*------------------------------------------------------------*/
void
sha1_init(struct sha1_ctxt * ctxt)
{
memset(ctxt, 0, sizeof(struct sha1_ctxt));
H(0) = 0x67452301;
H(1) = 0xefcdab89;
H(2) = 0x98badcfe;
H(3) = 0x10325476;
H(4) = 0xc3d2e1f0;
}
void
sha1_pad(struct sha1_ctxt * ctxt)
{
size_t padlen; /* pad length in bytes */
size_t padstart;
PUTPAD(0x80);
padstart = COUNT % 64;
padlen = 64 - padstart;
if (padlen < 8)
{
memset(&ctxt->m.b8[padstart], 0, padlen);
COUNT += (uint8_t) padlen;
COUNT %= 64;
sha1_step(ctxt);
padstart = COUNT % 64; /* should be 0 */
padlen = 64 - padstart; /* should be 64 */
}
memset(&ctxt->m.b8[padstart], 0, padlen - 8);
COUNT += ((uint8_t) padlen - 8);
COUNT %= 64;
#ifdef WORDS_BIGENDIAN
PUTPAD(ctxt->c.b8[0]);
PUTPAD(ctxt->c.b8[1]);
PUTPAD(ctxt->c.b8[2]);
PUTPAD(ctxt->c.b8[3]);
PUTPAD(ctxt->c.b8[4]);
PUTPAD(ctxt->c.b8[5]);
PUTPAD(ctxt->c.b8[6]);
PUTPAD(ctxt->c.b8[7]);
#else
PUTPAD(ctxt->c.b8[7]);
PUTPAD(ctxt->c.b8[6]);
PUTPAD(ctxt->c.b8[5]);
PUTPAD(ctxt->c.b8[4]);
PUTPAD(ctxt->c.b8[3]);
PUTPAD(ctxt->c.b8[2]);
PUTPAD(ctxt->c.b8[1]);
PUTPAD(ctxt->c.b8[0]);
#endif
}
void
sha1_loop(struct sha1_ctxt * ctxt, const uint8_t *input0, size_t len)
{
const uint8_t *input;
size_t gaplen;
size_t gapstart;
size_t off;
size_t copysiz;
input = (const uint8_t *) input0;
off = 0;
while (off < len)
{
gapstart = COUNT % 64;
gaplen = 64 - gapstart;
copysiz = (gaplen < len - off) ? gaplen : len - off;
memmove(&ctxt->m.b8[gapstart], &input[off], copysiz);
COUNT += (uint8_t) copysiz;
COUNT %= 64;
ctxt->c.b64[0] += copysiz * 8;
if (COUNT % 64 == 0)
sha1_step(ctxt);
off += copysiz;
}
}
void
sha1_result(struct sha1_ctxt * ctxt, uint8_t *digest0)
{
uint8_t *digest;
digest = (uint8_t *) digest0;
sha1_pad(ctxt);
#ifdef WORDS_BIGENDIAN
memmove(digest, &ctxt->h.b8[0], 20);
#else
digest[0] = ctxt->h.b8[3];
digest[1] = ctxt->h.b8[2];
digest[2] = ctxt->h.b8[1];
digest[3] = ctxt->h.b8[0];
digest[4] = ctxt->h.b8[7];
digest[5] = ctxt->h.b8[6];
digest[6] = ctxt->h.b8[5];
digest[7] = ctxt->h.b8[4];
digest[8] = ctxt->h.b8[11];
digest[9] = ctxt->h.b8[10];
digest[10] = ctxt->h.b8[9];
digest[11] = ctxt->h.b8[8];
digest[12] = ctxt->h.b8[15];
digest[13] = ctxt->h.b8[14];
digest[14] = ctxt->h.b8[13];
digest[15] = ctxt->h.b8[12];
digest[16] = ctxt->h.b8[19];
digest[17] = ctxt->h.b8[18];
digest[18] = ctxt->h.b8[17];
digest[19] = ctxt->h.b8[16];
#endif
}
\ No newline at end of file
/* contrib/pgcrypto/sha1.h */
/* $KAME: sha1.h,v 1.4 2000/02/22 14:01:18 itojun Exp $ */
/*
* Copyright (C) 1995, 1996, 1997, and 1998 WIDE Project.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the project nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
* FIPS pub 180-1: Secure Hash Algorithm (SHA-1)
* THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* based on: http://www.itl.nist.gov/fipspubs/fip180-1.htm
* implemented by Jun-ichiro itojun Itoh <itojun@itojun.org>
*/
#ifndef _NETINET6_SHA1_H_
#define _NETINET6_SHA1_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <stdlib.h>
#include "../../src/stdint.hpp"
struct sha1_ctxt
{
union
{
uint8_t b8[20];
uint32_t b32[5];
} h;
union
{
uint8_t b8[8];
uint64_t b64[1];
} c;
union
{
uint8_t b8[64];
uint32_t b32[16];
} m;
uint8_t count;
};
void sha1_init(struct sha1_ctxt *);
void sha1_pad(struct sha1_ctxt *);
void sha1_loop(struct sha1_ctxt *, const uint8_t *, size_t);
void sha1_result(struct sha1_ctxt *, uint8_t *);
// Compatibility with OpenSSL API
#define SHA_DIGEST_LENGTH 20
typedef struct sha1_ctxt SHA_CTX;
#define SHA1_Init(x) sha1_init((x))
#define SHA1_Update(x, y, z) sha1_loop((x), (y), (z))
#define SHA1_Final(x, y) sha1_result((y), (x))
#define SHA1_RESULTLEN (160/8)
#ifdef __cplusplus
}
#endif
#endif /* _NETINET6_SHA1_H_ */
...@@ -475,6 +475,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg_, ...@@ -475,6 +475,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg_,
#define ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION 0x20000003 #define ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION 0x20000003
#define ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE 0x20000004 #define ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE 0x20000004
#define ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA 0x20000005 #define ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA 0x20000005
#define ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED 0x30000000
ZMQ_EXPORT void *zmq_socket (void *, int type_); ZMQ_EXPORT void *zmq_socket (void *, int type_);
ZMQ_EXPORT int zmq_close (void *s_); ZMQ_EXPORT int zmq_close (void *s_);
......
...@@ -56,7 +56,7 @@ zmq::address_t::address_t (const std::string &protocol_, ...@@ -56,7 +56,7 @@ zmq::address_t::address_t (const std::string &protocol_,
zmq::address_t::~address_t () zmq::address_t::~address_t ()
{ {
if (protocol == protocol_name::tcp) { if (protocol == protocol_name::tcp || protocol == protocol_name::ws) {
LIBZMQ_DELETE (resolved.tcp_addr); LIBZMQ_DELETE (resolved.tcp_addr);
} else if (protocol == protocol_name::udp) { } else if (protocol == protocol_name::udp) {
LIBZMQ_DELETE (resolved.udp_addr); LIBZMQ_DELETE (resolved.udp_addr);
......
...@@ -60,6 +60,7 @@ namespace protocol_name ...@@ -60,6 +60,7 @@ namespace protocol_name
static const char inproc[] = "inproc"; static const char inproc[] = "inproc";
static const char tcp[] = "tcp"; static const char tcp[] = "tcp";
static const char udp[] = "udp"; static const char udp[] = "udp";
static const char ws[] = "ws";
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS && !defined ZMQ_HAVE_VXWORKS
static const char ipc[] = "ipc"; static const char ipc[] = "ipc";
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "ws_connecter.hpp"
#include "ipc_connecter.hpp" #include "ipc_connecter.hpp"
#include "tipc_connecter.hpp" #include "tipc_connecter.hpp"
#include "socks_connecter.hpp" #include "socks_connecter.hpp"
...@@ -559,6 +560,8 @@ zmq::session_base_t::connecter_factory_entry_t ...@@ -559,6 +560,8 @@ zmq::session_base_t::connecter_factory_entry_t
zmq::session_base_t::_connecter_factories[] = { zmq::session_base_t::_connecter_factories[] = {
connecter_factory_entry_t (protocol_name::tcp, connecter_factory_entry_t (protocol_name::tcp,
&zmq::session_base_t::create_connecter_tcp), &zmq::session_base_t::create_connecter_tcp),
connecter_factory_entry_t (protocol_name::ws,
&zmq::session_base_t::create_connecter_ws),
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS && !defined ZMQ_HAVE_VXWORKS
connecter_factory_entry_t (protocol_name::ipc, connecter_factory_entry_t (protocol_name::ipc,
...@@ -681,6 +684,13 @@ zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_, ...@@ -681,6 +684,13 @@ zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
tcp_connecter_t (io_thread_, this, options, _addr, wait_); tcp_connecter_t (io_thread_, this, options, _addr, wait_);
} }
zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_,
bool wait_)
{
return new (std::nothrow)
ws_connecter_t (io_thread_, this, options, _addr, wait_);
}
#ifdef ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_OPENPGM
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_) void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{ {
......
...@@ -118,6 +118,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events ...@@ -118,6 +118,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_); own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_);
own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_); own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_);
own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_); own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_);
own_t *create_connecter_ws (io_thread_t *io_thread_, bool wait_);
typedef void (session_base_t::*start_connecting_fun_t) ( typedef void (session_base_t::*start_connecting_fun_t) (
io_thread_t *io_thread); io_thread_t *io_thread);
......
...@@ -50,6 +50,7 @@ ...@@ -50,6 +50,7 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "ws_listener.hpp"
#include "ipc_listener.hpp" #include "ipc_listener.hpp"
#include "tipc_listener.hpp" #include "tipc_listener.hpp"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
...@@ -333,6 +334,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) const ...@@ -333,6 +334,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
&& protocol_ != protocol_name::ipc && protocol_ != protocol_name::ipc
#endif #endif
&& protocol_ != protocol_name::tcp && protocol_ != protocol_name::tcp
&& protocol_ != protocol_name::ws
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
// pgm/epgm transports only available if 0MQ is compiled with OpenPGM. // pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
&& protocol_ != "pgm" && protocol_ != "pgm"
...@@ -629,6 +631,27 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) ...@@ -629,6 +631,27 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
return 0; return 0;
} }
if (protocol == protocol_name::ws) {
ws_listener_t *listener =
new (std::nothrow) ws_listener_t (io_thread, this, options);
alloc_assert (listener);
rc = listener->set_local_address (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE (listener);
event_bind_failed (make_unconnected_bind_endpoint_pair (address),
zmq_errno ());
return -1;
}
// Save last endpoint URI
listener->get_local_address (_last_endpoint);
add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
static_cast<own_t *> (listener), NULL);
options.connected = true;
return 0;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS && !defined ZMQ_HAVE_VXWORKS
if (protocol == protocol_name::ipc) { if (protocol == protocol_name::ipc) {
...@@ -865,6 +888,47 @@ int zmq::socket_base_t::connect (const char *endpoint_uri_) ...@@ -865,6 +888,47 @@ int zmq::socket_base_t::connect (const char *endpoint_uri_)
} }
// Defer resolution until a socket is opened // Defer resolution until a socket is opened
paddr->resolved.tcp_addr = NULL; paddr->resolved.tcp_addr = NULL;
} else if (protocol == protocol_name::ws) {
// Do some basic sanity checks on ws:// address syntax
// - hostname starts with digit or letter, with embedded '-' or '.'
// - IPv6 address may contain hex chars and colons.
// - IPv6 link local address may contain % followed by interface name / zone_id
// (Reference: https://tools.ietf.org/html/rfc4007)
// - IPv4 address may contain decimal digits and dots.
// - Address must end in ":port" where port is *, or numeric
// - Address may contain two parts separated by ':'
// Following code is quick and dirty check to catch obvious errors,
// without trying to be fully accurate.
const char *check = address.c_str ();
if (isalnum (*check) || isxdigit (*check) || *check == '['
|| *check == ':') {
check++;
while (isalnum (*check) || isxdigit (*check) || *check == '.'
|| *check == '-' || *check == ':' || *check == '%'
|| *check == ';' || *check == '[' || *check == ']'
|| *check == '_' || *check == '*') {
check++;
}
}
// Assume the worst, now look for success
rc = -1;
// Did we reach the end of the address safely?
if (*check == 0) {
// Do we have a valid port string? (cannot be '*' in connect
check = strrchr (address.c_str (), ':');
if (check) {
check++;
if (*check && (isdigit (*check)))
rc = 0; // Valid
}
}
if (rc == -1) {
errno = EINVAL;
LIBZMQ_DELETE (paddr);
return -1;
}
// Defer resolution until a socket is opened
paddr->resolved.tcp_addr = NULL;
} }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS && !defined ZMQ_HAVE_VXWORKS
......
...@@ -53,9 +53,9 @@ zmq::stream_connecter_base_t::stream_connecter_base_t ( ...@@ -53,9 +53,9 @@ zmq::stream_connecter_base_t::stream_connecter_base_t (
_s (retired_fd), _s (retired_fd),
_handle (static_cast<handle_t> (NULL)), _handle (static_cast<handle_t> (NULL)),
_socket (session_->get_socket ()), _socket (session_->get_socket ()),
_session (session_),
_delayed_start (delayed_start_), _delayed_start (delayed_start_),
_reconnect_timer_started (false), _reconnect_timer_started (false),
_session (session_),
_current_reconnect_ivl (options.reconnect_ivl) _current_reconnect_ivl (options.reconnect_ivl)
{ {
zmq_assert (_addr); zmq_assert (_addr);
......
...@@ -63,7 +63,7 @@ class stream_connecter_base_t : public own_t, public io_object_t ...@@ -63,7 +63,7 @@ class stream_connecter_base_t : public own_t, public io_object_t
void timer_event (int id_); void timer_event (int id_);
// Internal function to create the engine after connection was established. // Internal function to create the engine after connection was established.
void create_engine (fd_t fd, const std::string &local_address_); virtual void create_engine (fd_t fd, const std::string &local_address_);
// Internal function to add a reconnect timer // Internal function to add a reconnect timer
void add_reconnect_timer (); void add_reconnect_timer ();
...@@ -91,6 +91,9 @@ class stream_connecter_base_t : public own_t, public io_object_t ...@@ -91,6 +91,9 @@ class stream_connecter_base_t : public own_t, public io_object_t
// Socket // Socket
zmq::socket_base_t *const _socket; zmq::socket_base_t *const _socket;
// Reference to the session we belong to.
zmq::session_base_t *const _session;
private: private:
// ID of the timer used to delay the reconnection. // ID of the timer used to delay the reconnection.
enum enum
...@@ -111,9 +114,6 @@ class stream_connecter_base_t : public own_t, public io_object_t ...@@ -111,9 +114,6 @@ class stream_connecter_base_t : public own_t, public io_object_t
// True iff a timer has been started. // True iff a timer has been started.
bool _reconnect_timer_started; bool _reconnect_timer_started;
// Reference to the session we belong to.
zmq::session_base_t *const _session;
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int _current_reconnect_ivl; int _current_reconnect_ivl;
......
...@@ -67,7 +67,7 @@ class stream_listener_base_t : public own_t, public io_object_t ...@@ -67,7 +67,7 @@ class stream_listener_base_t : public own_t, public io_object_t
// Close the listening socket. // Close the listening socket.
virtual int close (); virtual int close ();
void create_engine (fd_t fd); virtual void create_engine (fd_t fd);
// Underlying socket. // Underlying socket.
fd_t _s; fd_t _s;
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include <new>
#include <string>
#include "macros.hpp"
#include "ws_connecter.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "address.hpp"
#include "tcp_address.hpp"
#include "session_base.hpp"
#include "ws_engine.hpp"
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif
zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_,
const options_t &options_,
address_t *addr_,
bool delayed_start_) :
stream_connecter_base_t (
io_thread_, session_, options_, addr_, delayed_start_),
_connect_timer_started (false)
{
zmq_assert (_addr->protocol == protocol_name::ws);
}
zmq::ws_connecter_t::~ws_connecter_t ()
{
zmq_assert (!_connect_timer_started);
}
void zmq::ws_connecter_t::process_term (int linger_)
{
if (_connect_timer_started) {
cancel_timer (connect_timer_id);
_connect_timer_started = false;
}
stream_connecter_base_t::process_term (linger_);
}
void zmq::ws_connecter_t::out_event ()
{
if (_connect_timer_started) {
cancel_timer (connect_timer_id);
_connect_timer_started = false;
}
// TODO this is still very similar to (t)ipc_connecter_t, maybe the
// differences can be factored out
rm_handle ();
const fd_t fd = connect ();
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd || !tune_socket (fd)) {
close ();
add_reconnect_timer ();
return;
}
create_engine (fd, get_socket_name<tcp_address_t> (fd, socket_end_local));
}
void zmq::ws_connecter_t::timer_event (int id_)
{
if (id_ == connect_timer_id) {
_connect_timer_started = false;
rm_handle ();
close ();
add_reconnect_timer ();
} else
stream_connecter_base_t::timer_event (id_);
}
void zmq::ws_connecter_t::start_connecting ()
{
// Open the connecting socket.
const int rc = open ();
// Connect may succeed in synchronous manner.
if (rc == 0) {
_handle = add_fd (_s);
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
// add userspace connect timeout
add_connect_timer ();
}
// Handle any other error condition by eventual reconnect.
else {
if (_s != retired_fd)
close ();
add_reconnect_timer ();
}
}
void zmq::ws_connecter_t::add_connect_timer ()
{
if (options.connect_timeout > 0) {
add_timer (options.connect_timeout, connect_timer_id);
_connect_timer_started = true;
}
}
int zmq::ws_connecter_t::open ()
{
zmq_assert (_s == retired_fd);
// Resolve the address
if (_addr->resolved.tcp_addr != NULL) {
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
}
_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (_addr->resolved.tcp_addr);
_s = tcp_open_socket (_addr->address.c_str (), options, false, true,
_addr->resolved.tcp_addr);
if (_s == retired_fd) {
// TODO we should emit some event in this case!
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
return -1;
}
zmq_assert (_addr->resolved.tcp_addr != NULL);
// Set the socket to non-blocking mode so that we get async connect().
unblock_socket (_s);
const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
int rc;
// Set a source address for conversations
if (tcp_addr->has_src_addr ()) {
// Allow reusing of the address, to connect to different servers
// using the same source port on the client.
int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
reinterpret_cast<const char *> (&flag), sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
sizeof (int));
errno_assert (rc == 0);
#else
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
#endif
#if defined ZMQ_HAVE_VXWORKS
rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
tcp_addr->src_addrlen ());
#else
rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
#endif
if (rc == -1)
return -1;
}
// Connect to the remote peer.
#if defined ZMQ_HAVE_VXWORKS
rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
#else
rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
#endif
// Connect was successful immediately.
if (rc == 0) {
return 0;
}
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
const int last_error = WSAGetLastError ();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
errno = EINPROGRESS;
else
errno = wsa_error_to_errno (last_error);
#else
if (errno == EINTR)
errno = EINPROGRESS;
#endif
return -1;
}
zmq::fd_t zmq::ws_connecter_t::connect ()
{
// Async connect has finished. Check whether an error occurred
int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
#endif
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
|| err == WSAENOBUFS) {
wsa_assert_no (err);
}
return retired_fd;
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
errno = err;
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
errno_assert (errno != EBADF && errno != ENOPROTOOPT
&& errno != ENOTSOCK && errno != ENOBUFS);
#else
errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
&& errno != ENOBUFS);
#endif
return retired_fd;
}
#endif
// Return the newly connected socket.
const fd_t result = _s;
_s = retired_fd;
return result;
}
bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
{
const int rc =
tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
return rc == 0;
}
void zmq::ws_connecter_t::create_engine (fd_t fd,
const std::string &local_address_)
{
const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
endpoint_type_connect);
// Create the engine object for this connection.
ws_engine_t *engine =
new (std::nothrow) ws_engine_t (fd, options, endpoint_pair, true);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
// Shut the connecter down.
terminate ();
_socket->event_connected (endpoint_pair, fd);
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __WS_CONNECTER_HPP_INCLUDED__
#define __WS_CONNECTER_HPP_INCLUDED__
#include "fd.hpp"
#include "stdint.hpp"
#include "stream_connecter_base.hpp"
namespace zmq
{
class ws_connecter_t : public stream_connecter_base_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
// then starts connection process.
ws_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_,
const options_t &options_,
address_t *addr_,
bool delayed_start_);
~ws_connecter_t ();
protected:
void create_engine (fd_t fd, const std::string &local_address_);
private:
// ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
enum
{
connect_timer_id = 2
};
// Handlers for incoming commands.
void process_term (int linger_);
// Handlers for I/O events.
void out_event ();
void timer_event (int id_);
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a connect timer
void add_connect_timer ();
// Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Tunes a connected socket.
bool tune_socket (fd_t fd_);
// True iff a timer has been started.
bool _connect_timer_started;
ws_connecter_t (const ws_connecter_t &);
const ws_connecter_t &operator= (const ws_connecter_t &);
};
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include <stdlib.h>
#include <string.h>
#include <cmath>
#include "ws_protocol.hpp"
#include "ws_decoder.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq::ws_decoder_t::ws_decoder_t (size_t bufsize_,
int64_t maxmsgsize_,
bool zero_copy_,
bool must_mask_) :
decoder_base_t<ws_decoder_t, shared_message_memory_allocator> (bufsize_),
_msg_flags (0),
_zero_copy (zero_copy_),
_max_msg_size (maxmsgsize_),
_must_mask (must_mask_),
_size (0)
{
int rc = _in_progress.init ();
errno_assert (rc == 0);
// At the beginning, read one byte and go to opcode_ready state.
next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready);
}
zmq::ws_decoder_t::~ws_decoder_t ()
{
int rc = _in_progress.close ();
errno_assert (rc == 0);
}
int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
{
bool final = (_tmpbuf[0] & 0x80) != 0; // final bit
if (!final)
return -1; // non final messages are not supported
_opcode = (zmq::ws_protocol_t::opcode_t) (_tmpbuf[0] & 0xF);
_msg_flags = 0;
switch (_opcode) {
case zmq::ws_protocol_t::opcode_binary:
break;
case zmq::ws_protocol_t::opcode_close:
_msg_flags = msg_t::command; // TODO: set the command name to CLOSE
break;
case zmq::ws_protocol_t::opcode_ping:
_msg_flags = msg_t::ping;
break;
case zmq::ws_protocol_t::opcode_pong:
_msg_flags = msg_t::pong;
break;
default:
return -1;
}
next_step (_tmpbuf, 1, &ws_decoder_t::size_first_byte_ready);
return 0;
}
int zmq::ws_decoder_t::size_first_byte_ready (unsigned char const *read_from_)
{
bool is_masked = (_tmpbuf[0] & 0x80) != 0;
if (is_masked != _must_mask) // wrong mask value
return -1;
_size = (uint64_t) (_tmpbuf[0] & 0x7F);
if (_size < 126) {
if (_must_mask)
next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
else if (_opcode == ws_protocol_t::opcode_binary) {
if (_size == 0)
return -1;
next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
} else
return size_ready (read_from_);
} else if (_size == 126)
next_step (_tmpbuf, 2, &ws_decoder_t::short_size_ready);
else
next_step (_tmpbuf, 8, &ws_decoder_t::long_size_ready);
return 0;
}
int zmq::ws_decoder_t::short_size_ready (unsigned char const *read_from_)
{
_size = (_tmpbuf[0] << 8) | _tmpbuf[1];
if (_must_mask)
next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
else if (_opcode == ws_protocol_t::opcode_binary) {
if (_size == 0)
return -1;
next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
} else
return size_ready (read_from_);
return 0;
}
int zmq::ws_decoder_t::long_size_ready (unsigned char const *read_from_)
{
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
_size = get_uint64 (_tmpbuf);
if (_must_mask)
next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
else if (_opcode == ws_protocol_t::opcode_binary) {
if (_size == 0)
return -1;
next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
} else
return size_ready (read_from_);
return 0;
}
int zmq::ws_decoder_t::mask_ready (unsigned char const *read_from_)
{
memcpy (_mask, _tmpbuf, 4);
if (_opcode == ws_protocol_t::opcode_binary) {
if (_size == 0)
return -1;
next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
} else
return size_ready (read_from_);
return 0;
}
int zmq::ws_decoder_t::flags_ready (unsigned char const *read_from_)
{
unsigned char flags;
if (_must_mask)
flags = _tmpbuf[0] ^ _mask[0];
else
flags = _tmpbuf[0];
if (flags & ws_protocol_t::more_flag)
_msg_flags |= msg_t::more;
if (flags & ws_protocol_t::command_flag)
_msg_flags |= msg_t::command;
_size--;
return size_ready (read_from_);
}
int zmq::ws_decoder_t::size_ready (unsigned char const *read_pos_)
{
// Message size must not exceed the maximum allowed size.
if (_max_msg_size >= 0)
if (unlikely (_size > static_cast<uint64_t> (_max_msg_size))) {
errno = EMSGSIZE;
return -1;
}
// Message size must fit into size_t data type.
if (unlikely (_size != static_cast<size_t> (_size))) {
errno = EMSGSIZE;
return -1;
}
int rc = _in_progress.close ();
assert (rc == 0);
// the current message can exceed the current buffer. We have to copy the buffer
// data into a new message and complete it in the next receive.
shared_message_memory_allocator &allocator = get_allocator ();
if (unlikely (!_zero_copy
|| _size > (size_t) (allocator.data () + allocator.size ()
- read_pos_))) {
// a new message has started, but the size would exceed the pre-allocated arena
// this happens every time when a message does not fit completely into the buffer
rc = _in_progress.init_size (static_cast<size_t> (_size));
} else {
// construct message using n bytes from the buffer as storage
// increase buffer ref count
// if the message will be a large message, pass a valid refcnt memory location as well
rc = _in_progress.init (
const_cast<unsigned char *> (read_pos_), static_cast<size_t> (_size),
shared_message_memory_allocator::call_dec_ref, allocator.buffer (),
allocator.provide_content ());
// For small messages, data has been copied and refcount does not have to be increased
if (_in_progress.is_zcmsg ()) {
allocator.advance_content ();
allocator.inc_ref ();
}
}
if (unlikely (rc)) {
errno_assert (errno == ENOMEM);
rc = _in_progress.init ();
errno_assert (rc == 0);
errno = ENOMEM;
return -1;
}
_in_progress.set_flags (_msg_flags);
// this sets read_pos to
// the message data address if the data needs to be copied
// for small message / messages exceeding the current buffer
// or
// to the current start address in the buffer because the message
// was constructed to use n bytes from the address passed as argument
next_step (_in_progress.data (), _in_progress.size (),
&ws_decoder_t::message_ready);
return 0;
}
int zmq::ws_decoder_t::message_ready (unsigned char const *)
{
if (_must_mask) {
int mask_index = _opcode == ws_protocol_t::opcode_binary ? 1 : 0;
unsigned char *data = (unsigned char *) _in_progress.data ();
for (size_t i = 0; i < _size; ++i, mask_index++)
data[i] = data[i] ^ _mask[mask_index % 4];
}
// Message is completely read. Signal this to the caller
// and prepare to decode next message.
next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready);
return 1;
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_WS_DECODER_HPP_INCLUDED__
#define __ZMQ_WS_DECODER_HPP_INCLUDED__
#include "decoder.hpp"
#include "decoder_allocators.hpp"
#include "ws_protocol.hpp"
namespace zmq
{
// Decoder for Web socket framing protocol. Converts data stream into messages.
// The class has to inherit from shared_message_memory_allocator because
// the base class calls allocate in its constructor.
class ws_decoder_t
: public decoder_base_t<ws_decoder_t, shared_message_memory_allocator>
{
public:
ws_decoder_t (size_t bufsize_,
int64_t maxmsgsize_,
bool zero_copy_,
bool must_mask_);
virtual ~ws_decoder_t ();
// i_decoder interface.
virtual msg_t *msg () { return &_in_progress; }
private:
int opcode_ready (unsigned char const *);
int size_first_byte_ready (unsigned char const *);
int short_size_ready (unsigned char const *);
int long_size_ready (unsigned char const *);
int mask_ready (unsigned char const *);
int flags_ready (unsigned char const *);
int message_ready (unsigned char const *);
int size_ready (unsigned char const *);
unsigned char _tmpbuf[8];
unsigned char _msg_flags;
msg_t _in_progress;
const bool _zero_copy;
const int64_t _max_msg_size;
const bool _must_mask;
uint64_t _size;
zmq::ws_protocol_t::opcode_t _opcode;
unsigned char _mask[4];
ws_decoder_t (const ws_decoder_t &);
void operator= (const ws_decoder_t &);
};
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "ws_protocol.hpp"
#include "ws_encoder.hpp"
#include "msg.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "random.hpp"
#include <limits.h>
zmq::ws_encoder_t::ws_encoder_t (size_t bufsize_, bool must_mask_) :
encoder_base_t<ws_encoder_t> (bufsize_),
_must_mask (must_mask_)
{
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &ws_encoder_t::message_ready, true);
_masked_msg.init ();
}
zmq::ws_encoder_t::~ws_encoder_t ()
{
_masked_msg.close ();
}
void zmq::ws_encoder_t::message_ready ()
{
int offset = 0;
// TODO: it might be close/ping/pong, which should be different op code
_tmp_buf[offset++] = 0x82; // Final | binary
_tmp_buf[offset] = _must_mask ? 0x80 : 0x00;
size_t size = in_progress ()->size ();
size++; // TODO: check if binary
if (size <= 125)
_tmp_buf[offset++] |= (unsigned char) (size & 127);
else if (size <= 0xFFFF) {
_tmp_buf[offset++] |= 126;
_tmp_buf[offset++] = (unsigned char) ((size >> 8) & 0xFF);
_tmp_buf[offset++] = (unsigned char) (size & 0xFF);
} else {
_tmp_buf[offset++] |= 127;
put_uint64 (_tmp_buf + offset, size);
offset += 8;
}
if (_must_mask) {
uint32_t random = generate_random ();
put_uint32 (_tmp_buf + offset, random);
put_uint32 (_mask, random);
offset += 4;
}
// TODO: check if binary
// Encode flags.
unsigned char protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more)
protocol_flags |= ws_protocol_t::more_flag;
if (in_progress ()->flags () & msg_t::command)
protocol_flags |= ws_protocol_t::command_flag;
_tmp_buf[offset++] =
_must_mask ? protocol_flags ^ _mask[0] : protocol_flags;
next_step (_tmp_buf, offset, &ws_encoder_t::size_ready, false);
}
void zmq::ws_encoder_t::size_ready ()
{
if (_must_mask) {
assert (in_progress () != &_masked_msg);
size_t size = in_progress ()->size ();
_masked_msg.close ();
_masked_msg.init_size (size);
int mask_index = 1; // TODO: check if binary message
unsigned char *dest = (unsigned char *) _masked_msg.data ();
unsigned char *src = (unsigned char *) in_progress ()->data ();
for (size_t i = 0; i < in_progress ()->size (); ++i, mask_index++)
dest[i] = src[i] ^ _mask[mask_index % 4];
next_step (_masked_msg.data (), _masked_msg.size (),
&ws_encoder_t::message_ready, true);
} else {
next_step (in_progress ()->data (), in_progress ()->size (),
&ws_encoder_t::message_ready, true);
}
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_WS_ENCODER_HPP_INCLUDED__
#define __ZMQ_WS_ENCODER_HPP_INCLUDED__
#include "encoder.hpp"
namespace zmq
{
// Encoder for web socket framing protocol. Converts messages into data stream.
class ws_encoder_t : public encoder_base_t<ws_encoder_t>
{
public:
ws_encoder_t (size_t bufsize_, bool must_mask_);
virtual ~ws_encoder_t ();
private:
void size_ready ();
void message_ready ();
unsigned char _tmp_buf[16];
bool _must_mask;
unsigned char _mask[4];
msg_t _masked_msg;
ws_encoder_t (const ws_encoder_t &);
const ws_encoder_t &operator= (const ws_encoder_t &);
};
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#if !defined ZMQ_HAVE_WINDOWS
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif
#include "tcp.hpp"
#include "ws_engine.hpp"
#include "session_base.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "random.hpp"
#include "../external/sha1/sha1.h"
#include "ws_decoder.hpp"
#include "ws_encoder.hpp"
#ifdef ZMQ_HAVE_WINDOWS
#define strcasecmp _stricmp
#endif
// OSX uses a different name for this socket option
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif
static int
encode_base64 (const unsigned char *in, int in_len, char *out, int out_len);
zmq::ws_engine_t::ws_engine_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_,
bool client_) :
_client (client_),
_plugged (false),
_socket (NULL),
_fd (fd_),
_session (NULL),
_handle (static_cast<handle_t> (NULL)),
_options (options_),
_endpoint_uri_pair (endpoint_uri_pair_),
_handshaking (true),
_client_handshake_state (client_handshake_initial),
_server_handshake_state (handshake_initial),
_header_name_position (0),
_header_value_position (0),
_header_upgrade_websocket (false),
_header_connection_upgrade (false),
_websocket_protocol (false),
_input_stopped (false),
_decoder (NULL),
_inpos (NULL),
_insize (0),
_output_stopped (false),
_outpos (NULL),
_outsize (0),
_encoder (NULL),
_sent_routing_id (false),
_received_routing_id (false)
{
// Put the socket into non-blocking mode.
unblock_socket (_fd);
memset (_websocket_key, 0, MAX_HEADER_VALUE_LENGTH + 1);
memset (_websocket_accept, 0, MAX_HEADER_VALUE_LENGTH + 1);
int rc = _tx_msg.init ();
errno_assert (rc == 0);
}
zmq::ws_engine_t::~ws_engine_t ()
{
zmq_assert (!_plugged);
if (_fd != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (_fd);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = close (_fd);
errno_assert (rc == 0);
#endif
_fd = retired_fd;
}
int rc = _tx_msg.close ();
errno_assert (rc == 0);
LIBZMQ_DELETE (_encoder);
LIBZMQ_DELETE (_decoder);
}
void zmq::ws_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
zmq_assert (!_plugged);
_plugged = true;
zmq_assert (!_session);
zmq_assert (session_);
_session = session_;
_socket = _session->get_socket ();
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
_handle = add_fd (_fd);
if (_client) {
unsigned char nonce[16];
int *p = (int *) nonce;
// The nonce doesn't have to be secure one, it is just use to avoid proxy cache
*p = zmq::generate_random ();
*(p + 1) = zmq::generate_random ();
*(p + 2) = zmq::generate_random ();
*(p + 3) = zmq::generate_random ();
int size =
encode_base64 (nonce, 16, _websocket_key, MAX_HEADER_VALUE_LENGTH);
assert (size > 0);
size = snprintf (
(char *) _write_buffer, WS_BUFFER_SIZE,
"GET / HTTP/1.1\r\n"
"Host: server.example.com\r\n" // TODO: we need the address here
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Key: %s\r\n"
"Sec-WebSocket-Protocol: ZWS2.0\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n",
_websocket_key);
assert (size > 0 && size < WS_BUFFER_SIZE);
_outpos = _write_buffer;
_outsize = size;
_output_stopped = false;
set_pollout (_handle);
} else
_output_stopped = true;
_input_stopped = false;
set_pollin (_handle);
in_event ();
}
void zmq::ws_engine_t::unplug ()
{
zmq_assert (_plugged);
_plugged = false;
rm_fd (_handle);
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
}
void zmq::ws_engine_t::terminate ()
{
unplug ();
delete this;
}
void zmq::ws_engine_t::in_event ()
{
if (_handshaking) {
if (_client) {
if (!client_handshake ())
return;
} else if (!server_handshake ())
return;
}
zmq_assert (_decoder);
// If there's no data to process in the buffer...
if (_insize == 0) {
// Retrieve the buffer and read as much data as possible.
// Note that buffer can be arbitrarily large. However, we assume
// the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited.
size_t bufsize = 0;
_decoder->get_buffer (&_inpos, &bufsize);
const int rc = tcp_read (_fd, _inpos, bufsize);
if (rc == 0) {
// connection closed by peer
errno = EPIPE;
error (zmq::stream_engine_t::connection_error);
return;
}
if (rc == -1) {
if (errno != EAGAIN) {
error (zmq::stream_engine_t::connection_error);
return;
}
return;
}
// Adjust input size
_insize = static_cast<size_t> (rc);
// Adjust buffer size to received bytes
_decoder->resize_buffer (_insize);
}
int rc = 0;
size_t processed = 0;
while (_insize > 0) {
rc = _decoder->decode (_inpos, _insize, processed);
zmq_assert (processed <= _insize);
_inpos += processed;
_insize -= processed;
if (rc == 0 || rc == -1)
break;
if (!_received_routing_id) {
_received_routing_id = true;
if (_options.recv_routing_id)
_decoder->msg ()->set_flags (msg_t::routing_id);
else {
_decoder->msg ()->close ();
_decoder->msg ()->init ();
continue;
}
}
rc = _session->push_msg (_decoder->msg ());
if (rc == -1)
break;
}
// Tear down the connection if we have failed to decode input data
// or the session has rejected the message.
if (rc == -1) {
if (errno != EAGAIN) {
error (zmq::stream_engine_t::protocol_error);
return;
}
_input_stopped = true;
reset_pollin (_handle);
}
_session->flush ();
return;
}
void zmq::ws_engine_t::out_event ()
{
// If write buffer is empty, try to read new data from the encoder.
if (!_outsize) {
// Even when we stop polling as soon as there is no
// data to send, the poller may invoke out_event one
// more time due to 'speculative write' optimisation.
if (unlikely (_encoder == NULL)) {
zmq_assert (_handshaking);
return;
}
_outpos = NULL;
_outsize = _encoder->encode (&_outpos, 0);
while (_outsize < static_cast<size_t> (_options.out_batch_size)) {
if (!_sent_routing_id) {
_tx_msg.close ();
int rc = _tx_msg.init_size (_options.routing_id_size);
errno_assert (rc == 0);
if (_options.routing_id_size > 0)
memcpy (_tx_msg.data (), _options.routing_id,
_options.routing_id_size);
_sent_routing_id = true;
} else if (_session->pull_msg (&_tx_msg) == -1)
break;
_encoder->load_msg (&_tx_msg);
unsigned char *bufptr = _outpos + _outsize;
size_t n =
_encoder->encode (&bufptr, _options.out_batch_size - _outsize);
zmq_assert (n > 0);
if (_outpos == NULL)
_outpos = bufptr;
_outsize += n;
}
// If there is no data to send, stop polling for output.
if (_outsize == 0) {
_output_stopped = true;
reset_pollout (_handle);
return;
}
}
// If there are any data to write in write buffer, write as much as
// possible to the socket. Note that amount of data to write can be
// arbitrarily large. However, we assume that underlying TCP layer has
// limited transmission buffer and thus the actual number of bytes
// written should be reasonably modest.
const int nbytes = tcp_write (_fd, _outpos, _outsize);
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
// this is necessary to prevent losing incoming messages.
if (nbytes == -1) {
_output_stopped = true;
reset_pollout (_handle);
return;
}
_outpos += nbytes;
_outsize -= nbytes;
// If we are still handshaking and there are no data
// to send, stop polling for output.
if (unlikely (_handshaking))
if (_outsize == 0) {
_output_stopped = true;
reset_pollout (_handle);
}
}
const zmq::endpoint_uri_pair_t &zmq::ws_engine_t::get_endpoint () const
{
return _endpoint_uri_pair;
}
void zmq::ws_engine_t::restart_output ()
{
if (likely (_output_stopped)) {
set_pollout (_handle);
_output_stopped = false;
}
}
bool zmq::ws_engine_t::server_handshake ()
{
int nbytes = tcp_read (_fd, _read_buffer, WS_BUFFER_SIZE);
if (nbytes == 0) {
errno = EPIPE;
error (zmq::stream_engine_t::connection_error);
return false;
} else if (nbytes == -1) {
if (errno != EAGAIN)
error (zmq::stream_engine_t::connection_error);
return false;
}
_inpos = _read_buffer;
_insize = nbytes;
while (_insize > 0) {
char c = (char) *_inpos;
switch (_server_handshake_state) {
case handshake_initial:
if (c == 'G')
_server_handshake_state = request_line_G;
else
_server_handshake_state = handshake_error;
break;
case request_line_G:
if (c == 'E')
_server_handshake_state = request_line_GE;
else
_server_handshake_state = handshake_error;
break;
case request_line_GE:
if (c == 'T')
_server_handshake_state = request_line_GET;
else
_server_handshake_state = handshake_error;
break;
case request_line_GET:
if (c == ' ')
_server_handshake_state = request_line_GET_space;
else
_server_handshake_state = handshake_error;
break;
case request_line_GET_space:
if (c == '\r' || c == '\n')
_server_handshake_state = handshake_error;
// TODO: instead of check what is not allowed check what is allowed
if (c != ' ')
_server_handshake_state = request_line_resource;
else
_server_handshake_state = request_line_GET_space;
break;
case request_line_resource:
if (c == '\r' || c == '\n')
_server_handshake_state = handshake_error;
else if (c == ' ')
_server_handshake_state = request_line_resource_space;
else
_server_handshake_state = request_line_resource;
break;
case request_line_resource_space:
if (c == 'H')
_server_handshake_state = request_line_H;
else
_server_handshake_state = handshake_error;
break;
case request_line_H:
if (c == 'T')
_server_handshake_state = request_line_HT;
else
_server_handshake_state = handshake_error;
break;
case request_line_HT:
if (c == 'T')
_server_handshake_state = request_line_HTT;
else
_server_handshake_state = handshake_error;
break;
case request_line_HTT:
if (c == 'P')
_server_handshake_state = request_line_HTTP;
else
_server_handshake_state = handshake_error;
break;
case request_line_HTTP:
if (c == '/')
_server_handshake_state = request_line_HTTP_slash;
else
_server_handshake_state = handshake_error;
break;
case request_line_HTTP_slash:
if (c == '1')
_server_handshake_state = request_line_HTTP_slash_1;
else
_server_handshake_state = handshake_error;
break;
case request_line_HTTP_slash_1:
if (c == '.')
_server_handshake_state = request_line_HTTP_slash_1_dot;
else
_server_handshake_state = handshake_error;
break;
case request_line_HTTP_slash_1_dot:
if (c == '1')
_server_handshake_state = request_line_HTTP_slash_1_dot_1;
else
_server_handshake_state = handshake_error;
break;
case request_line_HTTP_slash_1_dot_1:
if (c == '\r')
_server_handshake_state = request_line_cr;
else
_server_handshake_state = handshake_error;
break;
case request_line_cr:
if (c == '\n')
_server_handshake_state = header_field_begin_name;
else
_server_handshake_state = handshake_error;
break;
case header_field_begin_name:
switch (c) {
case '\r':
_server_handshake_state = handshake_end_line_cr;
break;
case '\n':
_server_handshake_state = handshake_error;
break;
default:
_header_name[0] = (char) c;
_header_name_position = 1;
_server_handshake_state = header_field_name;
break;
}
break;
case header_field_name:
if (c == '\r' || c == '\n')
_server_handshake_state = handshake_error;
else if (c == ':') {
_header_name[_header_name_position] = '\0';
_server_handshake_state = header_field_colon;
} else if (_header_name_position + 1 > MAX_HEADER_NAME_LENGTH)
_server_handshake_state = handshake_error;
else {
_header_name[_header_name_position] = c;
_header_name_position++;
_server_handshake_state = header_field_name;
}
break;
case header_field_colon:
case header_field_value_trailing_space:
if (c == '\n')
_server_handshake_state = handshake_error;
else if (c == '\r')
_server_handshake_state = header_field_cr;
else if (c == ' ')
_server_handshake_state = header_field_value_trailing_space;
else {
_header_value[0] = c;
_header_value_position = 1;
_server_handshake_state = header_field_value;
}
break;
case header_field_value:
if (c == '\n')
_server_handshake_state = handshake_error;
else if (c == '\r') {
_header_value[_header_value_position] = '\0';
if (strcasecmp ("upgrade", _header_name) == 0)
_header_upgrade_websocket =
strcasecmp ("websocket", _header_value) == 0;
else if (strcasecmp ("connection", _header_name) == 0)
_header_connection_upgrade =
strcasecmp ("upgrade", _header_value) == 0;
else if (strcasecmp ("Sec-WebSocket-Key", _header_name)
== 0)
strcpy (_websocket_key, _header_value);
else if (strcasecmp ("Sec-WebSocket-Protocol", _header_name)
== 0)
_websocket_protocol =
true; // TODO: check if the value is ZWS2.0
_server_handshake_state = header_field_cr;
} else if (_header_value_position + 1 > MAX_HEADER_VALUE_LENGTH)
_server_handshake_state = handshake_error;
else {
_header_value[_header_value_position] = c;
_header_value_position++;
_server_handshake_state = header_field_value;
}
break;
case header_field_cr:
if (c == '\n')
_server_handshake_state = header_field_begin_name;
else
_server_handshake_state = handshake_error;
break;
case handshake_end_line_cr:
if (c == '\n') {
if (_header_connection_upgrade && _header_upgrade_websocket
&& _websocket_protocol && _websocket_key[0] != '\0') {
_server_handshake_state = handshake_complete;
_handshaking = false;
// TODO: check which decoder/encoder to use according to selected protocol
_encoder = new (std::nothrow)
ws_encoder_t (_options.out_batch_size, false);
alloc_assert (_encoder);
_decoder = new (std::nothrow) ws_decoder_t (
_options.in_batch_size, _options.maxmsgsize,
_options.zero_copy, true);
alloc_assert (_decoder);
_socket->event_handshake_succeeded (_endpoint_uri_pair,
0);
const char *magic_string =
"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
char plain[MAX_HEADER_VALUE_LENGTH + 36 + 1];
strcpy (plain, _websocket_key);
strcat (plain, magic_string);
sha1_ctxt ctx;
SHA1_Init (&ctx);
SHA1_Update (&ctx, (unsigned char *) _websocket_key,
strlen (_websocket_key));
SHA1_Update (&ctx, (unsigned char *) magic_string,
strlen (magic_string));
unsigned char hash[SHA_DIGEST_LENGTH];
SHA1_Final (hash, &ctx);
int accept_key_len = encode_base64 (
hash, SHA_DIGEST_LENGTH, _websocket_accept,
MAX_HEADER_VALUE_LENGTH);
assert (accept_key_len > 0);
_websocket_accept[accept_key_len] = '\0';
int written =
snprintf ((char *) _write_buffer, WS_BUFFER_SIZE,
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n"
"Sec-WebSocket-Protocol: ZWS2.0\r\n"
"\r\n",
_websocket_accept);
assert (written >= 0 && written < WS_BUFFER_SIZE);
_outpos = _write_buffer;
_outsize = written;
if (_output_stopped)
restart_output ();
} else
_server_handshake_state = handshake_error;
} else
_server_handshake_state = handshake_error;
break;
case handshake_complete:
// no more bytes are allowed after complete
_server_handshake_state = handshake_error;
default:
assert (false);
}
_inpos++;
_insize--;
if (_server_handshake_state == handshake_error) {
// TODO: send bad request
_socket->event_handshake_failed_protocol (
_endpoint_uri_pair, ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED);
error (zmq::stream_engine_t::protocol_error);
return false;
}
}
return _server_handshake_state == handshake_complete;
}
bool zmq::ws_engine_t::client_handshake ()
{
int nbytes = tcp_read (_fd, _read_buffer, WS_BUFFER_SIZE);
if (nbytes == 0) {
errno = EPIPE;
error (zmq::stream_engine_t::connection_error);
return false;
} else if (nbytes == -1) {
if (errno != EAGAIN)
error (zmq::stream_engine_t::connection_error);
return false;
}
_inpos = _read_buffer;
_insize = nbytes;
while (_insize > 0) {
char c = (char) *_inpos;
switch (_client_handshake_state) {
case client_handshake_initial:
if (c == 'H')
_client_handshake_state = response_line_H;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_H:
if (c == 'T')
_client_handshake_state = response_line_HT;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HT:
if (c == 'T')
_client_handshake_state = response_line_HTT;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTT:
if (c == 'P')
_client_handshake_state = response_line_HTTP;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTTP:
if (c == '/')
_client_handshake_state = response_line_HTTP_slash;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTTP_slash:
if (c == '1')
_client_handshake_state = response_line_HTTP_slash_1;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTTP_slash_1:
if (c == '.')
_client_handshake_state = response_line_HTTP_slash_1_dot;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTTP_slash_1_dot:
if (c == '1')
_client_handshake_state = response_line_HTTP_slash_1_dot_1;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTTP_slash_1_dot_1:
if (c == ' ')
_client_handshake_state =
response_line_HTTP_slash_1_dot_1_space;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_HTTP_slash_1_dot_1_space:
if (c == ' ')
_client_handshake_state =
response_line_HTTP_slash_1_dot_1_space;
else if (c == '1')
_client_handshake_state = response_line_status_1;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_status_1:
if (c == '0')
_client_handshake_state = response_line_status_10;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_status_10:
if (c == '1')
_client_handshake_state = response_line_status_101;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_status_101:
if (c == ' ')
_client_handshake_state = response_line_status_101_space;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_status_101_space:
if (c == ' ')
_client_handshake_state = response_line_status_101_space;
else if (c == 'S')
_client_handshake_state = response_line_s;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_s:
if (c == 'w')
_client_handshake_state = response_line_sw;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_sw:
if (c == 'i')
_client_handshake_state = response_line_swi;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_swi:
if (c == 't')
_client_handshake_state = response_line_swit;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_swit:
if (c == 'c')
_client_handshake_state = response_line_switc;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_switc:
if (c == 'h')
_client_handshake_state = response_line_switch;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_switch:
if (c == 'i')
_client_handshake_state = response_line_switchi;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_switchi:
if (c == 'n')
_client_handshake_state = response_line_switchin;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_switchin:
if (c == 'g')
_client_handshake_state = response_line_switching;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_switching:
if (c == ' ')
_client_handshake_state = response_line_switching_space;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_switching_space:
if (c == 'P')
_client_handshake_state = response_line_p;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_p:
if (c == 'r')
_client_handshake_state = response_line_pr;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_pr:
if (c == 'o')
_client_handshake_state = response_line_pro;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_pro:
if (c == 't')
_client_handshake_state = response_line_prot;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_prot:
if (c == 'o')
_client_handshake_state = response_line_proto;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_proto:
if (c == 'c')
_client_handshake_state = response_line_protoc;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_protoc:
if (c == 'o')
_client_handshake_state = response_line_protoco;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_protoco:
if (c == 'l')
_client_handshake_state = response_line_protocol;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_protocol:
if (c == 's')
_client_handshake_state = response_line_protocols;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_protocols:
if (c == '\r')
_client_handshake_state = response_line_cr;
else
_client_handshake_state = client_handshake_error;
break;
case response_line_cr:
if (c == '\n')
_client_handshake_state = client_header_field_begin_name;
else
_client_handshake_state = client_handshake_error;
break;
case client_header_field_begin_name:
switch (c) {
case '\r':
_client_handshake_state = client_handshake_end_line_cr;
break;
case '\n':
_client_handshake_state = client_handshake_error;
break;
default:
_header_name[0] = (char) c;
_header_name_position = 1;
_client_handshake_state = client_header_field_name;
break;
}
break;
case client_header_field_name:
if (c == '\r' || c == '\n')
_client_handshake_state = client_handshake_error;
else if (c == ':') {
_header_name[_header_name_position] = '\0';
_client_handshake_state = client_header_field_colon;
} else if (_header_name_position + 1 > MAX_HEADER_NAME_LENGTH)
_client_handshake_state = client_handshake_error;
else {
_header_name[_header_name_position] = c;
_header_name_position++;
_client_handshake_state = client_header_field_name;
}
break;
case client_header_field_colon:
case client_header_field_value_trailing_space:
if (c == '\n')
_client_handshake_state = client_handshake_error;
else if (c == '\r')
_client_handshake_state = client_header_field_cr;
else if (c == ' ')
_client_handshake_state =
client_header_field_value_trailing_space;
else {
_header_value[0] = c;
_header_value_position = 1;
_client_handshake_state = client_header_field_value;
}
break;
case client_header_field_value:
if (c == '\n')
_client_handshake_state = client_handshake_error;
else if (c == '\r') {
_header_value[_header_value_position] = '\0';
if (strcasecmp ("upgrade", _header_name) == 0)
_header_upgrade_websocket =
strcasecmp ("websocket", _header_value) == 0;
else if (strcasecmp ("connection", _header_name) == 0)
_header_connection_upgrade =
strcasecmp ("upgrade", _header_value) == 0;
else if (strcasecmp ("Sec-WebSocket-Accept", _header_name)
== 0)
strcpy (_websocket_accept, _header_value);
else if (strcasecmp ("Sec-WebSocket-Protocol", _header_name)
== 0)
_websocket_protocol = true; // TODO: check if ZWS2.0
_client_handshake_state = client_header_field_cr;
} else if (_header_value_position + 1 > MAX_HEADER_VALUE_LENGTH)
_client_handshake_state = client_handshake_error;
else {
_header_value[_header_value_position] = c;
_header_value_position++;
_client_handshake_state = client_header_field_value;
}
break;
case client_header_field_cr:
if (c == '\n')
_client_handshake_state = client_header_field_begin_name;
else
_client_handshake_state = client_handshake_error;
break;
case client_handshake_end_line_cr:
if (c == '\n') {
if (_header_connection_upgrade && _header_upgrade_websocket
&& _websocket_protocol
&& _websocket_accept[0] != '\0') {
_client_handshake_state = client_handshake_complete;
_handshaking = false;
_encoder = new (std::nothrow)
ws_encoder_t (_options.out_batch_size, true);
alloc_assert (_encoder);
_decoder = new (std::nothrow) ws_decoder_t (
_options.in_batch_size, _options.maxmsgsize,
_options.zero_copy, false);
alloc_assert (_decoder);
_socket->event_handshake_succeeded (_endpoint_uri_pair,
0);
// TODO: validate accept key
if (_output_stopped)
restart_output ();
_inpos++;
_insize--;
return true;
} else
_client_handshake_state = client_handshake_error;
} else
_client_handshake_state = client_handshake_error;
break;
default:
assert (false);
}
_inpos++;
_insize--;
if (_client_handshake_state == client_handshake_error) {
_socket->event_handshake_failed_protocol (
_endpoint_uri_pair, ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED);
error (zmq::stream_engine_t::protocol_error);
return false;
}
}
return false;
}
void zmq::ws_engine_t::error (zmq::stream_engine_t::error_reason_t reason_)
{
zmq_assert (_session);
if (reason_ != zmq::stream_engine_t::protocol_error && _handshaking) {
int err = errno;
_socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
}
_socket->event_disconnected (_endpoint_uri_pair, _fd);
_session->flush ();
_session->engine_error (reason_);
unplug ();
delete this;
}
bool zmq::ws_engine_t::restart_input ()
{
zmq_assert (_input_stopped);
_input_stopped = false;
set_pollin (_handle);
in_event ();
return true;
}
static int
encode_base64 (const unsigned char *in, int in_len, char *out, int out_len)
{
static const unsigned char base64enc_tab[65] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
int ii, io;
uint32_t v;
int rem;
for (io = 0, ii = 0, v = 0, rem = 0; ii < in_len; ii++) {
unsigned char ch;
ch = in[ii];
v = (v << 8) | ch;
rem += 8;
while (rem >= 6) {
rem -= 6;
if (io >= out_len)
return -1; /* truncation is failure */
out[io++] = base64enc_tab[(v >> rem) & 63];
}
}
if (rem) {
v <<= (6 - rem);
if (io >= out_len)
return -1; /* truncation is failure */
out[io++] = base64enc_tab[v & 63];
}
while (io & 3) {
if (io >= out_len)
return -1; /* truncation is failure */
out[io++] = '=';
}
if (io >= out_len)
return -1; /* no room for null terminator */
out[io] = 0;
return io;
}
#ifndef __ZMQ_WS_ENGINE_HPP_INCLUDED__
#define __ZMQ_WS_ENGINE_HPP_INCLUDED__
#include "io_object.hpp"
#include "i_engine.hpp"
#include "address.hpp"
#include "msg.hpp"
#include "stream_engine.hpp"
#define WS_BUFFER_SIZE 8192
#define MAX_HEADER_NAME_LENGTH 1024
#define MAX_HEADER_VALUE_LENGTH 2048
namespace zmq
{
class io_thread_t;
class session_base_t;
typedef enum
{
handshake_initial = 0,
request_line_G,
request_line_GE,
request_line_GET,
request_line_GET_space,
request_line_resource,
request_line_resource_space,
request_line_H,
request_line_HT,
request_line_HTT,
request_line_HTTP,
request_line_HTTP_slash,
request_line_HTTP_slash_1,
request_line_HTTP_slash_1_dot,
request_line_HTTP_slash_1_dot_1,
request_line_cr,
header_field_begin_name,
header_field_name,
header_field_colon,
header_field_value_trailing_space,
header_field_value,
header_field_cr,
handshake_end_line_cr,
handshake_complete,
handshake_error = -1
} ws_server_handshake_state_t;
typedef enum
{
client_handshake_initial = 0,
response_line_H,
response_line_HT,
response_line_HTT,
response_line_HTTP,
response_line_HTTP_slash,
response_line_HTTP_slash_1,
response_line_HTTP_slash_1_dot,
response_line_HTTP_slash_1_dot_1,
response_line_HTTP_slash_1_dot_1_space,
response_line_status_1,
response_line_status_10,
response_line_status_101,
response_line_status_101_space,
response_line_s,
response_line_sw,
response_line_swi,
response_line_swit,
response_line_switc,
response_line_switch,
response_line_switchi,
response_line_switchin,
response_line_switching,
response_line_switching_space,
response_line_p,
response_line_pr,
response_line_pro,
response_line_prot,
response_line_proto,
response_line_protoc,
response_line_protoco,
response_line_protocol,
response_line_protocols,
response_line_cr,
client_header_field_begin_name,
client_header_field_name,
client_header_field_colon,
client_header_field_value_trailing_space,
client_header_field_value,
client_header_field_cr,
client_handshake_end_line_cr,
client_handshake_complete,
client_handshake_error = -1
} ws_client_handshake_state_t;
class ws_engine_t : public io_object_t, public i_engine
{
public:
ws_engine_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_,
bool client_);
~ws_engine_t ();
// i_engine interface implementation.
// Plug the engine to the session.
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
void terminate ();
// This method is called by the session to signalise that more
// messages can be written to the pipe.
bool restart_input ();
// This method is called by the session to signalise that there
// are messages to send available.
void restart_output ();
void zap_msg_available (){};
void in_event ();
void out_event ();
const endpoint_uri_pair_t &get_endpoint () const;
private:
bool client_handshake ();
bool server_handshake ();
void error (zmq::stream_engine_t::error_reason_t reason_);
void unplug ();
bool _client;
bool _plugged;
socket_base_t *_socket;
fd_t _fd;
session_base_t *_session;
handle_t _handle;
options_t _options;
// Representation of the connected endpoints.
const endpoint_uri_pair_t _endpoint_uri_pair;
bool _handshaking;
ws_client_handshake_state_t _client_handshake_state;
ws_server_handshake_state_t _server_handshake_state;
unsigned char _read_buffer[WS_BUFFER_SIZE];
unsigned char _write_buffer[WS_BUFFER_SIZE];
char _header_name[MAX_HEADER_NAME_LENGTH + 1];
int _header_name_position;
char _header_value[MAX_HEADER_VALUE_LENGTH + 1];
int _header_value_position;
bool _header_upgrade_websocket;
bool _header_connection_upgrade;
bool _websocket_protocol;
char _websocket_key[MAX_HEADER_VALUE_LENGTH + 1];
char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1];
bool _input_stopped;
i_decoder *_decoder;
unsigned char *_inpos;
size_t _insize;
bool _output_stopped;
unsigned char *_outpos;
size_t _outsize;
i_encoder *_encoder;
bool _sent_routing_id;
bool _received_routing_id;
msg_t _tx_msg;
};
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include <new>
#include <string>
#include <stdio.h>
#include "ws_listener.hpp"
#include "io_thread.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "socket_base.hpp"
#include "address.hpp"
#include "ws_engine.hpp"
#include "session_base.hpp"
#ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
zmq::ws_listener_t::ws_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
const options_t &options_) :
stream_listener_base_t (io_thread_, socket_, options_)
{
}
void zmq::ws_listener_t::in_event ()
{
fd_t fd = accept ();
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
int rc = tune_tcp_socket (fd);
rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
_socket->event_accept_failed (
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
return;
}
// Create the engine object for this connection.
create_engine (fd);
}
std::string zmq::ws_listener_t::get_socket_name (zmq::fd_t fd_,
socket_end_t socket_end_) const
{
return zmq::get_socket_name<tcp_address_t> (fd_, socket_end_);
}
int zmq::ws_listener_t::create_socket (const char *addr_)
{
_s = tcp_open_socket (addr_, options, true, true, &_address);
if (_s == retired_fd) {
return -1;
}
// TODO why is this only done for the listener?
make_socket_noninheritable (_s);
// Allow reusing of the address.
int flag = 1;
int rc;
#ifdef ZMQ_HAVE_WINDOWS
// TODO this was changed for Windows from SO_REUSEADDRE to
// SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c,
// so the comment above is no longer correct; also, now the settings are
// different between listener and connecter with a src address.
// is this intentional?
rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
reinterpret_cast<const char *> (&flag), sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
rc =
setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof (int));
errno_assert (rc == 0);
#else
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
#endif
// Bind the socket to the network interface and port.
#if defined ZMQ_HAVE_VXWORKS
rc = bind (_s, (sockaddr *) _address.addr (), _address.addrlen ());
#else
rc = bind (_s, _address.addr (), _address.addrlen ());
#endif
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error;
}
#else
if (rc != 0)
goto error;
#endif
// Listen for incoming connections.
rc = listen (_s, options.backlog);
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error;
}
#else
if (rc != 0)
goto error;
#endif
return 0;
error:
int err = errno;
close ();
errno = err;
return -1;
}
int zmq::ws_listener_t::set_local_address (const char *addr_)
{
if (options.use_fd != -1) {
// in this case, the addr_ passed is not used and ignored, since the
// socket was already created by the application
_s = options.use_fd;
} else {
if (create_socket (addr_) == -1)
return -1;
}
_endpoint = get_socket_name (_s, socket_end_local);
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
_s);
return 0;
}
zmq::fd_t zmq::ws_listener_t::accept ()
{
// The situation where connection cannot be accepted due to insufficient
// resources is considered valid and treated by ignoring the connection.
// Accept one connection and deal with different failure modes.
zmq_assert (_s != retired_fd);
struct sockaddr_storage ss;
memset (&ss, 0, sizeof (ss));
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int ss_len = sizeof (ss);
#else
socklen_t ss_len = sizeof (ss);
#endif
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
&ss_len, SOCK_CLOEXEC);
#else
fd_t sock =
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
#endif
if (sock == retired_fd) {
#if defined ZMQ_HAVE_WINDOWS
const int last_error = WSAGetLastError ();
wsa_assert (last_error == WSAEWOULDBLOCK || last_error == WSAECONNRESET
|| last_error == WSAEMFILE || last_error == WSAENOBUFS);
#elif defined ZMQ_HAVE_ANDROID
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
|| errno == ECONNABORTED || errno == EPROTO
|| errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
|| errno == ENFILE || errno == EINVAL);
#else
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
|| errno == ECONNABORTED || errno == EPROTO
|| errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
|| errno == ENFILE);
#endif
return retired_fd;
}
make_socket_noninheritable (sock);
if (zmq::set_nosigpipe (sock)) {
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (sock);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (sock);
errno_assert (rc == 0);
#endif
return retired_fd;
}
// Set the IP Type-Of-Service priority for this client socket
if (options.tos != 0)
set_ip_type_of_service (sock, options.tos);
return sock;
}
void zmq::ws_listener_t::create_engine (fd_t fd)
{
const endpoint_uri_pair_t endpoint_pair (
get_socket_name (fd, socket_end_local),
get_socket_name (fd, socket_end_remote), endpoint_type_bind);
ws_engine_t *engine =
new (std::nothrow) ws_engine_t (fd, options, endpoint_pair, false);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (endpoint_pair, fd);
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_WS_LISTENER_HPP_INCLUDED__
#define __ZMQ_WS_LISTENER_HPP_INCLUDED__
#include "fd.hpp"
#include "tcp_address.hpp"
#include "stream_listener_base.hpp"
namespace zmq
{
class ws_listener_t : public stream_listener_base_t
{
public:
ws_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const options_t &options_);
// Set address to listen on.
int set_local_address (const char *addr_);
protected:
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
void create_engine (fd_t fd);
private:
// Handlers for I/O events.
void in_event ();
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog
// or was denied because of accept filters.
fd_t accept ();
int create_socket (const char *addr_);
// Address to listen on.
tcp_address_t _address;
ws_listener_t (const ws_listener_t &);
const ws_listener_t &operator= (const ws_listener_t &);
};
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_WS_PROTOCOL_HPP_INCLUDED__
#define __ZMQ_WS_PROTOCOL_HPP_INCLUDED__
namespace zmq
{
// Definition of constants for WS transport protocol.
class ws_protocol_t
{
public:
// Message flags.
enum opcode_t
{
opcode_continuation = 0,
opcode_text = 0x01,
opcode_binary = 0x02,
opcode_close = 0x08,
opcode_ping = 0x09,
opcode_pong = 0xA
};
enum
{
more_flag = 1,
command_flag = 2
};
};
}
#endif
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
void test_roundtrip ()
{
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:5556"));
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "ws://127.0.0.1:5556"));
bounce (sb, sc);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
void test_short_message ()
{
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:5557"));
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "ws://127.0.0.1:5557"));
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 255));
for (unsigned char i = 0; i < 255; ++i)
((unsigned char *) zmq_msg_data (&msg))[i] = i;
int rc = zmq_msg_send (&msg, sc, 0);
TEST_ASSERT_EQUAL_INT (255, rc);
rc = zmq_msg_recv (&msg, sb, 0);
TEST_ASSERT_EQUAL_INT (255, rc);
for (unsigned char i = 0; i < 255; ++i)
TEST_ASSERT_EQUAL_INT (i, ((unsigned char *) zmq_msg_data (&msg))[i]);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
test_context_socket_close (sc);
test_context_socket_close (sb);
}
void test_large_message ()
{
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:5557"));
void *sc = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "ws://127.0.0.1:5557"));
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 65536));
for (int i = 0; i < 65536; ++i)
((unsigned char *) zmq_msg_data (&msg))[i] = i % 255;
int rc = zmq_msg_send (&msg, sc, 0);
TEST_ASSERT_EQUAL_INT (65536, rc);
rc = zmq_msg_recv (&msg, sb, 0);
TEST_ASSERT_EQUAL_INT (65536, rc);
for (int i = 0; i < 65536; ++i)
TEST_ASSERT_EQUAL_INT (i % 255,
((unsigned char *) zmq_msg_data (&msg))[i]);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
test_context_socket_close (sc);
test_context_socket_close (sb);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
RUN_TEST (test_short_message);
RUN_TEST (test_large_message);
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment