/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32 33 34
#include "ipc_listener.hpp"

#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS

35 36 37 38
#include <new>

#include <string.h>

39
#include "stream_engine.hpp"
40
#include "ipc_address.hpp"
41
#include "io_thread.hpp"
42
#include "session_base.hpp"
43 44
#include "config.hpp"
#include "err.hpp"
45
#include "ip.hpp"
46
#include "socket_base.hpp"
47 48 49 50 51

#include <unistd.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <sys/un.h>
52
#include <sys/stat.h>
53

54
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
55
#   include <sys/types.h>
56
#   include <sys/ucred.h>
57 58
#endif
#ifdef ZMQ_HAVE_SO_PEERCRED
59
#   include <sys/types.h>
60 61
#   include <pwd.h>
#   include <grp.h>
62 63 64
#   if defined ZMQ_HAVE_OPENBSD
#       define ucred sockpeercred
#   endif
65 66
#endif

67 68 69 70 71 72 73
//  Names of the environment variables consulted, in this order, when a
//  wildcard endpoint needs a temporary directory. The list is terminated
//  by a null pointer so it can be walked without knowing its length.
const char *zmq::ipc_listener_t::tmp_env_vars[] = {
    "TMPDIR",
    "TEMPDIR",
    "TMP",
    0   //  End-of-list marker
};

74 75
int zmq::ipc_listener_t::create_wildcard_address(std::string& path_,
        std::string& file_)
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
{
    std::string tmp_path;

    // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
    // the socket directory there.
    const char **tmp_env = tmp_env_vars;
    while ( tmp_path.empty() && *tmp_env != 0 ) {
        char *tmpdir = getenv(*tmp_env);
        struct stat statbuf;

        // Confirm it is actually a directory before trying to use
        if ( tmpdir != 0 && ::stat(tmpdir, &statbuf) == 0 && S_ISDIR(statbuf.st_mode) ) {
            tmp_path.assign(tmpdir);
            if ( *(tmp_path.rbegin()) != '/' ) {
                tmp_path.push_back('/');
            }
        }

        // Try the next environment variable
        ++tmp_env;
    }

    // Append a directory name
    tmp_path.append("tmpXXXXXX");

    // We need room for tmp_path + trailing NUL
    std::vector<char> buffer(tmp_path.length()+1);
103
    strcpy (&buffer[0], tmp_path.c_str ());
104

105
#ifdef HAVE_MKDTEMP
106 107 108 109 110 111
    // Create the directory.  POSIX requires that mkdtemp() creates the
    // directory with 0700 permissions, meaning the only possible race
    // with socket creation could be the same user.  However, since
    // each socket is created in a directory created by mkdtemp(), and
    // mkdtemp() guarantees a unique directory name, there will be no
    // collision.
112
    if (mkdtemp (&buffer[0]) == 0) {
113 114 115
        return -1;
    }

116
    path_.assign (&buffer[0]);
117 118 119 120 121
    file_.assign (path_ + "/socket");
#else
    // Silence -Wunused-parameter. #pragma and __attribute__((unused)) are not
    // very portable unfortunately...
    (void) path_;
122
    int fd = mkstemp (&buffer[0]);
123 124 125 126
    if (fd == -1)
         return -1;
    ::close (fd);

127
    file_.assign (&buffer[0]);
128
#endif
129 130 131 132

    return 0;
}

133 134 135 136 137 138 139 140 141 142 143 144
//  Constructs a listener that does not own a descriptor yet: the socket
//  handle starts out as retired_fd and no socket file is recorded.
//  socket_ is stored as the target for the event_* notifications issued
//  by the other members.
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
                                     socket_base_t *socket_,
                                     const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    has_file (false),
    s (retired_fd),
    socket (socket_)
{
}

//  The destructor releases nothing itself; it only verifies that the
//  listening descriptor has already been given up (close () resets it
//  to retired_fd).
zmq::ipc_listener_t::~ipc_listener_t ()
{
    zmq_assert (s == retired_fd);
}

//  Registers the listening descriptor with add_fd (), remembers the
//  returned handle and enables input notifications on it, so that
//  incoming connections get reported.
void zmq::ipc_listener_t::process_plug ()
{
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::ipc_listener_t::process_term (int linger_)
{
    rm_fd (handle);
158
    close ();
159 160 161 162 163 164 165 166 167
    own_t::process_term (linger_);
}

void zmq::ipc_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
168
    if (fd == retired_fd) {
169
        socket->event_accept_failed (endpoint, zmq_errno());
170
        return;
171
    }
172 173

    //  Create the engine object for this connection.
174
    stream_engine_t *engine = new (std::nothrow)
175
        stream_engine_t (fd, options, endpoint);
176 177 178 179 180 181 182
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

183
    //  Create and launch a session object.
184
    session_base_t *session = session_base_t::create (io_thread, false, socket,
185
        options, NULL);
186
    errno_assert (session);
187 188 189
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
190
    socket->event_accepted (endpoint, fd);
191 192
}

