Commit c980820d authored by Pieter Hintjens's avatar Pieter Hintjens

I'm reverting the various changes to the throughput test programs since as far

as I can see, these didn't work any more. At the very least, the command line
API was broken and forced the user to enter new, exotic arguments. Patches
should not break existing APIs. But also, the internals of these programs had
become weird.

If we want to build more complex performance tests, that's fine, but we should
make new programs, not break the old ones. We need minimal, safe performance
tests in 0MQ.

Also, the code was quite horrid. So it's gone. If anyone wants to bring it back
please make the code neat, and build new APIs instead of breaking the old ones.

Cheers
Pieter
parent 67e02ca8
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
......@@ -23,143 +21,37 @@
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <limits.h>
#include "platform.hpp"
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/time.h>
#endif
#define ZMSG 1
#define DATA 0
typedef struct US_TIMER US_TIMER;
struct US_TIMER{
struct timeval time_was;
struct timeval time_now;
};
/* Records the current timer state
*/
void tm_init( US_TIMER *t){
#if defined ZMQ_HAVE_WINDOWS
// Get the high resolution counter's accuracy.
LARGE_INTEGER ticksPerSecond;
QueryPerformanceFrequency (&ticksPerSecond);
// What time is it?
LARGE_INTEGER tick;
if ( !QueryPerformanceCounter (&tick) ) { perror( "tm_init()" ); }
// Seconds
t->time_now.tv_sec = (long)( tick.QuadPart / ticksPerSecond.QuadPart );
// Microseconds
t->time_now.tv_usec = (long)( ( tick.QuadPart - t->time_now.tv_sec * ticksPerSecond.QuadPart ) * 1000000 / ticksPerSecond.QuadPart );
#else
if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_init()");}
#endif
t->time_was = t->time_now;
}
/* Returns the time passed in microsecond precision in seconds since last init
of timer.
*/
float tm_secs( US_TIMER *t){
register float seconds;
#if defined ZMQ_HAVE_WINDOWS
// Get the high resolution counter's accuracy.
LARGE_INTEGER ticksPerSecond;
QueryPerformanceFrequency (&ticksPerSecond);
// What time is it?
LARGE_INTEGER tick;
if ( !QueryPerformanceCounter (&tick) ) { perror( "tm_secs()" ); }
// Seconds
t->time_now.tv_sec = (long)( tick.QuadPart / ticksPerSecond.QuadPart );
// Microseconds
t->time_now.tv_usec = (long)( ( tick.QuadPart - t->time_now.tv_sec * ticksPerSecond.QuadPart ) * 1000000 / ticksPerSecond.QuadPart );
#else
if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");}
#endif
seconds = ( ((float)( t->time_now.tv_sec - t->time_was.tv_sec)) +
(((float)( t->time_now.tv_usec - t->time_was.tv_usec)) / 1000000.0));
t->time_was = t->time_now;
return( seconds);
}
const char *bind_to;
int message_count = 1000;
int message_size = 1024;
int threads = 1;
int workers = 1;
int sndbuflen = 128*256;
int rcvbuflen = 128*256;
int flow = ZMQ_PULL;
int rec = DATA;
void my_free (void *data, void *hint)
{
// free (data);
}
int main (int argc, char *argv [])
{
US_TIMER timer;
const char *bind_to;
int message_count;
size_t message_size;
void *ctx;
void *s;
int rc;
int i;
void *buf = NULL;
zmq_msg_t msg;
void *watch;
unsigned long elapsed;
unsigned long throughput;
double megabits;
if (argc != 9) {
printf ("usage: local_thr <bind-to> <message-size> <message-count> <SND buffer> <RCV buffer> <flow (PUSH/PULL)> <rec (ZMSG/DATA)> <zmq-threads>\n");
if (argc != 4) {
printf ("usage: local_thr <bind-to> <message-size> <message-count>\n");
return 1;
}
bind_to = argv [1];
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
sndbuflen = atoi (argv [4]);
rcvbuflen = atoi (argv [5]);
if( !strcmp( argv [6], "PUSH")){
flow = ZMQ_PUSH;
}
if( !strcmp( argv [6], "PULL")){
flow = ZMQ_PULL;
}
if( !strcmp( argv [7], "ZMSG")){
rec = ZMSG;
}
if( !strcmp( argv [7], "DATA")){
rec = DATA;
}
threads = atoi (argv [8]);
if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}
ctx = zmq_ctx_new ();
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads);
if (rc) {
printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno));
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, flow);
s = zmq_socket (ctx, ZMQ_PULL);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
......@@ -168,141 +60,60 @@ int main (int argc, char *argv [])
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
size_t rcvbuflenlen = (size_t)sizeof rcvbuflen;
size_t sndbuflenlen = (size_t)sizeof sndbuflen;
rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
sndbuflen = 1;
rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
sndbuflen = 2;
rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
sndbuflen = 2;
rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
if (rc != 0) {
printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
if (rc != 0) {
printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
return -1;
}
printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);
printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS));
rc = zmq_bind (s, bind_to);
if (rc != 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}
if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}
printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA");
tm_init( &timer);
if( flow == ZMQ_PULL){
if( rec == ZMSG){
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
for (i = 0; i != message_count; i++) {
rc = zmq_msg_recv (&msg, s, 0);
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}}
}
else{
watch = zmq_stopwatch_start ();
for (i = 0; i != message_count; i++) {
rc = zmq_recv( s, buf, message_size, 0);
for (i = 0; i != message_count - 1; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
}}
}
else{
if( rec == ZMSG){
zmq_msg_t msg;
for (i = 0; i != message_count; i++) {
rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
if (rc != 0) {
printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
rc = zmq_msg_send( &msg, s, 0);
if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno));
return -1;
}
}}
elapsed = zmq_stopwatch_stop (watch);
if (elapsed == 0)
elapsed = 1;
else{
for (i = 0; i != message_count; i++){
rc = zmq_send( s, buf, message_size, 0);
if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno));
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
}}
}
float secs = tm_secs( &timer);
float total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);
printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs);
printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs);
throughput = (unsigned long)
((double) message_count / (double) elapsed * 1000000);
megabits = (double) (throughput * message_size * 8) / 1000000;
printf ("message size: %d [B]\n", (int) message_size);
printf ("message count: %d\n", (int) message_count);
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
rc = zmq_close (s);
if (rc != 0) {
......@@ -316,7 +127,5 @@ int main (int argc, char *argv [])
return -1;
}
if( buf) free( buf);
return 0;
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment