ipc_listener.cpp 8.07 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21 22 23
#include "ipc_listener.hpp"

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS

24 25 26 27
#include <new>

#include <string.h>

28
#include "stream_engine.hpp"
29
#include "ipc_address.hpp"
30
#include "io_thread.hpp"
31
#include "session_base.hpp"
32 33
#include "config.hpp"
#include "err.hpp"
34
#include "ip.hpp"
35
#include "socket_base.hpp"
36 37 38 39 40 41

#include <unistd.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <sys/un.h>

42 43 44 45 46 47 48 49
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
#   include <sys/types.h>
#endif
#ifdef ZMQ_HAVE_SO_PEERCRED
#   include <pwd.h>
#   include <grp.h>
#endif

50 51 52 53 54 55 56 57 58 59 60 61
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
      socket_base_t *socket_, const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    has_file (false),
    s (retired_fd),
    socket (socket_)
{
}

zmq::ipc_listener_t::~ipc_listener_t ()
{
62
    zmq_assert (s == retired_fd);
63 64 65 66 67 68 69 70 71 72 73 74
}

void zmq::ipc_listener_t::process_plug ()
{
    //  Start polling for incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::ipc_listener_t::process_term (int linger_)
{
    rm_fd (handle);
75
    close ();
76 77 78 79 80 81 82 83 84
    own_t::process_term (linger_);
}

void zmq::ipc_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
85
    if (fd == retired_fd) {
86
        socket->event_accept_failed (endpoint, zmq_errno());
87
        return;
88
    }
89 90

    //  Create the engine object for this connection.
91
    stream_engine_t *engine = new (std::nothrow)
92
        stream_engine_t (fd, options, endpoint);
93 94 95 96 97 98 99 100
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create and launch a session object. 
101
    session_base_t *session = session_base_t::create (io_thread, false, socket,
102
        options, NULL);
103
    errno_assert (session);
104 105 106
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
107
    socket->event_accepted (endpoint, fd);
108 109
}

110
int zmq::ipc_listener_t::get_address (std::string &addr_)
111
{
112
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
113 114 115
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
116
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
117
#endif
118
    int rc = getsockname (s, (sockaddr *) &ss, &sl);
119
    if (rc != 0) {
120
        addr_.clear ();
121
        return rc;
122
    }
Mikko Koppanen's avatar
Mikko Koppanen committed
123

124 125
    ipc_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
126 127
}

128
int zmq::ipc_listener_t::set_address (const char *addr_)
129
{
130 131 132 133
    //  Create addr on stack for auto-cleanup
    std::string addr (addr_);

    //  Allow wildcard file
134 135
    if (addr [0] == '*') {
        char buffer [12] = "2134XXXXXX";
136 137
        int fd = mkstemp (buffer);
        if (fd == -1)
138 139
            return -1;
        addr.assign (buffer);
140
        ::close (fd);
141
    }
142

143 144
    //  Get rid of the file associated with the UNIX domain socket that
    //  may have been left behind by the previous run of the application.
145
    ::unlink (addr.c_str());
146
    filename.clear ();
147

148 149
    //  Initialise the address structure.
    ipc_address_t address;
150
    int rc = address.resolve (addr.c_str());
151
    if (rc != 0)
152
        return -1;
153 154

    //  Create a listening socket.
155
    s = open_socket (AF_UNIX, SOCK_STREAM, 0);
156
    if (s == -1)
157 158
        return -1;

159 160
    address.to_string (endpoint);

161
    //  Bind the socket to the file path.
162
    rc = bind (s, address.addr (), address.addrlen ());
163
    if (rc != 0)
164
        goto error;
165

166
    filename.assign (addr.c_str());
167
    has_file = true;
168

169
    //  Listen for incoming connections.
170
    rc = listen (s, options.backlog);
171
    if (rc != 0)
172
        goto error;
173

174
    socket->event_listening (endpoint, s);
175
    return 0;
176 177 178 179 180 181

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
182 183 184 185 186 187
}

int zmq::ipc_listener_t::close ()
{
    zmq_assert (s != retired_fd);
    int rc = ::close (s);
188
    errno_assert (rc == 0);
189

190 191
    s = retired_fd;

192 193
    //  If there's an underlying UNIX domain socket, get rid of the file it
    //  is associated with.
194 195
    if (has_file && !filename.empty ()) {
        rc = ::unlink(filename.c_str ());
196
        if (rc != 0) {
197
            socket->event_close_failed (endpoint, zmq_errno());
198
            return -1;
199
        }
200 201
    }

202
    socket->event_closed (endpoint, s);
203 204 205
    return 0;
}

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
#if defined ZMQ_HAVE_SO_PEERCRED

bool zmq::ipc_listener_t::filter (fd_t sock)
{
    if (options.ipc_uid_accept_filters.empty () &&
        options.ipc_pid_accept_filters.empty () &&
        options.ipc_gid_accept_filters.empty ())
        return true;

    struct ucred cred;
    socklen_t size = sizeof (cred);

    if (getsockopt (sock, SOL_SOCKET, SO_PEERCRED, &cred, &size))
        return false;
    if (options.ipc_uid_accept_filters.find (cred.uid) != options.ipc_uid_accept_filters.end () ||
221
            options.ipc_gid_accept_filters.find (cred.gid) != options.ipc_gid_accept_filters.end () ||
222 223 224 225 226 227 228 229 230 231 232 233
            options.ipc_pid_accept_filters.find (cred.pid) != options.ipc_pid_accept_filters.end ())
        return true;

    struct passwd *pw;
    struct group *gr;

    if (!(pw = getpwuid (cred.uid)))
        return false;
    for (options_t::ipc_gid_accept_filters_t::const_iterator it = options.ipc_gid_accept_filters.begin ();
            it != options.ipc_gid_accept_filters.end (); it++) {
        if (!(gr = getgrgid (*it)))
            continue;
234
        for (char **mem = gr->gr_mem; *mem; mem++) {
235 236
            if (!strcmp (*mem, pw->pw_name))
                return true;
237
        }
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
    }
    return false;
}

#elif defined ZMQ_HAVE_LOCAL_PEERCRED

bool zmq::ipc_listener_t::filter (fd_t sock)
{
    if (options.ipc_uid_accept_filters.empty () &&
        options.ipc_gid_accept_filters.empty ())
        return true;

    struct xucred cred;
    socklen_t size = sizeof (cred);

    if (getsockopt (sock, 0, LOCAL_PEERCRED, &cred, &size))
        return false;
    if (cred.cr_version != XUCRED_VERSION)
        return false;
    if (options.ipc_uid_accept_filters.find (cred.cr_uid) != options.ipc_uid_accept_filters.end ())
        return true;
    for (int i = 0; i < cred.cr_ngroups; i++) {
        if (options.ipc_gid_accept_filters.find (cred.cr_groups[i]) != options.ipc_gid_accept_filters.end ())
            return true;
    }

    return false;
}

#endif

269 270
zmq::fd_t zmq::ipc_listener_t::accept ()
{
271
    //  Accept one connection and deal with different failure modes.
272 273
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
274 275
    zmq_assert (s != retired_fd);
    fd_t sock = ::accept (s, NULL, NULL);
276 277
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
278
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
279
            errno == ENFILE);
280
        return retired_fd;
281
    }
282

283 284 285 286 287 288 289
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
#ifdef FD_CLOEXEC
    int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (rc != -1);
#endif

290 291 292 293 294 295 296 297 298
    // IPC accept() filters
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
    if (!filter (sock)) {
        int rc = ::close (sock);
        errno_assert (rc == 0);
        return retired_fd;
    }
#endif

299 300 301 302
    return sock;
}

#endif