193
int zmq::ipc_listener_t::get_address (std::string &addr_)
194
{
195
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
196 197 198
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
199
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
200
#endif
201
    int rc = getsockname (s, (sockaddr *) &ss, &sl);
202
    if (rc != 0) {
203
        addr_.clear ();
204
        return rc;
205
    }
Mikko Koppanen's avatar
Mikko Koppanen committed
206

207 208
    ipc_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
209 210
}

211
int zmq::ipc_listener_t::set_address (const char *addr_)
212
{
213 214 215 216
    //  Create addr on stack for auto-cleanup
    std::string addr (addr_);

    //  Allow wildcard file
217
    if (options.use_fd == -1 && addr [0] == '*') {
218
        if ( create_wildcard_address(tmp_socket_dirname, addr) < 0 ) {
219
            return -1;
220
        }
221
    }
222

223 224
    //  Get rid of the file associated with the UNIX domain socket that
    //  may have been left behind by the previous run of the application.
225 226 227
    //  MUST NOT unlink if the FD is managed by the user, or it will stop
    //  working after the first client connects. The user will take care of
    //  cleaning up the file after the service is stopped.
228
    if (options.use_fd == -1) {
229 230
        ::unlink (addr.c_str());
    }
231
    filename.clear ();
232

233 234
    //  Initialise the address structure.
    ipc_address_t address;
235
    int rc = address.resolve (addr.c_str());
236 237 238 239 240 241 242 243
    if (rc != 0) {
        if ( !tmp_socket_dirname.empty() ) {
            // We need to preserve errno to return to the user
            int errno_ = errno;
            ::rmdir(tmp_socket_dirname.c_str ());
            tmp_socket_dirname.clear();
            errno = errno_;
        }
244
        return -1;
245
    }
246

247 248
    address.to_string (endpoint);

249 250
    if (options.use_fd != -1) {
        s = options.use_fd;
251 252 253
    } else {
        //  Create a listening socket.
        s = open_socket (AF_UNIX, SOCK_STREAM, 0);
254 255 256 257 258 259 260 261
        if (s == -1) {
            if ( !tmp_socket_dirname.empty() ) {
                // We need to preserve errno to return to the user
                int errno_ = errno;
                ::rmdir(tmp_socket_dirname.c_str ());
                tmp_socket_dirname.clear();
                errno = errno_;
            }
262
            return -1;
263
        }
264 265 266 267 268 269 270 271 272 273 274

        //  Bind the socket to the file path.
        rc = bind (s, address.addr (), address.addrlen ());
        if (rc != 0)
            goto error;

        //  Listen for incoming connections.
        rc = listen (s, options.backlog);
        if (rc != 0)
            goto error;
    }
275

276
    filename.assign (addr.c_str());
277
    has_file = true;
278

279
    socket->event_listening (endpoint, s);
280
    return 0;
281 282 283 284 285 286

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
287 288 289 290 291
}

int zmq::ipc_listener_t::close ()
{
    zmq_assert (s != retired_fd);
292
    int fd_for_event = s;
293
    int rc = ::close (s);
294
    errno_assert (rc == 0);
295

296 297
    s = retired_fd;

298 299 300 301
    if (has_file && options.use_fd == -1) {
        rc = 0;

        if ( rc == 0 && !tmp_socket_dirname.empty() ) {
302 303
            rc = ::rmdir(tmp_socket_dirname.c_str ());
            tmp_socket_dirname.clear();
304 305
        }

306
        if (rc != 0) {
307
            socket->event_close_failed (endpoint, zmq_errno());
308
            return -1;
309
        }
310 311
    }

312
    socket->event_closed (endpoint, fd_for_event);
313 314 315
    return 0;
}

316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
#if defined ZMQ_HAVE_SO_PEERCRED

