async-unix-test.c++ 13.8 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22 23 24

#include "async-unix.h"
#include "thread.h"
#include "debug.h"
25
#include "io.h"
26 27 28 29
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
30
#include <kj/compat/gtest.h>
31
#include <pthread.h>
32
#include <algorithm>
33 34 35 36 37

namespace kj {

// Blocks the calling thread for 10,000 microseconds (10ms).  The tests call this from a helper
// thread before it sends a signal, writes to a pipe, or wakes the port.
inline void delay() {
  const useconds_t pauseMicros = 10000;
  usleep(pauseMicros);
}

38 39 40 41 42 43 44
// EXPECT_SI_CODE(expected, actual) checks the si_code field of a received siginfo_t.
//
// On Linux and Cygwin it is simply another name for EXPECT_EQ.  On every other platform it
// expands to nothing, so the check is skipped: on OSX, si_code seems to be zero when SI_USER is
// expected.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE EXPECT_EQ
#else
#define EXPECT_SI_CODE(a,b)
#endif

45 46 47 48 49
void captureSignals() {
  static bool captured = false;
  if (!captured) {
    captured = true;

50 51 52 53
    // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
    // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test.  We can't
    // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
    UnixEventPort::captureSignal(SIGURG);
54
    UnixEventPort::captureSignal(SIGIO);
55
  }
56
}
57

58 59
TEST(AsyncUnixTest, Signals) {
  // Sends SIGURG to our own process and expects onSignal(SIGURG) to report it with the matching
  // signal number and, on platforms where si_code is checked, SI_USER.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // The signal is raised before anything is waiting for it.
  const pid_t self = getpid();
  kill(self, SIGURG);

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

Kenton Varda's avatar
Kenton Varda committed
71
#if defined(SIGRTMIN) && !__BIONIC__
72
TEST(AsyncUnixTest, SignalWithValue) {
  // Checks that an integer attached to a signal with sigqueue() comes back unchanged in
  // si_value.sival_int.
  //
  // This only works on platforms that support real-time signals: although the signal being sent
  // is SIGURG, the sigqueue() system call itself was introduced by RT signals.  Hence this test
  // won't run on e.g. Mac OSX.
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union first, then fill in the integer member.
  union sigval payload;
  memset(&payload, 0, sizeof(payload));
  payload.sival_int = 123;

  const pid_t self = getpid();
  sigqueue(self, SIGURG, payload);

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(123, info.si_value.sival_int);
}
95

96
TEST(AsyncUnixTest, SignalWithPointerValue) {
  // Checks that a pointer attached to a signal with sigqueue() comes back unchanged in
  // si_value.sival_ptr.  The address of the local event port serves as the pointer value.
  //
  // This only works on platforms that support real-time signals: although the signal being sent
  // is SIGURG, the sigqueue() system call itself was introduced by RT signals.  Hence this test
  // won't run on e.g. Mac OSX.
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union first, then fill in the pointer member.
  union sigval payload;
  memset(&payload, 0, sizeof(payload));
  payload.sival_ptr = &port;

  const pid_t self = getpid();
  sigqueue(self, SIGURG, payload);

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(&port, info.si_value.sival_ptr);
}
119
#endif
120

121 122
TEST(AsyncUnixTest, SignalsMultiListen) {
  // Listens for two different signals but raises only one of them.  The listener for the signal
  // that is never sent (SIGIO) records a failure if it ever fires.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Background listener for SIGIO; neither of its callbacks is expected to run.
  port.onSignal(SIGIO)
      .then([](siginfo_t&&) {
        KJ_FAIL_EXPECT("Received wrong signal.");
      })
      .detach([](kj::Exception&& exception) {
        KJ_FAIL_EXPECT(exception);
      });

  const pid_t self = getpid();
  kill(self, SIGURG);

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

140 141 142 143 144
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

145 146
TEST(AsyncUnixTest, SignalsMultiReceive) {
  // Raises SIGURG and SIGIO back to back, then waits for each in turn and expects both to be
  // reported.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Both signals are sent before the first wait begins.
  const pid_t self = getpid();
  kill(self, SIGURG);
  kill(self, SIGIO);

  // First wait: SIGURG.
  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);

  // Second wait: SIGIO, reusing the same siginfo_t variable.
  info = port.onSignal(SIGIO).wait(waitScope);
  EXPECT_EQ(SIGIO, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

163 164
#endif  // !__CYGWIN32__

165 166
TEST(AsyncUnixTest, SignalsAsync) {
  // Waits for SIGURG in this thread while a second thread, after a short delay(), sends that
  // signal to this thread with pthread_kill().
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Arrange for a signal to be sent from another thread.
  pthread_t receiver = pthread_self();
  Thread sender([&]() {
    delay();
    pthread_kill(receiver, SIGURG);
  });

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
#if __linux__
  // The SI_TKILL expectation is compiled only on Linux.
  EXPECT_SI_CODE(SI_TKILL, info.si_code);
#endif
}

185 186 187 188 189
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

190
TEST(AsyncUnixTest, SignalsNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending signals.
  //
  // The two callbacks are expected to stay unfired right after the signals are raised, after a
  // plain loop.run(), and after port.poll().  They are expected to have fired once loop.run() has
  // been called again following the poll.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  bool receivedSigurg = false;
  bool receivedSigio = false;

  // Listener for SIGURG: sets its flag and validates the siginfo_t it was handed.
  port.onSignal(SIGURG)
      .then([&](siginfo_t&& info) {
        receivedSigurg = true;
        EXPECT_EQ(SIGURG, info.si_signo);
        EXPECT_SI_CODE(SI_USER, info.si_code);
      })
      .detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });

  // Listener for SIGIO: same shape as the one above.
  port.onSignal(SIGIO)
      .then([&](siginfo_t&& info) {
        receivedSigio = true;
        EXPECT_EQ(SIGIO, info.si_signo);
        EXPECT_SI_CODE(SI_USER, info.si_code);
      })
      .detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });

  const pid_t self = getpid();
  kill(self, SIGURG);
  kill(self, SIGIO);

  // Stage 1: signals raised, nothing else has run yet.
  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // Stage 2: running the loop without polling the port.
  loop.run();

  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // Stage 3: polling the port without running the loop afterwards.
  port.poll();

  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // Stage 4: running the loop after the poll.
  loop.run();

  EXPECT_TRUE(receivedSigurg);
  EXPECT_TRUE(receivedSigio);
}

233 234
#endif  // !__CYGWIN32__

235 236
TEST(AsyncUnixTest, ReadObserver) {
  // Observes the read end of a pipe and waits for it to become readable after a write.  On Linux
  // the test also checks atEndHint() both before and after the write end is released.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);

  UnixEventPort::FdObserver observer(
      port, infd, UnixEventPort::FdObserver::OBSERVE_READ);

  KJ_SYSCALL(write(outfd, "foo", 3));

  observer.whenBecomesReadable().wait(waitScope);

#if __linux__  // platform known to support POLLRDHUP
  // The write end is still held here, so the hint is expected to be present and false.
  EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));

  // Drain the three bytes written above.
  char buffer[4096];
  ssize_t n;
  KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
  EXPECT_EQ(3, n);

  // Write once more, then give up the write end (presumably AutoCloseFd closes the descriptor
  // when assigned nullptr -- confirm against io.h).
  KJ_SYSCALL(write(outfd, "bar", 3));
  outfd = nullptr;

  observer.whenBecomesReadable().wait(waitScope);

  // With the write end released, the hint is expected to be present and true.
  EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
#endif
}

268 269
TEST(AsyncUnixTest, ReadObserverMultiListen) {
  // Observes two pipes but writes to only one.  The observer on the pipe that is never written
  // to records a failure if it ever fires.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Pipe that stays empty for the whole test.
  int bogusPipefds[2];
  KJ_SYSCALL(pipe(bogusPipefds));
  KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });

  UnixEventPort::FdObserver idleWatcher(
      port, bogusPipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  idleWatcher.whenBecomesReadable()
      .then([]() {
        ADD_FAILURE() << "Received wrong poll.";
      })
      .detach([](kj::Exception&& error) {
        ADD_FAILURE() << kj::str(error).cStr();
      });

  // Pipe that actually receives data.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver activeWatcher(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  activeWatcher.whenBecomesReadable().wait(waitScope);
}

298 299
TEST(AsyncUnixTest, ReadObserverMultiReceive) {
  // Writes to two separate pipes, each with its own read observer, and then waits for both
  // observers to report readability.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // First pipe, written to immediately.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver firstWatcher(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  // Second pipe, likewise written to immediately.
  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });

  UnixEventPort::FdObserver secondWatcher(
      port, pipefds2[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Obtain both promises before waiting on either of them.
  auto firstReadable = firstWatcher.whenBecomesReadable();
  auto secondReadable = secondWatcher.whenBecomesReadable();
  firstReadable.wait(waitScope);
  secondReadable.wait(waitScope);
}

326 327
TEST(AsyncUnixTest, ReadObserverAsync) {
  // Waits on the read end of a pipe in this thread while a second thread, after a short delay(),
  // writes to the other end.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Make a pipe and wait on its read end while another thread writes to it.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  UnixEventPort::FdObserver readWatcher(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  Thread writer([&]() {
    delay();
    KJ_SYSCALL(write(pipefds[1], "foo", 3));
  });

  // Wait for the event in this thread.
  readWatcher.whenBecomesReadable().wait(waitScope);
}

348
TEST(AsyncUnixTest, ReadObserverNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending FD events.
  //
  // The counter is expected to stay at zero right after the writes, after a plain loop.run(), and
  // after port.poll().  It is expected to reach two once loop.run() has been called again
  // following the poll.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  UnixEventPort::FdObserver firstWatcher(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
  UnixEventPort::FdObserver secondWatcher(
      port, pipefds2[0], UnixEventPort::FdObserver::OBSERVE_READ);

  // Each observer bumps the shared counter once when its pipe becomes readable.
  int receivedCount = 0;
  firstWatcher.whenBecomesReadable()
      .then([&]() { receivedCount++; })
      .detach([](Exception&& error) { ADD_FAILURE() << str(error).cStr(); });
  secondWatcher.whenBecomesReadable()
      .then([&]() { receivedCount++; })
      .detach([](Exception&& error) { ADD_FAILURE() << str(error).cStr(); });

  KJ_SYSCALL(write(pipefds[1], "foo", 3));
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Stage 1: data written, nothing else has run yet.
  EXPECT_EQ(0, receivedCount);

  // Stage 2: running the loop without polling the port.
  loop.run();

  EXPECT_EQ(0, receivedCount);

  // Stage 3: polling the port without running the loop afterwards.
  port.poll();

  EXPECT_EQ(0, receivedCount);

  // Stage 4: running the loop after the poll.
  loop.run();

  EXPECT_EQ(2, receivedCount);
}

394 395 396 397 398 399 400 401
// Ensures O_NONBLOCK is set on `fd`.  The current file status flags are read with F_GETFL, and
// F_SETFL is issued only when the flag is not already present.  Both fcntl() calls are wrapped in
// KJ_SYSCALL.
static void setNonblocking(int fd) {
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFL));

  // Already non-blocking: nothing to change.
  if ((flags & O_NONBLOCK) != 0) return;

  KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
}

402 403
TEST(AsyncUnixTest, WriteObserver) {
  // Fills a non-blocking pipe until write() stops succeeding and expects the write observer to
  // stay silent.  It then reads from the pipe and expects the observer to report writability.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);
  setNonblocking(outfd);

  UnixEventPort::FdObserver writeWatcher(
      port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);

  // Fill buffer.  Keep writing until the result goes negative.
  ssize_t n;
  for (;;) {
    KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
    if (n < 0) break;
  }

  bool writable = false;
  auto pending = writeWatcher.whenBecomesWritable()
      .then([&]() { writable = true; })
      .eagerlyEvaluate(nullptr);

  // One run / poll / run cycle while the pipe is still full.
  loop.run();
  port.poll();
  loop.run();

  EXPECT_FALSE(writable);

  // Read from the pipe to free up room for the writer.
  char buffer[4096];
  KJ_SYSCALL(read(infd, &buffer, sizeof(buffer)));

  // Same cycle again, now that data has been consumed.
  loop.run();
  port.poll();
  loop.run();

  EXPECT_TRUE(writable);
}

441 442
TEST(AsyncUnixTest, SteadyTimers) {
  // Schedules several steady-clock timers (including a duplicate deadline and one that lies in
  // the past), waits until just after the latest one, and then checks that every timer fired and
  // that none fired before its expected time.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  auto start = port.steadyTime();
  kj::Vector<TimePoint> expected;
  kj::Vector<TimePoint> actual;

  // Registers a timer at `start + offset`.  The expected firing time is clamped so that it is
  // never earlier than `start`; the time at which the timer actually fires is appended to
  // `actual`.  (The parameter is deliberately not named `delay`, which would shadow the
  // file-level delay() helper.)
  auto addTimer = [&](Duration offset) {
    expected.add(max(start + offset, start));
    port.atSteadyTime(start + offset).then([&]() {
      actual.add(port.steadyTime());
    }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  };

  addTimer(30 * MILLISECONDS);
  addTimer(40 * MILLISECONDS);
  addTimer(20350 * MICROSECONDS);
  addTimer(30 * MILLISECONDS);
  addTimer(-10 * MILLISECONDS);

  // Sort the expected deadlines, then wait for one millisecond past the latest of them.
  std::sort(expected.begin(), expected.end());
  port.atSteadyTime(expected.back() + MILLISECONDS).wait(waitScope);

  ASSERT_EQ(expected.size(), actual.size());
  // The index is size_t so that it is not a signed value compared against the unsigned size().
  for (size_t i = 0; i < expected.size(); ++i) {
    KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
              i, ((expected[i] - actual[i]) / NANOSECONDS));
  }
}

474 475
TEST(AsyncUnixTest, Wake) {
  // Exercises UnixEventPort::wake() and checks the boolean results of poll() and wait() around
  // it: from the same thread, alongside an already-due timer, and from a second thread.
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Without a preceding wake(), poll() is expected to return false.  After a wake() it is
  // expected to return true exactly once.
  EXPECT_FALSE(port.poll());
  port.wake();
  EXPECT_TRUE(port.poll());
  EXPECT_FALSE(port.poll());

  // wait() is likewise expected to return true after a wake().
  port.wake();
  EXPECT_TRUE(port.wait());

  {
    // With only a timer set for the current steady time and no wake(), wait() is expected to
    // return false.
    auto dueTimer = port.atSteadyTime(port.steadyTime());
    EXPECT_FALSE(port.wait());
  }

  // Wake the port from another thread.  The flag is set by that thread just before it calls
  // wake(); nothing below reads it.
  bool wokenByThread = false;
  Thread waker([&]() {
    delay();
    wokenByThread = true;
    port.wake();
  });

  EXPECT_TRUE(port.wait());
}

503
}  // namespace kj