/*
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
#include "../include/zmq_utils.h"

// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.

// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.

#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define ID_SIZE 10
#define ID_SIZE_MAX 32
#define QT_WORKERS	5
#define QT_CLIENTS	3
#define is_verbose 0

static void
client_task (void *ctx)
{
//    void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket
//    assert (ctx);
    void *client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client);

	// Control socket receives terminate command from main over inproc
    void *control = zmq_socket (ctx, ZMQ_SUB);
    assert (control);
    int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

    char content [CONTENT_SIZE_MAX];
	// Set random identity to make tracing easier
	char identity [ID_SIZE];
	sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF);
    rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf
    assert (rc == 0);
    rc = zmq_connect (client, "tcp://127.0.0.1:9999");
    assert (rc == 0);

	zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
	int request_nbr = 0;
	bool run = true;
	while (run) {
		// Tick once per 200 ms, pulling in arriving messages
		int centitick;
		for (centitick = 0; centitick < 20; centitick++) {
			zmq_poll (items, 2, 10);
			if (items [0].revents & ZMQ_POLLIN) {
			    int rcvmore;
			    size_t sz = sizeof (rcvmore);
			    rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
			    assert (rc == CONTENT_SIZE);
				if (is_verbose) printf("client receive - identity = %s    content = %s\n", identity, content);
			    //  Check that message is still the same
			    assert (memcmp (content, "request #", 9) == 0);
			    rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
			    assert (rc == 0);
			    assert (!rcvmore);
			}
			if (items [1].revents & ZMQ_POLLIN) {
			    rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
				if (is_verbose) printf("client receive - identity = %s    command = %s\n", identity, content);
			    if (memcmp (content, "TERMINATE", 10) == 0) {
			    	run = false;
			    	break;
			    }
			}
		}
		sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
	    rc = zmq_send (client, content, CONTENT_SIZE, 0);
	    assert (rc == CONTENT_SIZE);
	}

    rc = zmq_close (client);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
//    rc = zmq_ctx_term (ctx);
//    assert (rc == 0);
}

// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.

static void server_worker (void *ctx);

void
server_task (void *ctx)
{
//    void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket
//    assert (ctx);

    // Frontend socket talks to clients over TCP
    void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
    assert (frontend);
    int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999");
    assert (rc == 0);

	// Backend socket talks to workers over inproc
    void *backend = zmq_socket (ctx, ZMQ_DEALER);
    assert (backend);
    rc = zmq_bind (backend, "inproc://backend");
    assert (rc == 0);

	// Control socket receives terminate command from main over inproc
    void *control = zmq_socket (ctx, ZMQ_SUB);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

	// Launch pool of worker threads, precise number is not critical
	int thread_nbr;
	void* threads [5];
	for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
		threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);

	// Connect backend to frontend via a proxy
	zmq_proxy_steerable (frontend, backend, NULL, control);

	for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
		zmq_threadclose (threads[thread_nbr]);

    rc = zmq_close (frontend);
    assert (rc == 0);
    rc = zmq_close (backend);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
//    rc = zmq_ctx_term (ctx);
//    assert (rc == 0);
}

// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version

static void
server_worker (void *ctx)
{
    void *worker = zmq_socket (ctx, ZMQ_DEALER);
    assert (worker);
    int rc = zmq_connect (worker, "inproc://backend");
    assert (rc == 0);

	// Control socket receives terminate command from main over inproc
    void *control = zmq_socket (ctx, ZMQ_SUB);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

    char content [CONTENT_SIZE_MAX]; //	bigger than what we need to check that
	char identity [ID_SIZE_MAX]; 	 // the size received is the size sent

//	zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; // POLLING
	bool run = true;
	while (run) {
//		zmq_poll (items, 2, 10); // POLLING
//		if (items [1].revents & ZMQ_POLLIN) { // POLLING
			rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
			if (rc > 0) {
				if (is_verbose) printf("server_worker receives command = %s\n", content);
				if (memcmp (content, "TERMINATE", 10) == 0)
					run = false;
			}
//		} // POLLING

//		if (items [0].revents & ZMQ_POLLIN) { // POLLING
			// The DEALER socket gives us the reply envelope and message
			rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
			if (rc == ID_SIZE) {
				rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
				assert (rc == CONTENT_SIZE);
				if (is_verbose) printf("server receive - identity = %s    content = %s\n", identity, content);

				// Send 0..4 replies back
				int reply, replies = rand() % 5;
				for (reply = 0; reply < replies; reply++) {
					// Sleep for some fraction of a second
					msleep (rand () % 10 + 1);
					//  Send message from server to client
					rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
					assert (rc == ID_SIZE);
					rc = zmq_send (worker, content, CONTENT_SIZE, 0);
					assert (rc == CONTENT_SIZE);
				}
			}
//		} // POLLING
	}
    rc = zmq_close (worker);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
}

// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

int main (void)
{
    setup_test_environment ();

    void *ctx = zmq_ctx_new ();
    assert (ctx);
	// Control socket receives terminate command from main over inproc
    void *control = zmq_socket (ctx, ZMQ_PUB);
    assert (control);
    int rc = zmq_bind (control, "inproc://control");
    assert (rc == 0);

    void* threads [QT_CLIENTS + 1];
	for (int i = 0; i < QT_CLIENTS; i++)
	{
		threads[i] = zmq_threadstart  (&client_task, ctx);
	}
	threads[QT_CLIENTS] = zmq_threadstart  (&server_task, ctx);
	msleep (500); // Run for 500 ms then quit

	rc = zmq_send (control, "TERMINATE", 10, 0);
	assert (rc == 10);

	// clean everything
    rc = zmq_close (control);
    assert (rc == 0);
	//msleep (1000); // not sure it is usefull

	for (int i = 0; i < QT_CLIENTS + 1; i++)
		zmq_threadclose (threads[i]);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
	return 0;
}