//  Decides whether the peer connected on sock passes the IPC accept
//  filters (SO_PEERCRED variant).
//
//  Returns true when no uid/pid/gid filter is configured, when the peer's
//  uid, gid or pid is listed in the corresponding filter, or when the
//  peer's user name is listed as a member of one of the accepted groups.
//  Returns false otherwise, including when the peer credentials or the
//  peer's passwd entry cannot be obtained.
bool zmq::ipc_listener_t::filter (fd_t sock)
{
    const bool filtering_disabled =
        options.ipc_uid_accept_filters.empty () &&
        options.ipc_pid_accept_filters.empty () &&
        options.ipc_gid_accept_filters.empty ();
    if (filtering_disabled)
        return true;

    //  Ask the kernel for the credentials of the connecting process.
    struct ucred cred;
    socklen_t size = sizeof (cred);
    if (getsockopt (sock, SOL_SOCKET, SO_PEERCRED, &cred, &size) != 0)
        return false;

    //  Direct match on any of the three identifiers.
    if (options.ipc_uid_accept_filters.find (cred.uid) != options.ipc_uid_accept_filters.end ())
        return true;
    if (options.ipc_gid_accept_filters.find (cred.gid) != options.ipc_gid_accept_filters.end ())
        return true;
    if (options.ipc_pid_accept_filters.find (cred.pid) != options.ipc_pid_accept_filters.end ())
        return true;

    //  No direct match: check whether the peer's user is listed as a member
    //  of one of the accepted groups.
    const struct passwd *pw = getpwuid (cred.uid);
    if (pw == NULL)
        return false;

    typedef options_t::ipc_gid_accept_filters_t::const_iterator gid_iterator_t;
    const gid_iterator_t last = options.ipc_gid_accept_filters.end ();
    for (gid_iterator_t it = options.ipc_gid_accept_filters.begin ();
          it != last; ++it) {
        const struct group *gr = getgrgid (*it);
        if (gr == NULL)
            continue;

        for (char **member = gr->gr_mem; *member != NULL; ++member) {
            if (strcmp (*member, pw->pw_name) == 0)
                return true;
        }
    }

    return false;
}

#elif defined ZMQ_HAVE_LOCAL_PEERCRED

//  Decides whether the peer connected on sock passes the IPC accept
//  filters (LOCAL_PEERCRED variant).
//
//  Returns true when no uid/gid filter is configured, when the peer's uid
//  is listed, or when any of the peer's groups is listed. Returns false
//  otherwise, including when the credentials cannot be read or carry an
//  unexpected structure version.
bool zmq::ipc_listener_t::filter (fd_t sock)
{
    const bool filtering_disabled =
        options.ipc_uid_accept_filters.empty () &&
        options.ipc_gid_accept_filters.empty ();
    if (filtering_disabled)
        return true;

    struct xucred cred;
    socklen_t size = sizeof (cred);
    if (getsockopt (sock, 0, LOCAL_PEERCRED, &cred, &size) != 0)
        return false;

    //  Refuse credentials with a layout we do not know.
    if (cred.cr_version != XUCRED_VERSION)
        return false;

    if (options.ipc_uid_accept_filters.find (cred.cr_uid) != options.ipc_uid_accept_filters.end ())
        return true;

    //  Any group reported for the peer may grant access.
    for (int idx = 0; idx < cred.cr_ngroups; ++idx) {
        if (options.ipc_gid_accept_filters.find (cred.cr_groups[idx]) != options.ipc_gid_accept_filters.end ())
            return true;
    }

    return false;
}

#endif

379 380
//  Accepts one pending connection on the listening socket.
//
//  Returns the new descriptor, or retired_fd when
//   - accept failed with a transient or resource-exhaustion error,
//   - the peer was rejected by the IPC accept filters, or
//   - set_nosigpipe () reported a failure for the new descriptor.
//  In the last two cases the accepted descriptor is closed before
//  returning. Any other accept error is treated as fatal (errno_assert).
zmq::fd_t zmq::ipc_listener_t::accept ()
{
    //  Accept one connection and deal with different failure modes.
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
    zmq_assert (s != retired_fd);
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
    fd_t sock = ::accept4 (s, NULL, NULL, SOCK_CLOEXEC);
#else
    fd_t sock = ::accept (s, NULL, NULL);
#endif
    if (sock == -1) {
        //  Besides the transient errors, running out of descriptors
        //  (EMFILE per process, ENFILE system-wide) or out of memory
        //  (ENOBUFS, ENOMEM) must not bring the process down.
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
            errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
            errno == ENFILE);
        return retired_fd;
    }

#if (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4) && defined FD_CLOEXEC
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
    int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (rc != -1);
#endif

    // IPC accept() filters
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
    if (!filter (sock)) {
        int rc = ::close (sock);
        errno_assert (rc == 0);
        return retired_fd;
    }
#endif

    //  This file is only compiled on non-Windows platforms, so the
    //  descriptor is always released with ::close ().
    if (zmq::set_nosigpipe (sock)) {
        int rc = ::close (sock);
        errno_assert (rc == 0);
        return retired_fd;
    }

    return sock;
}

#endif