Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
a7bb41ef
Unverified
Commit
a7bb41ef
authored
Feb 06, 2020
by
Luca Boccassi
Committed by
GitHub
Feb 06, 2020
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3820 from somdoron/WSENCODING
websocket problems
parents
6b75209f
65283429
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
98 additions
and
34 deletions
+98
-34
ws_address.cpp
src/ws_address.cpp
+5
-3
ws_encoder.cpp
src/ws_encoder.cpp
+30
-22
ws_encoder.hpp
src/ws_encoder.hpp
+1
-0
ws_listener.cpp
src/ws_listener.cpp
+6
-2
wss_address.cpp
src/wss_address.cpp
+1
-1
test_ws_transport.cpp
tests/test_ws_transport.cpp
+55
-6
No files found.
src/ws_address.cpp
View file @
a7bb41ef
...
@@ -67,13 +67,15 @@ zmq::ws_address_t::ws_address_t (const sockaddr *sa_, socklen_t sa_len_)
...
@@ -67,13 +67,15 @@ zmq::ws_address_t::ws_address_t (const sockaddr *sa_, socklen_t sa_len_)
&&
sa_len_
>=
static_cast
<
socklen_t
>
(
sizeof
(
_address
.
ipv6
)))
&&
sa_len_
>=
static_cast
<
socklen_t
>
(
sizeof
(
_address
.
ipv6
)))
memcpy
(
&
_address
.
ipv6
,
sa_
,
sizeof
(
_address
.
ipv6
));
memcpy
(
&
_address
.
ipv6
,
sa_
,
sizeof
(
_address
.
ipv6
));
_path
=
std
::
string
(
"
/
"
);
_path
=
std
::
string
(
""
);
char
hbuf
[
NI_MAXHOST
];
char
hbuf
[
NI_MAXHOST
];
const
int
rc
=
getnameinfo
(
addr
(),
addrlen
(),
hbuf
,
sizeof
(
hbuf
),
NULL
,
const
int
rc
=
getnameinfo
(
addr
(),
addrlen
(),
hbuf
,
sizeof
(
hbuf
),
NULL
,
0
,
NI_NUMERICHOST
);
0
,
NI_NUMERICHOST
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
{
_host
=
std
::
string
(
"localhost"
);
_host
=
std
::
string
(
"localhost"
);
return
;
}
std
::
ostringstream
os
;
std
::
ostringstream
os
;
...
@@ -128,7 +130,7 @@ int zmq::ws_address_t::to_string (std::string &addr_) const
...
@@ -128,7 +130,7 @@ int zmq::ws_address_t::to_string (std::string &addr_) const
{
{
std
::
ostringstream
os
;
std
::
ostringstream
os
;
os
<<
std
::
string
(
"ws://"
)
<<
host
()
<<
std
::
string
(
":"
)
os
<<
std
::
string
(
"ws://"
)
<<
host
()
<<
std
::
string
(
":"
)
<<
_address
.
port
();
<<
_address
.
port
()
<<
_path
;
addr_
=
os
.
str
();
addr_
=
os
.
str
();
return
0
;
return
0
;
...
...
src/ws_encoder.cpp
View file @
a7bb41ef
...
@@ -55,19 +55,24 @@ void zmq::ws_encoder_t::message_ready ()
...
@@ -55,19 +55,24 @@ void zmq::ws_encoder_t::message_ready ()
{
{
int
offset
=
0
;
int
offset
=
0
;
_is_binary
=
false
;
if
(
in_progress
()
->
is_ping
())
if
(
in_progress
()
->
is_ping
())
_tmp_buf
[
offset
++
]
=
0x80
|
zmq
::
ws_protocol_t
::
opcode_ping
;
_tmp_buf
[
offset
++
]
=
0x80
|
zmq
::
ws_protocol_t
::
opcode_ping
;
else
if
(
in_progress
()
->
is_pong
())
else
if
(
in_progress
()
->
is_pong
())
_tmp_buf
[
offset
++
]
=
0x80
|
zmq
::
ws_protocol_t
::
opcode_pong
;
_tmp_buf
[
offset
++
]
=
0x80
|
zmq
::
ws_protocol_t
::
opcode_pong
;
else
if
(
in_progress
()
->
is_close_cmd
())
else
if
(
in_progress
()
->
is_close_cmd
())
_tmp_buf
[
offset
++
]
=
0x80
|
zmq
::
ws_protocol_t
::
opcode_close
;
_tmp_buf
[
offset
++
]
=
0x80
|
zmq
::
ws_protocol_t
::
opcode_close
;
else
else
{
_tmp_buf
[
offset
++
]
=
0x82
;
// Final | binary
_tmp_buf
[
offset
++
]
=
0x82
;
// Final | binary
_is_binary
=
true
;
}
_tmp_buf
[
offset
]
=
_must_mask
?
0x80
:
0x00
;
_tmp_buf
[
offset
]
=
_must_mask
?
0x80
:
0x00
;
size_t
size
=
in_progress
()
->
size
();
size_t
size
=
in_progress
()
->
size
();
size
++
;
// TODO: check if binary
if
(
_is_binary
)
size
++
;
if
(
size
<=
125
)
if
(
size
<=
125
)
_tmp_buf
[
offset
++
]
|=
static_cast
<
unsigned
char
>
(
size
&
127
);
_tmp_buf
[
offset
++
]
|=
static_cast
<
unsigned
char
>
(
size
&
127
);
...
@@ -88,17 +93,17 @@ void zmq::ws_encoder_t::message_ready ()
...
@@ -88,17 +93,17 @@ void zmq::ws_encoder_t::message_ready ()
offset
+=
4
;
offset
+=
4
;
}
}
// TODO: check if binary
if
(
_is_binary
)
{
// Encode flags.
// Encode flags.
unsigned
char
protocol_flags
=
0
;
unsigned
char
protocol_flags
=
0
;
if
(
in_progress
()
->
flags
()
&
msg_t
::
more
)
if
(
in_progress
()
->
flags
()
&
msg_t
::
more
)
protocol_flags
|=
ws_protocol_t
::
more_flag
;
protocol_flags
|=
ws_protocol_t
::
more_flag
;
if
(
in_progress
()
->
flags
()
&
msg_t
::
command
)
if
(
in_progress
()
->
flags
()
&
msg_t
::
command
)
protocol_flags
|=
ws_protocol_t
::
command_flag
;
protocol_flags
|=
ws_protocol_t
::
command_flag
;
_tmp_buf
[
offset
++
]
=
_tmp_buf
[
offset
++
]
=
_must_mask
?
protocol_flags
^
_mask
[
0
]
:
protocol_flags
;
_must_mask
?
protocol_flags
^
_mask
[
0
]
:
protocol_flags
;
}
next_step
(
_tmp_buf
,
offset
,
&
ws_encoder_t
::
size_ready
,
false
);
next_step
(
_tmp_buf
,
offset
,
&
ws_encoder_t
::
size_ready
,
false
);
}
}
...
@@ -109,20 +114,23 @@ void zmq::ws_encoder_t::size_ready ()
...
@@ -109,20 +114,23 @@ void zmq::ws_encoder_t::size_ready ()
assert
(
in_progress
()
!=
&
_masked_msg
);
assert
(
in_progress
()
!=
&
_masked_msg
);
const
size_t
size
=
in_progress
()
->
size
();
const
size_t
size
=
in_progress
()
->
size
();
_masked_msg
.
close
();
_masked_msg
.
init_size
(
size
);
int
mask_index
=
1
;
// TODO: check if binary message
unsigned
char
*
dest
=
static_cast
<
unsigned
char
*>
(
_masked_msg
.
data
());
unsigned
char
*
src
=
unsigned
char
*
src
=
static_cast
<
unsigned
char
*>
(
in_progress
()
->
data
());
static_cast
<
unsigned
char
*>
(
in_progress
()
->
data
());
for
(
size_t
i
=
0
,
size
=
in_progress
()
->
size
();
i
<
size
;
unsigned
char
*
dest
=
src
;
++
i
,
mask_index
++
)
// If msg is shared or data is constant we cannot mask in-place, allocate a new msg for it
if
(
in_progress
()
->
flags
()
&
msg_t
::
shared
||
in_progress
()
->
is_cmsg
())
{
_masked_msg
.
close
();
_masked_msg
.
init_size
(
size
);
dest
=
static_cast
<
unsigned
char
*>
(
_masked_msg
.
data
());
}
int
mask_index
=
_is_binary
?
1
:
0
;
for
(
size_t
i
=
0
;
i
<
size
;
++
i
,
mask_index
++
)
dest
[
i
]
=
src
[
i
]
^
_mask
[
mask_index
%
4
];
dest
[
i
]
=
src
[
i
]
^
_mask
[
mask_index
%
4
];
next_step
(
_masked_msg
.
data
(),
_masked_msg
.
size
(),
next_step
(
dest
,
size
,
&
ws_encoder_t
::
message_ready
,
true
);
&
ws_encoder_t
::
message_ready
,
true
);
}
else
{
}
else
{
next_step
(
in_progress
()
->
data
(),
in_progress
()
->
size
(),
next_step
(
in_progress
()
->
data
(),
in_progress
()
->
size
(),
&
ws_encoder_t
::
message_ready
,
true
);
&
ws_encoder_t
::
message_ready
,
true
);
...
...
src/ws_encoder.hpp
View file @
a7bb41ef
...
@@ -50,6 +50,7 @@ class ws_encoder_t ZMQ_FINAL : public encoder_base_t<ws_encoder_t>
...
@@ -50,6 +50,7 @@ class ws_encoder_t ZMQ_FINAL : public encoder_base_t<ws_encoder_t>
bool
_must_mask
;
bool
_must_mask
;
unsigned
char
_mask
[
4
];
unsigned
char
_mask
[
4
];
msg_t
_masked_msg
;
msg_t
_masked_msg
;
bool
_is_binary
;
ZMQ_NON_COPYABLE_NOR_MOVABLE
(
ws_encoder_t
)
ZMQ_NON_COPYABLE_NOR_MOVABLE
(
ws_encoder_t
)
};
};
...
...
src/ws_listener.cpp
View file @
a7bb41ef
...
@@ -124,10 +124,14 @@ void zmq::ws_listener_t::in_event ()
...
@@ -124,10 +124,14 @@ void zmq::ws_listener_t::in_event ()
std
::
string
zmq
::
ws_listener_t
::
get_socket_name
(
zmq
::
fd_t
fd_
,
std
::
string
zmq
::
ws_listener_t
::
get_socket_name
(
zmq
::
fd_t
fd_
,
socket_end_t
socket_end_
)
const
socket_end_t
socket_end_
)
const
{
{
std
::
string
socket_name
;
if
(
_wss
)
if
(
_wss
)
return
zmq
::
get_socket_name
<
wss_address_t
>
(
fd_
,
socket_end_
);
socket_name
=
zmq
::
get_socket_name
<
wss_address_t
>
(
fd_
,
socket_end_
);
else
socket_name
=
zmq
::
get_socket_name
<
ws_address_t
>
(
fd_
,
socket_end_
);
return
zmq
::
get_socket_name
<
ws_address_t
>
(
fd_
,
socket_end_
);
return
socket_name
+
_address
.
path
(
);
}
}
int
zmq
::
ws_listener_t
::
create_socket
(
const
char
*
addr_
)
int
zmq
::
ws_listener_t
::
create_socket
(
const
char
*
addr_
)
...
...
src/wss_address.cpp
View file @
a7bb41ef
...
@@ -46,7 +46,7 @@ int zmq::wss_address_t::to_string (std::string &addr_) const
...
@@ -46,7 +46,7 @@ int zmq::wss_address_t::to_string (std::string &addr_) const
{
{
std
::
ostringstream
os
;
std
::
ostringstream
os
;
os
<<
std
::
string
(
"wss://"
)
<<
host
()
<<
std
::
string
(
":"
)
os
<<
std
::
string
(
"wss://"
)
<<
host
()
<<
std
::
string
(
":"
)
<<
_address
.
port
();
<<
_address
.
port
()
<<
path
()
;
addr_
=
os
.
str
();
addr_
=
os
.
str
();
return
0
;
return
0
;
...
...
tests/test_ws_transport.cpp
View file @
a7bb41ef
...
@@ -41,7 +41,6 @@ void test_roundtrip ()
...
@@ -41,7 +41,6 @@ void test_roundtrip ()
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/roundtrip"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/roundtrip"
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
strcat
(
connect_address
,
"/roundtrip"
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
...
@@ -79,7 +78,6 @@ void test_heartbeat ()
...
@@ -79,7 +78,6 @@ void test_heartbeat ()
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/heartbeat"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/heartbeat"
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
strcat
(
connect_address
,
"/heartbeat"
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
...
@@ -113,7 +111,6 @@ void test_short_message ()
...
@@ -113,7 +111,6 @@ void test_short_message ()
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/short"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/short"
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
strcat
(
connect_address
,
"/short"
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
...
@@ -147,7 +144,6 @@ void test_large_message ()
...
@@ -147,7 +144,6 @@ void test_large_message ()
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/large"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/large"
));
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
strcat
(
connect_address
,
"/short"
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
void
*
sc
=
test_context_socket
(
ZMQ_REQ
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
...
@@ -197,8 +193,6 @@ void test_curve ()
...
@@ -197,8 +193,6 @@ void test_curve ()
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
server
,
"ws://*:*/roundtrip"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
server
,
"ws://*:*/roundtrip"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
server
,
ZMQ_LAST_ENDPOINT
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
server
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
connect_address
,
&
addr_length
));
strcat
(
connect_address
,
"/roundtrip"
);
void
*
client
=
test_context_socket
(
ZMQ_REQ
);
void
*
client
=
test_context_socket
(
ZMQ_REQ
);
TEST_ASSERT_SUCCESS_ERRNO
(
TEST_ASSERT_SUCCESS_ERRNO
(
...
@@ -215,6 +209,60 @@ void test_curve ()
...
@@ -215,6 +209,60 @@ void test_curve ()
test_context_socket_close
(
server
);
test_context_socket_close
(
server
);
}
}
void
test_mask_shared_msg
()
{
char
connect_address
[
MAX_SOCKET_STRING
+
strlen
(
"/mask-shared"
)];
size_t
addr_length
=
sizeof
(
connect_address
);
void
*
sb
=
test_context_socket
(
ZMQ_DEALER
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
sb
,
"ws://*:*/mask-shared"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_length
));
void
*
sc
=
test_context_socket
(
ZMQ_DEALER
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sc
,
connect_address
));
zmq_msg_t
msg
;
zmq_msg_init_size
(
&
msg
,
255
);
// Message have to be long enough so it won't fit inside msg
unsigned
char
*
data
=
(
unsigned
char
*
)
zmq_msg_data
(
&
msg
);
for
(
int
i
=
0
;
i
<
255
;
i
++
)
data
[
i
]
=
i
;
// Taking a copy to make the msg shared
zmq_msg_t
copy
;
zmq_msg_init
(
&
copy
);
zmq_msg_copy
(
&
copy
,
&
msg
);
// Sending the shared msg
int
rc
=
zmq_msg_send
(
&
msg
,
sc
,
0
);
TEST_ASSERT_EQUAL_INT
(
255
,
rc
);
// Recv the msg and check that it was masked correctly
rc
=
zmq_msg_recv
(
&
msg
,
sb
,
0
);
TEST_ASSERT_EQUAL_INT
(
255
,
rc
);
data
=
(
unsigned
char
*
)
zmq_msg_data
(
&
msg
);
for
(
int
i
=
0
;
i
<
255
;
i
++
)
TEST_ASSERT_EQUAL_INT
(
i
,
data
[
i
]);
// Testing that copy was not masked
data
=
(
unsigned
char
*
)
zmq_msg_data
(
&
copy
);
for
(
int
i
=
0
;
i
<
255
;
i
++
)
TEST_ASSERT_EQUAL_INT
(
i
,
data
[
i
]);
// Constant msg cannot be masked as well, as it is constant
rc
=
zmq_send_const
(
sc
,
"HELLO"
,
5
,
0
);
TEST_ASSERT_EQUAL_INT
(
5
,
rc
);
recv_string_expect_success
(
sb
,
"HELLO"
,
0
);
zmq_msg_close
(
&
copy
);
zmq_msg_close
(
&
msg
);
test_context_socket_close
(
sc
);
test_context_socket_close
(
sb
);
}
int
main
()
int
main
()
{
{
setup_test_environment
();
setup_test_environment
();
...
@@ -225,6 +273,7 @@ int main ()
...
@@ -225,6 +273,7 @@ int main ()
RUN_TEST
(
test_short_message
);
RUN_TEST
(
test_short_message
);
RUN_TEST
(
test_large_message
);
RUN_TEST
(
test_large_message
);
RUN_TEST
(
test_heartbeat
);
RUN_TEST
(
test_heartbeat
);
RUN_TEST
(
test_mask_shared_msg
);
if
(
zmq_has
(
"curve"
))
if
(
zmq_has
(
"curve"
))
RUN_TEST
(
test_curve
);
RUN_TEST
(
test_curve
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment