ipc_listener.cpp 12.4 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32 33 34
#include "ipc_listener.hpp"

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS

35 36 37 38
#include <new>

#include <string.h>

39
#include "stream_engine.hpp"
40
#include "ipc_address.hpp"
41
#include "io_thread.hpp"
42
#include "session_base.hpp"
43 44
#include "config.hpp"
#include "err.hpp"
45
#include "ip.hpp"
46
#include "socket_base.hpp"
47 48 49 50 51

#include <unistd.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <sys/un.h>
52
#include <sys/stat.h>
53

54
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
55
#   include <sys/types.h>
56
#   include <sys/ucred.h>
57 58
#endif
#ifdef ZMQ_HAVE_SO_PEERCRED
59
#   include <sys/types.h>
60 61
#   include <pwd.h>
#   include <grp.h>
62 63 64
#   if defined ZMQ_HAVE_OPENBSD
#       define ucred sockpeercred
#   endif
65 66
#endif

67 68 69 70 71 72 73
const char *zmq::ipc_listener_t::tmp_env_vars[] = {
  "TMPDIR",
  "TEMPDIR",
  "TMP",
  0  // Sentinel
};

74 75
int zmq::ipc_listener_t::create_wildcard_address(std::string& path_,
        std::string& file_)
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
{
    std::string tmp_path;

    // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
    // the socket directory there.
    const char **tmp_env = tmp_env_vars;
    while ( tmp_path.empty() && *tmp_env != 0 ) {
        char *tmpdir = getenv(*tmp_env);
        struct stat statbuf;

        // Confirm it is actually a directory before trying to use
        if ( tmpdir != 0 && ::stat(tmpdir, &statbuf) == 0 && S_ISDIR(statbuf.st_mode) ) {
            tmp_path.assign(tmpdir);
            if ( *(tmp_path.rbegin()) != '/' ) {
                tmp_path.push_back('/');
            }
        }

        // Try the next environment variable
        ++tmp_env;
    }

    // Append a directory name
    tmp_path.append("tmpXXXXXX");

    // We need room for tmp_path + trailing NUL
    std::vector<char> buffer(tmp_path.length()+1);
103
    strcpy (&buffer[0], tmp_path.c_str ());
104

105
#ifdef HAVE_MKDTEMP
106 107 108 109 110 111
    // Create the directory.  POSIX requires that mkdtemp() creates the
    // directory with 0700 permissions, meaning the only possible race
    // with socket creation could be the same user.  However, since
    // each socket is created in a directory created by mkdtemp(), and
    // mkdtemp() guarantees a unique directory name, there will be no
    // collision.
112
    if (mkdtemp (&buffer[0]) == 0) {
113 114 115
        return -1;
    }

116
    path_.assign (&buffer[0]);
117 118 119 120 121
    file_.assign (path_ + "/socket");
#else
    // Silence -Wunused-parameter. #pragma and __attribute__((unused)) are not
    // very portable unfortunately...
    (void) path_;
122
    int fd = mkstemp (&buffer[0]);
123 124 125 126
    if (fd == -1)
         return -1;
    ::close (fd);

127
    file_.assign (&buffer[0]);
128
#endif
129 130 131 132

    return 0;
}

133 134 135 136 137 138 139 140 141 142 143 144
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
      socket_base_t *socket_, const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    has_file (false),
    s (retired_fd),
    socket (socket_)
{
}

zmq::ipc_listener_t::~ipc_listener_t ()
{
145
    zmq_assert (s == retired_fd);
146 147 148 149 150 151 152 153 154 155 156 157
}

void zmq::ipc_listener_t::process_plug ()
{
    //  Start polling for incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::ipc_listener_t::process_term (int linger_)
{
    rm_fd (handle);
158
    close ();
159 160 161 162 163 164 165 166 167
    own_t::process_term (linger_);
}

void zmq::ipc_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
168
    if (fd == retired_fd) {
169
        socket->event_accept_failed (endpoint, zmq_errno());
170
        return;
171
    }
172 173

    //  Create the engine object for this connection.
174
    stream_engine_t *engine = new (std::nothrow)
175
        stream_engine_t (fd, options, endpoint);
176 177 178 179 180 181 182
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

183
    //  Create and launch a session object.
184
    session_base_t *session = session_base_t::create (io_thread, false, socket,
185
        options, NULL);
186
    errno_assert (session);
187 188 189
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
190
    socket->event_accepted (endpoint, fd);
191 192
}

193
int zmq::ipc_listener_t::get_address (std::string &addr_)
194
{
195
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
196 197 198
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
199
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
200
#endif
201
    int rc = getsockname (s, (sockaddr *) &ss, &sl);
202
    if (rc != 0) {
203
        addr_.clear ();
204
        return rc;
205
    }
Mikko Koppanen's avatar
Mikko Koppanen committed
206

207 208
    ipc_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
209 210
}

211
int zmq::ipc_listener_t::set_address (const char *addr_)
212
{
213 214 215 216
    //  Create addr on stack for auto-cleanup
    std::string addr (addr_);

    //  Allow wildcard file
217
    if (options.use_fd == -1 && addr [0] == '*') {
218
        if ( create_wildcard_address(tmp_socket_dirname, addr) < 0 ) {
219
            return -1;
220
        }
221
    }
222

223 224
    //  Get rid of the file associated with the UNIX domain socket that
    //  may have been left behind by the previous run of the application.
225 226 227
    //  MUST NOT unlink if the FD is managed by the user, or it will stop
    //  working after the first client connects. The user will take care of
    //  cleaning up the file after the service is stopped.
228
    if (options.use_fd == -1) {
229 230
        ::unlink (addr.c_str());
    }
231
    filename.clear ();
232

233 234
    //  Initialise the address structure.
    ipc_address_t address;
235
    int rc = address.resolve (addr.c_str());
236 237 238 239 240 241 242 243
    if (rc != 0) {
        if ( !tmp_socket_dirname.empty() ) {
            // We need to preserve errno to return to the user
            int errno_ = errno;
            ::rmdir(tmp_socket_dirname.c_str ());
            tmp_socket_dirname.clear();
            errno = errno_;
        }
244
        return -1;
245
    }
246

247 248
    address.to_string (endpoint);

249 250
    if (options.use_fd != -1) {
        s = options.use_fd;
251 252 253
    } else {
        //  Create a listening socket.
        s = open_socket (AF_UNIX, SOCK_STREAM, 0);
254 255 256 257 258 259 260 261
        if (s == -1) {
            if ( !tmp_socket_dirname.empty() ) {
                // We need to preserve errno to return to the user
                int errno_ = errno;
                ::rmdir(tmp_socket_dirname.c_str ());
                tmp_socket_dirname.clear();
                errno = errno_;
            }
262
            return -1;
263
        }
264 265 266 267 268 269 270 271 272 273 274

        //  Bind the socket to the file path.
        rc = bind (s, address.addr (), address.addrlen ());
        if (rc != 0)
            goto error;

        //  Listen for incoming connections.
        rc = listen (s, options.backlog);
        if (rc != 0)
            goto error;
    }
275

276
    filename.assign (addr.c_str());
277
    has_file = true;
278

279
    socket->event_listening (endpoint, s);
280
    return 0;
281 282 283 284 285 286

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
287 288 289 290 291 292
}

int zmq::ipc_listener_t::close ()
{
    zmq_assert (s != retired_fd);
    int rc = ::close (s);
293
    errno_assert (rc == 0);
294

295 296
    s = retired_fd;

297 298
    //  If there's an underlying UNIX domain socket, get rid of the file it
    //  is associated with.
299 300 301
    //  MUST NOT unlink if the FD is managed by the user, or it will stop
    //  working after the first client connects. The user will take care of
    //  cleaning up the file after the service is stopped.
302 303 304 305 306 307 308 309 310 311 312 313
    if (has_file && options.use_fd == -1) {
        rc = 0;

        if ( !filename.empty () ) {
          rc = ::unlink(filename.c_str ());
        }

        if ( rc == 0 && !tmp_socket_dirname.empty() ) {
          rc = ::rmdir(tmp_socket_dirname.c_str ());
          tmp_socket_dirname.clear();
        }

314
        if (rc != 0) {
315
            socket->event_close_failed (endpoint, zmq_errno());
316
            return -1;
317
        }
318 319
    }

320
    socket->event_closed (endpoint, s);
321 322 323
    return 0;
}

324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
#if defined ZMQ_HAVE_SO_PEERCRED

bool zmq::ipc_listener_t::filter (fd_t sock)
{
    if (options.ipc_uid_accept_filters.empty () &&
        options.ipc_pid_accept_filters.empty () &&
        options.ipc_gid_accept_filters.empty ())
        return true;

    struct ucred cred;
    socklen_t size = sizeof (cred);

    if (getsockopt (sock, SOL_SOCKET, SO_PEERCRED, &cred, &size))
        return false;
    if (options.ipc_uid_accept_filters.find (cred.uid) != options.ipc_uid_accept_filters.end () ||
339
            options.ipc_gid_accept_filters.find (cred.gid) != options.ipc_gid_accept_filters.end () ||
340 341 342 343 344 345 346 347 348 349 350 351
            options.ipc_pid_accept_filters.find (cred.pid) != options.ipc_pid_accept_filters.end ())
        return true;

    struct passwd *pw;
    struct group *gr;

    if (!(pw = getpwuid (cred.uid)))
        return false;
    for (options_t::ipc_gid_accept_filters_t::const_iterator it = options.ipc_gid_accept_filters.begin ();
            it != options.ipc_gid_accept_filters.end (); it++) {
        if (!(gr = getgrgid (*it)))
            continue;
352
        for (char **mem = gr->gr_mem; *mem; mem++) {
353 354
            if (!strcmp (*mem, pw->pw_name))
                return true;
355
        }
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
    }
    return false;
}

#elif defined ZMQ_HAVE_LOCAL_PEERCRED

bool zmq::ipc_listener_t::filter (fd_t sock)
{
    if (options.ipc_uid_accept_filters.empty () &&
        options.ipc_gid_accept_filters.empty ())
        return true;

    struct xucred cred;
    socklen_t size = sizeof (cred);

    if (getsockopt (sock, 0, LOCAL_PEERCRED, &cred, &size))
        return false;
    if (cred.cr_version != XUCRED_VERSION)
        return false;
    if (options.ipc_uid_accept_filters.find (cred.cr_uid) != options.ipc_uid_accept_filters.end ())
        return true;
    for (int i = 0; i < cred.cr_ngroups; i++) {
        if (options.ipc_gid_accept_filters.find (cred.cr_groups[i]) != options.ipc_gid_accept_filters.end ())
            return true;
    }

    return false;
}

#endif

387 388
zmq::fd_t zmq::ipc_listener_t::accept ()
{
389
    //  Accept one connection and deal with different failure modes.
390 391
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
392
    zmq_assert (s != retired_fd);
393
#if defined ZMQ_HAVE_SOCK_CLOEXEC
hjp's avatar
hjp committed
394
    fd_t sock = ::accept4 (s, NULL, NULL, SOCK_CLOEXEC);
395
#else
396
    fd_t sock = ::accept (s, NULL, NULL);
397
#endif
398 399
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
400
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
401
            errno == ENFILE);
402
        return retired_fd;
403
    }
404

405
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
406 407 408 409 410 411
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
    int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (rc != -1);
#endif

412 413 414 415 416 417 418 419 420
    // IPC accept() filters
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
    if (!filter (sock)) {
        int rc = ::close (sock);
        errno_assert (rc == 0);
        return retired_fd;
    }
#endif

421 422 423 424 425 426 427 428 429 430 431
    if (zmq::set_nosigpipe (sock)) {
#ifdef ZMQ_HAVE_WINDOWS
        int rc = closesocket (sock);
        wsa_assert (rc != SOCKET_ERROR);
#else
        int rc = ::close (sock);
        errno_assert (rc == 0);
#endif
        return retired_fd;
    }

432 433 434 435
    return sock;
}

#endif