Commit 176879e5 authored by Martin Sustrik's avatar Martin Sustrik

engine virtualised; chatroom example removed

parent 1d650934
COPYING
INSTALL
*.m4
*.o *.o
*.lo *.lo
*.loT
*.la *.la
.*
include_HEADERS = include/zmq.h include/zmq.hpp include_HEADERS = include/zmq.h include/zmq.hpp
if BUILD_PYTHON if BUILD_PYTHON
DIR_P = python DIR_P = python
endif endif
...@@ -9,5 +8,5 @@ if BUILD_RUBY ...@@ -9,5 +8,5 @@ if BUILD_RUBY
DIR_R = ruby DIR_R = ruby
endif endif
SUBDIRS = src examples $(DIR_P) $(DIR_R) SUBDIRS = src $(DIR_P) $(DIR_R)
DIST_SUBDIRS = src examples $(DIR_P) $(DIR_R) DIST_SUBDIRS = src $(DIR_P) $(DIR_R)
...@@ -275,8 +275,7 @@ AC_FUNC_MALLOC ...@@ -275,8 +275,7 @@ AC_FUNC_MALLOC
AC_TYPE_SIGNAL AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
AC_OUTPUT(Makefile src/Makefile examples/Makefile examples/chat/Makefile python/Makefile \ AC_OUTPUT(Makefile src/Makefile python/Makefile python/setup.py ruby/Makefile)
python/setup.py ruby/Makefile)
AC_MSG_RESULT([]) AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ]) AC_MSG_RESULT([ ******************************************************** ])
......
SUBDIRS = chat
DIST_SUBDIRS = chat
INCLUDES = -I$(top_builddir) -I$(top_builddir)/include
noinst_PROGRAMS = chatroom display prompt
chatroom_SOURCES = chatroom.cpp
chatroom_LDADD = $(top_builddir)/src/libzmq.la
chatroom_CXXFLAGS = -Wall -pedantic -Werror
display_SOURCES = display.cpp
display_LDADD = $(top_builddir)/src/libzmq.la
display_CXXFLAGS = -Wall -pedantic -Werror
prompt_SOURCES = prompt.cpp
prompt_LDADD = $(top_builddir)/src/libzmq.la
prompt_CXXFLAGS = -Wall -pedantic -Werror
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <time.h>
#include <string.h>
#include <iostream>
using namespace std;
#include <zmq.hpp>
int main (int argc, const char *argv [])
{
// Check the command line syntax
if (argc != 3) {
cerr << "usage: chatroom <in-interface> <out-interface>" << endl;
return 1;
}
// Retrieve command line arguments
const char *in_interface = argv [1];
const char *out_interface = argv [2];
// Initialise 0MQ infrastructure
zmq::context_t ctx (1, 1);
// Create two sockets. One for receiving messages from 'propmt'
// applications, one for sending messages to 'display' applications
zmq::socket_t in_socket (ctx, ZMQ_SUB);
in_socket.bind (in_interface);
zmq::socket_t out_socket (ctx, ZMQ_PUB);
out_socket.bind (out_interface);
while (true) {
// Get a message
zmq::message_t in_message;
in_socket.recv (&in_message);
// Get the current time. Replace the newline character at the end
// by space character.
char timebuf [256];
time_t current_time;
time (&current_time);
snprintf (timebuf, 256, "%s", ctime (&current_time));
timebuf [strlen (timebuf) - 1] = ' ';
// Create and fill in the message
zmq::message_t out_message (strlen (timebuf) + in_message.size ());
char *data = (char*) out_message.data ();
memcpy (data, timebuf, strlen (timebuf));
data += strlen (timebuf);
memcpy (data, in_message.data (), in_message.size ());
// Send the message
out_socket.send (out_message);
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <string>
#include <iostream>
using namespace std;
#include <zmq.hpp>
int main (int argc, const char *argv [])
{
// Check the command line syntax.
if (argc != 2) {
cerr << "usage: display <chatroom-out-address>" << endl;
return 1;
}
// Retrieve command line arguments
const char *chatroom_out_address = argv [1];
// Initialise 0MQ infrastructure, connect to the chatroom and ask for all
// messages and gap notifications.
zmq::context_t ctx (1, 1);
zmq::socket_t s (ctx, ZMQ_SUB);
s.connect (chatroom_out_address);
while (true) {
// Get a message and print it to the console.
zmq::message_t message;
s.recv (&message);
if (message.type () == zmq::message_gap)
cout << "Problems connecting to the chatroom..." << endl;
else
cout << (char*) message.data () << flush;
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <string>
#include <iostream>
using namespace std;
#include <zmq.hpp>
int main (int argc, const char *argv [])
{
// Check the command line syntax.
if (argc != 3) {
cerr << "usage: prompt <chatroom-in-address> <user name>" << endl;
return 1;
}
// Retrieve command line arguments
const char *chatroom_in_address = argv [1];
const char *user_name = argv [2];
// Initialise 0MQ infrastructure and connect to the chatroom.
zmq::context_t ctx (1, 1);
zmq::socket_t s (ctx, ZMQ_PUB);
s.connect (chatroom_in_address);
while (true) {
// Allow user to input the message text. Prepend it by user name.
char textbuf [1024];
char *rcc = fgets (textbuf, sizeof (textbuf), stdin);
assert (rcc);
string text (user_name);
text = text + ": " + textbuf;
// Create the message (terminating zero is part of the message)
zmq::message_t message (text.size () + 1);
memcpy (message.data (), text.c_str (), text.size () + 1);
// Send the message
s.send (message);
}
}
...@@ -20,6 +20,7 @@ libzmq_la_SOURCES = \ ...@@ -20,6 +20,7 @@ libzmq_la_SOURCES = \
io_thread.hpp \ io_thread.hpp \
ip.hpp \ ip.hpp \
i_endpoint.hpp \ i_endpoint.hpp \
i_engine.hpp \
i_poller.hpp \ i_poller.hpp \
i_poll_events.hpp \ i_poll_events.hpp \
i_signaler.hpp \ i_signaler.hpp \
......
...@@ -65,7 +65,7 @@ namespace zmq ...@@ -65,7 +65,7 @@ namespace zmq
// Attach the engine to the session. // Attach the engine to the session.
struct { struct {
class zmq_engine_t *engine; struct i_engine *engine;
} attach; } attach;
// Sent from session to socket to establish pipe(s) between them. // Sent from session to socket to establish pipe(s) between them.
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
namespace zmq
{
struct i_engine
{
virtual ~i_engine () {}
// Plug the engine to the session.
virtual void plug (struct i_inout *inout_) = 0;
// Unplug the engine from the session.
virtual void unplug () = 0;
// This method is called by the session to signalise that there
// are messages to send available.
virtual void revive () = 0;
};
}
#endif
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include "owned.hpp" #include "owned.hpp"
#include "session.hpp" #include "session.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "zmq_engine.hpp" // TODO: remove this line
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
dispatcher (dispatcher_), dispatcher (dispatcher_),
...@@ -153,7 +152,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) ...@@ -153,7 +152,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_) void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
{ {
// The assumption here is that command sequence number of the destination // The assumption here is that command sequence number of the destination
// object was already incremented in find_session function. // object was already incremented in find_session function.
...@@ -241,7 +240,7 @@ void zmq::object_t::process_own (owned_t *object_) ...@@ -241,7 +240,7 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_attach (zmq_engine_t *engine_) void zmq::object_t::process_attach (i_engine *engine_)
{ {
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -60,7 +60,7 @@ namespace zmq ...@@ -60,7 +60,7 @@ namespace zmq
void send_own (class socket_base_t *destination_, void send_own (class socket_base_t *destination_,
class owned_t *object_); class owned_t *object_);
void send_attach (class session_t *destination_, void send_attach (class session_t *destination_,
class zmq_engine_t *engine_); struct i_engine *engine_);
void send_bind (object_t *destination_, class owned_t *session_, void send_bind (object_t *destination_, class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_); class reader_t *in_pipe_, class writer_t *out_pipe_);
void send_revive (class object_t *destination_); void send_revive (class object_t *destination_);
...@@ -76,7 +76,7 @@ namespace zmq ...@@ -76,7 +76,7 @@ namespace zmq
virtual void process_stop (); virtual void process_stop ();
virtual void process_plug (); virtual void process_plug ();
virtual void process_own (class owned_t *object_); virtual void process_own (class owned_t *object_);
virtual void process_attach (class zmq_engine_t *engine_); virtual void process_attach (struct i_engine *engine_);
virtual void process_bind (class owned_t *session_, virtual void process_bind (class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_); class reader_t *in_pipe_, class writer_t *out_pipe_);
virtual void process_revive (); virtual void process_revive ();
......
...@@ -47,7 +47,7 @@ void zmq::owned_t::process_plug () ...@@ -47,7 +47,7 @@ void zmq::owned_t::process_plug ()
finalise_command (); finalise_command ();
} }
void zmq::owned_t::process_attach (zmq_engine_t *engine_) void zmq::owned_t::process_attach (struct i_engine *engine_)
{ {
// Keep track of how many commands were processed so far. // Keep track of how many commands were processed so far.
processed_seqnum++; processed_seqnum++;
......
...@@ -61,7 +61,7 @@ namespace zmq ...@@ -61,7 +61,7 @@ namespace zmq
// It's vital that session invokes io_object_t::process_attach // It's vital that session invokes io_object_t::process_attach
// at the end of it's own attach handler. // at the end of it's own attach handler.
void process_attach (class zmq_engine_t *engine_); void process_attach (struct i_engine *engine_);
// io_object_t defines a new handler used to disconnect the object // io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived // from the poller object. Implement the handlen in the derived
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
*/ */
#include "session.hpp" #include "session.hpp"
#include "zmq_engine.hpp" #include "i_engine.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -149,7 +149,7 @@ void zmq::session_t::process_unplug () ...@@ -149,7 +149,7 @@ void zmq::session_t::process_unplug ()
} }
} }
void zmq::session_t::process_attach (class zmq_engine_t *engine_) void zmq::session_t::process_attach (i_engine *engine_)
{ {
zmq_assert (engine_); zmq_assert (engine_);
engine = engine_; engine = engine_;
......
...@@ -58,7 +58,7 @@ namespace zmq ...@@ -58,7 +58,7 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_unplug (); void process_unplug ();
void process_attach (class zmq_engine_t *engine_); void process_attach (struct i_engine *engine_);
// Inbound pipe, i.e. one the session is getting messages from. // Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe; class reader_t *in_pipe;
...@@ -69,7 +69,7 @@ namespace zmq ...@@ -69,7 +69,7 @@ namespace zmq
// Outbound pipe, i.e. one the socket is sending messages to. // Outbound pipe, i.e. one the socket is sending messages to.
class writer_t *out_pipe; class writer_t *out_pipe;
class zmq_engine_t *engine; struct i_engine *engine;
// The name of the session. One that is used to register it with // The name of the session. One that is used to register it with
// socket-level repository of sessions. // socket-level repository of sessions.
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#include "i_engine.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_socket.hpp" #include "tcp_socket.hpp"
#include "zmq_encoder.hpp" #include "zmq_encoder.hpp"
...@@ -28,24 +29,22 @@ ...@@ -28,24 +29,22 @@
namespace zmq namespace zmq
{ {
class zmq_engine_t : public io_object_t class zmq_engine_t : public io_object_t, public i_engine
{ {
public: public:
zmq_engine_t (class io_thread_t *parent_, fd_t fd_); zmq_engine_t (class io_thread_t *parent_, fd_t fd_);
~zmq_engine_t (); ~zmq_engine_t ();
// i_engine interface implementation.
void plug (struct i_inout *inout_); void plug (struct i_inout *inout_);
void unplug (); void unplug ();
void revive ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
void out_event (); void out_event ();
// This method is called by the session to signalise that there
// are messages to send available.
void revive ();
private: private:
// Function to handle network disconnections. // Function to handle network disconnections.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment