/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_OBJECT_HPP_INCLUDED__
#define __ZMQ_OBJECT_HPP_INCLUDED__

//  Project headers. Presumably stdint.hpp supplies the fixed-width integer
//  types (uint32_t, uint64_t) and blob.hpp supplies blob_t, both of which
//  are used in the declarations below -- confirm against those headers.
#include "stdint.hpp"
#include "blob.hpp"

namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29 30 31 32 33 34
{
    //  Base class for all objects that participate in inter-thread
    //  communication.

    class object_t
    {
    public:

35
        object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
Martin Sustrik's avatar
Martin Sustrik committed
36
        object_t (object_t *parent_);
malosek's avatar
malosek committed
37
        virtual ~object_t ();
Martin Sustrik's avatar
Martin Sustrik committed
38

39
        uint32_t get_thread_slot ();
40
        dispatcher_t *get_dispatcher ();
Martin Sustrik's avatar
Martin Sustrik committed
41 42
        void process_command (struct command_t &cmd_);

Martin Sustrik's avatar
Martin Sustrik committed
43 44 45 46
        //  Allow pipe to access corresponding dispatcher functions.
        void register_pipe (class pipe_t *pipe_);
        void unregister_pipe (class pipe_t *pipe_);

Martin Sustrik's avatar
Martin Sustrik committed
47 48
    protected:

49 50 51 52 53 54
        //  Using following function, socket is able to access global
        //  repository of inproc endpoints.
        int register_endpoint (const char *addr_, class socket_base_t *socket_);
        void unregister_endpoints (class socket_base_t *socket_);
        class socket_base_t *find_endpoint (const char *addr_);

55
        //  Chooses least loaded I/O thread.
Martin Sustrik's avatar
Martin Sustrik committed
56 57 58 59 60
        class io_thread_t *choose_io_thread (uint64_t taskset_);

        //  Derived object can use these functions to send commands
        //  to other objects.
        void send_stop ();
61
        void send_plug (class owned_t *destination_, bool inc_seqnum_ = true);
62 63 64
        void send_own (class socket_base_t *destination_,
            class owned_t *object_);
        void send_attach (class session_t *destination_,
65 66
             struct i_engine *engine_, const blob_t &peer_identity_,
             bool inc_seqnum_ = true);
67
        void send_bind (class socket_base_t *destination_,
68
             class reader_t *in_pipe_, class writer_t *out_pipe_,
69
             const blob_t &peer_identity_, bool inc_seqnum_ = true);
Martin Sustrik's avatar
Martin Sustrik committed
70
        void send_revive (class object_t *destination_);
Martin Hurton's avatar
Martin Hurton committed
71 72
        void send_reader_info (class writer_t *destination_,
             uint64_t msgs_read_);
Martin Sustrik's avatar
Martin Sustrik committed
73 74
        void send_pipe_term (class writer_t *destination_);
        void send_pipe_term_ack (class reader_t *destination_);
75 76 77 78
        void send_term_req (class socket_base_t *destination_,
            class owned_t *object_);
        void send_term (class owned_t *destination_);
        void send_term_ack (class socket_base_t *destination_);
Martin Sustrik's avatar
Martin Sustrik committed
79 80 81 82

        //  These handlers can be overloaded by the derived objects. They are
        //  called when command arrives from another thread.
        virtual void process_stop ();
83
        virtual void process_plug ();
84
        virtual void process_own (class owned_t *object_);
85
        virtual void process_attach (struct i_engine *engine_,
86
            const blob_t &peer_identity_);
87
        virtual void process_bind (class reader_t *in_pipe_,
88
            class writer_t *out_pipe_, const blob_t &peer_identity_);
Martin Sustrik's avatar
Martin Sustrik committed
89
        virtual void process_revive ();
Martin Hurton's avatar
Martin Hurton committed
90
        virtual void process_reader_info (uint64_t msgs_read_);
Martin Sustrik's avatar
Martin Sustrik committed
91 92
        virtual void process_pipe_term ();
        virtual void process_pipe_term_ack ();
93
        virtual void process_term_req (class owned_t *object_);
94 95
        virtual void process_term ();
        virtual void process_term_ack ();
Martin Sustrik's avatar
Martin Sustrik committed
96

97 98 99 100 101
        //  Special handler called after a command that requires a seqnum
        //  was processed. The implementation should catch up with its counter
        //  of processed commands here.
        virtual void process_seqnum ();

Martin Sustrik's avatar
Martin Sustrik committed
102
        //  Pointer to the root of the infrastructure.
103
        class dispatcher_t *dispatcher;
Martin Sustrik's avatar
Martin Sustrik committed
104 105

        //  Slot ID of the thread the object belongs to.
106
        uint32_t thread_slot;
Martin Sustrik's avatar
Martin Sustrik committed
107 108 109 110 111 112 113 114 115 116 117 118

    private:

        void send_command (command_t &cmd_);

        object_t (const object_t&);
        void operator = (const object_t&);
    };

}

#endif