Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
3a558fcc
Commit
3a558fcc
authored
Jan 31, 2013
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Rewrote raw test completely
parent
8c928934
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
88 additions
and
198 deletions
+88
-198
test_raw_sock.cpp
tests/test_raw_sock.cpp
+88
-198
No files found.
tests/test_raw_sock.cpp
View file @
3a558fcc
...
...
@@ -18,237 +18,127 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <pthread.h>
#include <netinet/tcp.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <fcntl.h>
#include <zmq.h>
#include <unistd.h>
#include <poll.h>
//ToDo: Windows?
const
char
*
test_str
=
"TEST-STRING"
;
int
tcp_client
(
void
)
{
struct
sockaddr_in
serv_addr
;
struct
hostent
*
server
;
const
int
portno
=
5555
;
int
sockfd
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
);
assert
(
sockfd
>=
0
);
server
=
gethostbyname
(
"localhost"
);
assert
(
server
);
memset
(
&
serv_addr
,
0
,
sizeof
serv_addr
);
serv_addr
.
sin_family
=
AF_INET
;
memmove
(
&
serv_addr
.
sin_addr
.
s_addr
,
server
->
h_addr
,
server
->
h_length
);
serv_addr
.
sin_port
=
htons
(
portno
);
int
rc
=
connect
(
sockfd
,
(
struct
sockaddr
*
)
&
serv_addr
,
sizeof
serv_addr
);
assert
(
rc
==
0
);
int
nodelay
=
1
;
rc
=
setsockopt
(
sockfd
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
nodelay
,
sizeof
nodelay
);
assert
(
rc
==
0
);
return
sockfd
;
}
int
tcp_server
(
void
)
{
int
listenfd
=
socket
(
AF_INET
,
SOCK_STREAM
,
0
);
assert
(
listenfd
!=
-
1
);
int
flag
=
1
;
int
rc
=
setsockopt
(
listenfd
,
SOL_SOCKET
,
SO_REUSEADDR
,
&
flag
,
sizeof
flag
);
assert
(
rc
==
0
);
struct
sockaddr_in
serv_addr
;
memset
(
&
serv_addr
,
0
,
sizeof
serv_addr
);
serv_addr
.
sin_family
=
AF_INET
;
serv_addr
.
sin_addr
.
s_addr
=
htonl
(
INADDR_ANY
);
serv_addr
.
sin_port
=
htons
(
5555
);
rc
=
bind
(
listenfd
,
(
struct
sockaddr
*
)
&
serv_addr
,
sizeof
serv_addr
);
assert
(
rc
==
0
);
rc
=
listen
(
listenfd
,
8
);
assert
(
rc
==
0
);
int
sockfd
=
accept
(
listenfd
,
NULL
,
NULL
);
assert
(
sockfd
!=
-
1
);
rc
=
close
(
listenfd
);
assert
(
rc
==
0
);
int
flags
=
fcntl
(
sockfd
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
rc
=
fcntl
(
sockfd
,
F_SETFL
,
flags
|
O_NONBLOCK
);
assert
(
rc
!=
-
1
);
return
sockfd
;
}
void
tcp_client_write
(
int
sockfd
,
const
void
*
buf
,
int
buf_len
)
{
assert
(
buf
);
int
n
=
write
(
sockfd
,
buf
,
buf_len
);
assert
(
n
>=
0
);
}
void
tcp_client_read
(
int
sockfd
)
{
struct
timeval
tm
;
tm
.
tv_sec
=
1
;
tm
.
tv_usec
=
0
;
fd_set
r
;
char
buffer
[
16
];
#include <assert.h>
#include <string.h>
#include <stdbool.h>
FD_ZERO
(
&
r
);
FD_SET
(
sockfd
,
&
r
);
// ZMTP protocol greeting structure
int
sr
=
select
(
sockfd
+
1
,
&
r
,
NULL
,
NULL
,
&
tm
);
assert
(
sr
>
0
);
typedef
unsigned
char
byte
;
typedef
struct
{
byte
signature
[
10
];
// 0xFF 8*0x00 0x7F
byte
revision
;
// 0x01 = ZMTP/2.0
byte
socktype
;
// Defined in ZMTP spec
byte
identity
[
2
];
// Empty message
}
zmtp_greeting_t
;
int
n
=
read
(
sockfd
,
buffer
,
16
);
assert
(
n
>
0
);
assert
(
memcmp
(
buffer
,
test_str
,
strlen
(
test_str
))
==
0
);
}
#define ZMTP_DEALER 5 // Socket type constants
#define ZMTP_ROUTER 6
size_t
tcp_read
(
int
s
,
char
*
buf
,
size_t
bufsize
)
{
size_t
total_size
=
0
;
struct
pollfd
pfd
=
{
s
,
POLLIN
};
int
rc
=
poll
(
&
pfd
,
1
,
1000
);
assert
(
rc
>
0
);
// This is a greeting matching what 0MQ will send us; note the
// 8-byte size is set to 1 for backwards compatibility
while
(
rc
>
0
&&
total_size
<
bufsize
)
{
int
chunk_size
=
read
(
s
,
buf
+
total_size
,
bufsize
-
total_size
);
assert
(
chunk_size
>=
0
);
total_size
+=
chunk_size
;
rc
=
poll
(
&
pfd
,
1
,
1000
);
}
return
total_size
;
}
static
zmtp_greeting_t
greeting
=
{
{
0xFF
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
1
,
0x7F
},
1
,
0
,
{
0
,
0
}
};
void
tcp_client_close
(
int
sockf
d
)
int
main
(
voi
d
)
{
close
(
sockfd
);
}
fprintf
(
stderr
,
"test_raw_sock running...
\n
"
);
int
rc
;
void
test_zmq_connect
(
void
)
{
void
*
ctx
=
zmq_init
(
1
);
// Set up our context and sockets
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
zs
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
zs
);
// We'll be using this socket in raw mode
void
*
router
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
router
);
int
rc
=
zmq_setsockopt
(
zs
,
ZMQ_IDENTITY
,
"X"
,
1
);
int
on
=
1
;
rc
=
zmq_setsockopt
(
router
,
ZMQ_ROUTER_RAW
,
&
on
,
sizeof
(
on
));
assert
(
rc
==
0
);
int
raw_sock
=
1
;
rc
=
zmq_setsockopt
(
zs
,
ZMQ_ROUTER_RAW
,
&
raw_sock
,
sizeof
raw_sock
);
int
zero
=
0
;
rc
=
zmq_setsockopt
(
router
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
router
,
"tcp://*:5555"
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
zs
,
"tcp://127.0.0.1:5555"
);
// We'll be using this socket as the other peer
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer
,
"tcp://localhost:5555"
);
int
i
;
for
(
i
=
0
;
i
<
8
;
i
++
)
{
int
server_fd
=
tcp_server
();
assert
(
server_fd
!=
-
1
);
// Send a message on the dealer socket
rc
=
zmq_send
(
dealer
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
zmq_msg_t
msg
;
rc
=
zmq_msg_init_size
(
&
msg
,
strlen
(
test_str
));
// First frame is identity
zmq_msg_t
identity
;
rc
=
zmq_msg_init
(
&
identity
);
assert
(
rc
==
0
);
memcpy
(
zmq_msg_data
(
&
msg
),
test_str
,
strlen
(
test_str
));
rc
=
zmq_msg_send
(
&
msg
,
zs
,
0
);
rc
=
zmq_msg_recv
(
&
identity
,
router
,
0
);
assert
(
rc
>
0
);
assert
(
zmq_msg_more
(
&
identity
));
char
buffer
[
128
];
size_t
bytes_read
=
tcp_read
(
server_fd
,
buffer
,
sizeof
buffer
);
// Second frame is greeting signature
byte
buffer
[
255
];
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
10
);
assert
(
memcmp
(
buffer
,
greeting
.
signature
,
10
)
==
0
);
assert
(
bytes_read
==
strlen
(
test_str
));
assert
(
memcmp
(
buffer
,
test_str
,
bytes_read
)
==
0
);
// Send our own protocol greeting
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
>
0
);
greeting
.
socktype
=
ZMTP_ROUTER
;
rc
=
zmq_send
(
router
,
&
greeting
,
sizeof
(
greeting
),
0
);
assert
(
rc
==
sizeof
(
greeting
));
rc
=
close
(
server_fd
);
assert
(
rc
==
0
);
}
// Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection
rc
=
zmq_msg_recv
(
&
identity
,
router
,
0
);
assert
(
rc
>
0
);
assert
(
zmq_msg_more
(
&
identity
));
rc
=
zmq_close
(
zs
);
assert
(
rc
==
0
);
// Second frame contains all remaining data from DEALER
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
11
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
}
// First four bytes are [revision][socktype][identity]
assert
(
buffer
[
0
]
==
1
);
// Revision = 1
assert
(
buffer
[
1
]
==
ZMTP_DEALER
);
int
main
(
void
)
{
fprintf
(
stderr
,
"test_raw_sock running...
\n
"
);
// Identity is 2 byte message
assert
(
buffer
[
2
]
==
0
);
// Flags = 0
assert
(
buffer
[
3
]
==
0
);
// Size = 0
zmq_msg_t
message
;
zmq_msg_t
id
;
// Then we have a 5-byte message "Hello"
assert
(
buffer
[
4
]
==
0
);
// Flags = 0
assert
(
buffer
[
5
]
==
5
);
// Size = 5
assert
(
memcmp
(
buffer
+
6
,
"Hello"
,
5
)
==
0
);
//===================
void
*
ctx
=
zmq_init
(
1
);
assert
(
ctx
);
// Send "World" back to DEALER
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
>
0
);
byte
world
[]
=
{
0
,
5
,
'W'
,
'o'
,
'r'
,
'l'
,
'd'
};
rc
=
zmq_send
(
router
,
world
,
sizeof
(
world
),
0
);
assert
(
rc
==
sizeof
(
world
));
void
*
sb
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
sb
);
// Expect response on DEALER socket
rc
=
zmq_recv
(
dealer
,
buffer
,
255
,
0
);
assert
(
rc
==
5
);
assert
(
memcmp
(
buffer
,
"World"
,
5
)
==
0
);
int
raw_sock
=
1
;
int
rc
=
zmq_setsockopt
(
sb
,
ZMQ_ROUTER_RAW
,
&
raw_sock
,
sizeof
raw_sock
);
rc
=
zmq_close
(
dealer
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
sb
,
"tcp://127.0.0.1:5555"
);
assert
(
rc
==
0
);
int
sock_fd
=
tcp_client
();
assert
(
sock_fd
>=
0
);
// ===================
zmq_msg_init
(
&
message
);
zmq_msg_init
(
&
id
);
rc
=
zmq_close
(
router
);
assert
(
rc
==
0
);
zmq_pollitem_t
items
[]
=
{
{
sb
,
0
,
ZMQ_POLLIN
,
0
},
};
tcp_client_write
(
sock_fd
,
test_str
,
strlen
(
test_str
));
zmq_poll
(
items
,
1
,
500
);
assert
(
items
[
0
].
revents
&
ZMQ_POLLIN
);
int
n
=
zmq_msg_recv
(
&
id
,
sb
,
0
);
assert
(
n
>
0
);
n
=
zmq_msg_recv
(
&
message
,
sb
,
0
);
assert
(
n
>
0
);
assert
(
memcmp
(
zmq_msg_data
(
&
message
),
test_str
,
strlen
(
test_str
))
==
0
);
zmq_msg_send
(
&
id
,
sb
,
ZMQ_SNDMORE
);
zmq_msg_send
(
&
message
,
sb
,
ZMQ_SNDMORE
);
// SNDMORE option is ignored
tcp_client_read
(
sock_fd
);
tcp_client_close
(
sock_fd
);
zmq_msg_close
(
&
id
);
zmq_msg_close
(
&
message
);
zmq_close
(
sb
);
zmq_term
(
ctx
);
test_zmq_connect
();
fprintf
(
stderr
,
"test_raw_sock PASSED.
\n
"
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment