// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include "async-unix.h"
#include "thread.h"
#include "debug.h"
25
#include "io.h"
26 27 28 29
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
30
#include <kj/compat/gtest.h>
31
#include <pthread.h>
32
#include <algorithm>
33 34 35 36 37

namespace kj {

// Pauses the calling thread for 10 milliseconds (usleep() takes microseconds).  The tests in
// this file call it from helper threads before they signal, write, or wake.
inline void delay() {
  usleep(10 * 1000);
}

38 39 40 41 42 43 44
// EXPECT_SI_CODE(expected, actual): checks the si_code field of a received siginfo_t.
//
// On OSX, si_code seems to be zero when SI_USER is expected.  The check is therefore a real
// EXPECT_EQ only on Linux and Cygwin; on every other platform the macro expands to nothing, so
// the comparison is skipped and its arguments are not evaluated.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE EXPECT_EQ
#else
#define EXPECT_SI_CODE(a,b)
#endif

45 46 47 48 49
void captureSignals() {
  static bool captured = false;
  if (!captured) {
    captured = true;

50 51 52 53
    // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
    // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test.  We can't
    // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
    UnixEventPort::captureSignal(SIGURG);
54
    UnixEventPort::captureSignal(SIGIO);
55
  }
56
}
57

58 59
TEST(AsyncUnixTest, Signals) {
  // Sends SIGURG to this process and expects onSignal(SIGURG) to report it.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // The signal is raised before anyone is waiting for it.
  kill(getpid(), SIGURG);

  auto info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

71
#if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
72
TEST(AsyncUnixTest, SignalWithValue) {
  // This tests that if we use sigqueue() to attach a value to the signal, that value is received
  // correctly.  Note that this only works on platforms that support real-time signals -- even
  // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
  // signals.  Hence this test won't run on e.g. Mac OSX.
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
  //
  // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
  // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
  // Sad. https://github.com/sandstorm-io/capnproto/issues/204

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union before setting the member we care about.
  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_int = 123;

  // Fail right here if the signal could not be queued; otherwise the wait() below would have
  // nothing to receive.
  KJ_SYSCALL(sigqueue(getpid(), SIGURG, value));

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(123, info.si_value.sival_int);
}
99

100
TEST(AsyncUnixTest, SignalWithPointerValue) {
  // This tests that if we use sigqueue() to attach a pointer value to the signal, that pointer
  // is received correctly.  Note that this only works on platforms that support real-time
  // signals -- even though the signal we're sending is SIGURG, the sigqueue() system call is
  // introduced by RT signals.  Hence this test won't run on e.g. Mac OSX.
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
  //
  // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
  // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
  // Sad. https://github.com/sandstorm-io/capnproto/issues/204

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union before setting the member we care about.  The address of `port` is
  // used simply as a recognizable pointer value.
  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_ptr = &port;

  // Fail right here if the signal could not be queued; otherwise the wait() below would have
  // nothing to receive.
  KJ_SYSCALL(sigqueue(getpid(), SIGURG, value));

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(&port, info.si_value.sival_ptr);
}
127
#endif
128

129 130
TEST(AsyncUnixTest, SignalsMultiListen) {
  // With listeners registered for two different signals, sending one of them must only satisfy
  // the listener for that signal.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Background listener for SIGIO, which is never sent in this test; the test fails if its
  // callback runs.
  port.onSignal(SIGIO)
      .then([](siginfo_t&&) {
        KJ_FAIL_EXPECT("Received wrong signal.");
      })
      .detach([](kj::Exception&& exception) {
        KJ_FAIL_EXPECT(exception);
      });

  kill(getpid(), SIGURG);

  auto info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

148 149 150 151 152
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

153 154
TEST(AsyncUnixTest, SignalsMultiReceive) {
  // Both signals are sent before either one is waited for; each wait is expected to return its
  // own signal.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  kill(getpid(), SIGURG);
  kill(getpid(), SIGIO);

  {
    // First wait: SIGURG.
    siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    EXPECT_EQ(SIGURG, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
  }

  {
    // Second wait: SIGIO.
    siginfo_t info = port.onSignal(SIGIO).wait(waitScope);
    EXPECT_EQ(SIGIO, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
  }
}

171 172
#endif  // !__CYGWIN32__

173 174
TEST(AsyncUnixTest, SignalsAsync) {
  // The signal is sent from a second thread, after a short delay, while this thread waits.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Remember which thread to target, then start the sender thread.
  pthread_t mainThread = pthread_self();
  Thread thread([&]() {
    delay();
    pthread_kill(mainThread, SIGURG);
  });

  auto info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
#if __linux__
  // si_code is only checked on Linux, where SI_TKILL is expected.
  EXPECT_SI_CODE(SI_TKILL, info.si_code);
#endif
}

193 194 195 196 197
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

198
TEST(AsyncUnixTest, SignalsNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending signals.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  bool receivedSigurg = false;
  bool receivedSigio = false;

  // Each listener records that it ran and checks the siginfo it was handed.
  port.onSignal(SIGURG)
      .then([&](siginfo_t&& info) {
        receivedSigurg = true;
        EXPECT_EQ(SIGURG, info.si_signo);
        EXPECT_SI_CODE(SI_USER, info.si_code);
      })
      .detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
  port.onSignal(SIGIO)
      .then([&](siginfo_t&& info) {
        receivedSigio = true;
        EXPECT_EQ(SIGIO, info.si_signo);
        EXPECT_SI_CODE(SI_USER, info.si_code);
      })
      .detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });

  kill(getpid(), SIGURG);
  kill(getpid(), SIGIO);

  // Sending the signals alone must not run the listeners...
  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // ...nor may running the loop before the port has been polled...
  loop.run();
  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // ...nor polling the port without running the loop afterwards.
  port.poll();
  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // Only after poll() followed by run() are both listeners expected to have fired.
  loop.run();
  EXPECT_TRUE(receivedSigurg);
  EXPECT_TRUE(receivedSigio);
}

241 242
#endif  // !__CYGWIN32__

243 244
TEST(AsyncUnixTest, ReadObserver) {
  // Data written to a pipe must make the observer of its read end report readability.  On
  // Linux the end-of-stream hint is checked as well.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);

  UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);

  KJ_SYSCALL(write(outfd, "foo", 3));
  observer.whenBecomesReadable().wait(waitScope);

#if __linux__  // platform known to support POLLRDHUP
  // The write end is still open, so the hint must say "not at end".
  EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));

  // Drain the three bytes written above.
  char buffer[4096];
  ssize_t n;
  KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
  EXPECT_EQ(3, n);

  // Write once more, then close the write end by resetting the AutoCloseFd.
  KJ_SYSCALL(write(outfd, "bar", 3));
  outfd = nullptr;

  observer.whenBecomesReadable().wait(waitScope);

  // With the writer gone, the hint must now say "at end".
  EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
#endif
}

276 277
TEST(AsyncUnixTest, ReadObserverMultiListen) {
  // With two pipes observed, writing to one of them must only wake the observer of that pipe.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // A pipe that is never written to.  The test fails if its observer reports readability.
  int bogusPipefds[2];
  KJ_SYSCALL(pipe(bogusPipefds));
  KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });

  UnixEventPort::FdObserver bogusObserver(
      port, bogusPipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  bogusObserver.whenBecomesReadable()
      .then([]() {
        ADD_FAILURE() << "Received wrong poll.";
      })
      .detach([](kj::Exception&& exception) {
        ADD_FAILURE() << kj::str(exception).cStr();
      });

  // The pipe that actually receives data.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  observer.whenBecomesReadable().wait(waitScope);
}

306 307
TEST(AsyncUnixTest, ReadObserverMultiReceive) {
  // Two pipes both have data written before either is waited on; both observers are expected
  // to report readability.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // First pipe, with data already written.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  // Second pipe, likewise.
  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });

  UnixEventPort::FdObserver observer2(
      port, pipefds2[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Obtain both promises before waiting on either one.
  auto promise1 = observer.whenBecomesReadable();
  auto promise2 = observer2.whenBecomesReadable();
  promise1.wait(waitScope);
  promise2.wait(waitScope);
}

334 335
TEST(AsyncUnixTest, ReadObserverAsync) {
  // Waits on the read end of a pipe in this thread while a second thread writes to it after a
  // short delay.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  // Writer thread.
  Thread thread([&]() {
    delay();
    KJ_SYSCALL(write(pipefds[1], "foo", 3));
  });

  // The event is awaited here, in the original thread.
  observer.whenBecomesReadable().wait(waitScope);
}

356
TEST(AsyncUnixTest, ReadObserverNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending FD events.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
  UnixEventPort::FdObserver observer2(
      port, pipefds2[0], UnixEventPort::FdObserver::OBSERVE_READ);

  // Counts how many of the two readability callbacks have run.
  int receivedCount = 0;
  observer.whenBecomesReadable()
      .then([&]() { ++receivedCount; })
      .detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  observer2.whenBecomesReadable()
      .then([&]() { ++receivedCount; })
      .detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });

  KJ_SYSCALL(write(pipefds[1], "foo", 3));
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Writing alone must not run the callbacks...
  EXPECT_EQ(0, receivedCount);

  // ...nor may running the loop before the port has been polled...
  loop.run();
  EXPECT_EQ(0, receivedCount);

  // ...nor polling the port without running the loop afterwards.
  port.poll();
  EXPECT_EQ(0, receivedCount);

  // Only after poll() followed by run() are both callbacks expected to have fired.
  loop.run();
  EXPECT_EQ(2, receivedCount);
}

402 403 404 405 406 407 408 409
// Puts `fd` into non-blocking mode by setting O_NONBLOCK in its file status flags.  Does nothing
// if the flag is already set.  Failures of fcntl() are reported through KJ_SYSCALL.
static void setNonblocking(int fd) {
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
  if ((flags & O_NONBLOCK) != 0) {
    return;
  }
  KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
}

410 411
TEST(AsyncUnixTest, WriteObserver) {
  // Fills a pipe until writes would block, checks that the write observer stays silent, then
  // drains the pipe and checks that the observer reports writability.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);
  setNonblocking(outfd);
  setNonblocking(infd);

  UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);

  // Run whatever is queued, poll the port once without blocking, then run what that produced.
  auto pump = [&]() {
    loop.run();
    port.poll();
    loop.run();
  };

  // Fill buffer: keep writing until the non-blocking write reports a negative result.
  ssize_t n;
  for (;;) {
    KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
    if (n < 0) break;
  }

  bool writable = false;
  auto promise = observer.whenBecomesWritable()
      .then([&]() { writable = true; })
      .eagerlyEvaluate(nullptr);

  pump();
  EXPECT_FALSE(writable);

  // Empty the read end so that the write end becomes writable. Note that Linux implements a
  // high watermark / low watermark heuristic which means that only reading one byte is not
  // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
  // 1 page. To be safe, we read everything.
  char buffer[4096];
  for (;;) {
    KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
    if (n <= 0) break;
  }

  pump();
  EXPECT_TRUE(writable);
}

456 457
TEST(AsyncUnixTest, SteadyTimers) {
  // Schedules several steady-clock timers -- out of order, with a duplicate, and with one
  // deadline already in the past -- then checks that every timer fired and that the i-th
  // recorded firing time is not earlier than the i-th smallest deadline.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  auto start = port.steadyTime();
  kj::Vector<TimePoint> expected;
  kj::Vector<TimePoint> actual;

  // Registers a timer at `start + offset`.  Records the earliest acceptable firing time in
  // `expected` and, once the timer fires, the observed time in `actual`.
  auto addTimer = [&](Duration offset) {
    // A deadline before `start` cannot be met any earlier than `start` itself, so clamp it.
    expected.add(max(start + offset, start));
    port.atSteadyTime(start + offset).then([&]() {
      actual.add(port.steadyTime());
    }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  };

  addTimer(30 * MILLISECONDS);
  addTimer(40 * MILLISECONDS);
  addTimer(20350 * MICROSECONDS);
  addTimer(30 * MILLISECONDS);
  addTimer(-10 * MILLISECONDS);

  // Sort the deadlines so they can be compared index-by-index with the firing order, then wait
  // until one millisecond past the latest deadline.
  std::sort(expected.begin(), expected.end());
  port.atSteadyTime(expected.back() + MILLISECONDS).wait(waitScope);

  ASSERT_EQ(expected.size(), actual.size());
  // size() is unsigned, so the index is too; this avoids a signed/unsigned comparison.
  for (size_t i = 0; i < expected.size(); ++i) {
    KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
              i, ((expected[i] - actual[i]) / NANOSECONDS));
  }
}

489 490
TEST(AsyncUnixTest, Wake) {
  // Exercises UnixEventPort::wake() against both poll() and wait().
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // poll() is expected to report a wake() exactly once.
  EXPECT_FALSE(port.poll());
  port.wake();
  EXPECT_TRUE(port.poll());
  EXPECT_FALSE(port.poll());

  // A wake() issued before wait() is expected to be reported by that wait().
  port.wake();
  EXPECT_TRUE(port.wait());

  {
    // With only a timer for the current time pending, wait() is expected to return false,
    // i.e. not to report a wake.
    auto promise = port.atSteadyTime(port.steadyTime());
    EXPECT_FALSE(port.wait());
  }

  // wake() called from another thread, after a short delay, is expected to end a wait() in
  // this thread and be reported as a wake.
  Thread thread([&]() {
    delay();
    port.wake();
  });

  EXPECT_TRUE(port.wait());
}

518
}  // namespace kj