object.hpp 4.42 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

Martin Sustrik's avatar
Martin Sustrik committed
20 21
#ifndef __ZMQ_OBJECT_HPP_INCLUDED__
#define __ZMQ_OBJECT_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
22

23 24
#include "../include/zmq.h"

Martin Sustrik's avatar
Martin Sustrik committed
25
#include "stdint.hpp"
26
#include "blob.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
27

Martin Sustrik's avatar
Martin Sustrik committed
28
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
29 30 31 32 33 34 35 36
{
    //  Base class for all objects that participate in inter-thread
    //  communication.

    class object_t
    {
    public:

Martin Sustrik's avatar
Martin Sustrik committed
37
        object_t (class ctx_t *ctx_, uint32_t tid_);
Martin Sustrik's avatar
Martin Sustrik committed
38
        object_t (object_t *parent_);
malosek's avatar
malosek committed
39
        virtual ~object_t ();
Martin Sustrik's avatar
Martin Sustrik committed
40

Martin Sustrik's avatar
Martin Sustrik committed
41
        uint32_t get_tid ();
42
        ctx_t *get_ctx ();
Martin Sustrik's avatar
Martin Sustrik committed
43 44 45 46
        void process_command (struct command_t &cmd_);

    protected:

47 48 49 50 51 52
        //  Using following function, socket is able to access global
        //  repository of inproc endpoints.
        int register_endpoint (const char *addr_, class socket_base_t *socket_);
        void unregister_endpoints (class socket_base_t *socket_);
        class socket_base_t *find_endpoint (const char *addr_);

53 54 55
        //  Logs an message.
        void log (zmq_msg_t *msg_);

56
        //  Chooses least loaded I/O thread.
57
        class io_thread_t *choose_io_thread (uint64_t affinity_);
Martin Sustrik's avatar
Martin Sustrik committed
58

59 60
        //  Zombify particular socket. In other words, pass the ownership to
        //  the context.
61
        void zombify_socket (class socket_base_t *socket_);
62

Martin Sustrik's avatar
Martin Sustrik committed
63 64 65
        //  Derived object can use these functions to send commands
        //  to other objects.
        void send_stop ();
66 67 68 69
        void send_plug (class own_t *destination_,
            bool inc_seqnum_ = true);
        void send_own (class own_t *destination_,
            class own_t *object_);
70
        void send_attach (class session_t *destination_,
71 72
             struct i_engine *engine_, const blob_t &peer_identity_,
             bool inc_seqnum_ = true);
73
        void send_bind (class own_t *destination_,
74
             class reader_t *in_pipe_, class writer_t *out_pipe_,
75
             const blob_t &peer_identity_, bool inc_seqnum_ = true);
76 77
        void send_activate_reader (class reader_t *destination_);
        void send_activate_writer (class writer_t *destination_,
Martin Hurton's avatar
Martin Hurton committed
78
             uint64_t msgs_read_);
Martin Sustrik's avatar
Martin Sustrik committed
79 80
        void send_pipe_term (class writer_t *destination_);
        void send_pipe_term_ack (class reader_t *destination_);
81 82
        void send_term_req (class own_t *destination_,
            class own_t *object_);
83
        void send_term (class own_t *destination_, int linger_);
84
        void send_term_ack (class own_t *destination_);
Martin Sustrik's avatar
Martin Sustrik committed
85 86 87 88

        //  These handlers can be overloaded by the derived objects. They are
        //  called when command arrives from another thread.
        virtual void process_stop ();
89
        virtual void process_plug ();
90
        virtual void process_own (class own_t *object_);
91
        virtual void process_attach (struct i_engine *engine_,
92
            const blob_t &peer_identity_);
93
        virtual void process_bind (class reader_t *in_pipe_,
94
            class writer_t *out_pipe_, const blob_t &peer_identity_);
95 96
        virtual void process_activate_reader ();
        virtual void process_activate_writer (uint64_t msgs_read_);
Martin Sustrik's avatar
Martin Sustrik committed
97 98
        virtual void process_pipe_term ();
        virtual void process_pipe_term_ack ();
99
        virtual void process_term_req (class own_t *object_);
100
        virtual void process_term (int linger_);
101
        virtual void process_term_ack ();
Martin Sustrik's avatar
Martin Sustrik committed
102

103 104 105 106 107
        //  Special handler called after a command that requires a seqnum
        //  was processed. The implementation should catch up with its counter
        //  of processed commands here.
        virtual void process_seqnum ();

108 109
    private:

110 111
        //  Context provides access to the global state.
        class ctx_t *ctx;
Martin Sustrik's avatar
Martin Sustrik committed
112

Martin Sustrik's avatar
Martin Sustrik committed
113 114
        //  Thread ID of the thread the object belongs to.
        uint32_t tid;
Martin Sustrik's avatar
Martin Sustrik committed
115 116 117 118 119 120 121 122 123 124

        void send_command (command_t &cmd_);

        object_t (const object_t&);
        void operator = (const object_t&);
    };

}

#endif