serialize-async-test.c++ 10.1 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22
#ifndef _GNU_SOURCE
Ivan Shynkarenka's avatar
Ivan Shynkarenka committed
23 24 25
#define _GNU_SOURCE
#endif

26 27 28 29 30
#include "serialize-async.h"
#include "serialize.h"
#include <kj/debug.h>
#include <kj/thread.h>
#include <stdlib.h>
31
#include <kj/miniposix.h>
32
#include "test-util.h"
33
#include <kj/compat/gtest.h>
34

35 36 37 38 39 40 41 42 43 44 45 46 47
#if _WIN32
#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <kj/windows-sanity.h>
namespace kj {
  namespace _ {
    int win32Socketpair(SOCKET socks[2]);
  }
}
#else
#include <sys/socket.h>
#endif

48 49 50 51
namespace capnp {
namespace _ {  // private
namespace {

52 53 54 55 56 57
#if _WIN32
inline void delay() { Sleep(5); }
#else
inline void delay() { usleep(5000); }
#endif

58 59 60 61 62 63
class FragmentingOutputStream: public kj::OutputStream {
public:
  FragmentingOutputStream(kj::OutputStream& inner): inner(inner) {}

  void write(const void* buffer, size_t size) override {
    while (size > 0) {
64
      delay();
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
      size_t n = rand() % size + 1;
      inner.write(buffer, n);
      buffer = reinterpret_cast<const byte*>(buffer) + n;
      size -= n;
    }
  }

private:
  kj::OutputStream& inner;
};

class TestMessageBuilder: public MallocMessageBuilder {
  // A MessageBuilder that tries to allocate an exact number of total segments, by allocating
  // minimum-size segments until it reaches the number, then allocating one large segment to
  // finish.

public:
  explicit TestMessageBuilder(uint desiredSegmentCount)
      : MallocMessageBuilder(0, AllocationStrategy::FIXED_SIZE),
        desiredSegmentCount(desiredSegmentCount) {}
  ~TestMessageBuilder() {
    EXPECT_EQ(0u, desiredSegmentCount);
  }

  kj::ArrayPtr<word> allocateSegment(uint minimumSize) override {
    if (desiredSegmentCount <= 1) {
      if (desiredSegmentCount < 1) {
        ADD_FAILURE() << "Allocated more segments than desired.";
      } else {
        --desiredSegmentCount;
      }
      return MallocMessageBuilder::allocateSegment(8192);
    } else {
      --desiredSegmentCount;
      return MallocMessageBuilder::allocateSegment(minimumSize);
    }
  }

private:
  uint desiredSegmentCount;
};

107 108
class PipeWithSmallBuffer {
public:
109 110 111 112 113 114 115 116 117 118
#ifdef _WIN32
#define KJ_SOCKCALL KJ_WINSOCK
#ifndef SHUT_WR
#define SHUT_WR SD_SEND
#endif
#define socketpair(family, type, flags, fds) kj::_::win32Socketpair(fds)
#else
#define KJ_SOCKCALL KJ_SYSCALL
#endif

119
  PipeWithSmallBuffer() {
120
    // Use a socketpair rather than a pipe so that we can set the buffer size extremely small.
121
    KJ_SOCKCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
122

123
    KJ_SOCKCALL(shutdown(fds[0], SHUT_WR));
Kenton Varda's avatar
Kenton Varda committed
124
    // Note:  OSX reports ENOTCONN if we also try to shutdown(fds[1], SHUT_RD).
125 126

    // Request that the buffer size be as small as possible, to force the event loop to kick in.
127 128 129 130 131 132 133 134
    // FUN STUFF:
    // - On Linux, the kernel rounds up to the smallest size it permits, so we can ask for a size of
    //   zero.
    // - On OSX, the kernel reports EINVAL on zero, but will dutifully use a 1-byte buffer if we
    //   set the size to 1.  This tends to cause stack overflows due to ridiculously long promise
    //   chains.
    // - Cygwin will apparently actually use a buffer size of 0 and therefore block forever waiting
    //   for buffer space.
135 136 137
    // - GNU HURD throws ENOPROTOOPT for SO_RCVBUF. Apparently, technically, a Unix domain socket
    //   has only one buffer, and it's controlled via SO_SNDBUF on the other end. OK, we'll ignore
    //   errors on SO_RCVBUF, then.
138 139 140 141
    //
    // Anyway, we now use 127 to avoid these issues (but also to screw around with non-word-boundary
    // writes).
    uint small = 127;
142
    setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (const char*)&small, sizeof(small));
143
    KJ_SOCKCALL(setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (const char*)&small, sizeof(small)));
144
  }
145
  ~PipeWithSmallBuffer() {
146 147 148 149
#if _WIN32
    closesocket(fds[0]);
    closesocket(fds[1]);
#else
150 151
    close(fds[0]);
    close(fds[1]);
152
#endif
153
  }
154 155 156 157

  inline int operator[](uint index) { return fds[index]; }

private:
158 159 160
#ifdef _WIN32
  SOCKET fds[2];
#else
161
  int fds[2];
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
#endif
};

#if _WIN32
// Sockets on win32 are not file descriptors. Ugh.
//
// TODO(cleanup): Maybe put these somewhere reusable? kj/io.h is inappropriate since we don't
//   really want to link against winsock.

class SocketOutputStream: public kj::OutputStream {
public:
  explicit SocketOutputStream(SOCKET fd): fd(fd) {}

  void write(const void* buffer, size_t size) override {
    const char* ptr = reinterpret_cast<const char*>(buffer);
    while (size > 0) {
178
      kj::miniposix::ssize_t n;
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
      KJ_SOCKCALL(n = send(fd, ptr, size, 0));
      size -= n;
      ptr += n;
    }
  }

private:
  SOCKET fd;
};

class SocketInputStream: public kj::InputStream {
public:
  explicit SocketInputStream(SOCKET fd): fd(fd) {}

  size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    char* ptr = reinterpret_cast<char*>(buffer);
    size_t total = 0;
    while (total < minBytes) {
197
      kj::miniposix::ssize_t n;
198 199 200 201 202
      KJ_SOCKCALL(n = recv(fd, ptr, maxBytes, 0));
      total += n;
      maxBytes -= n;
      ptr += n;
    }
203
    return total;
204 205 206 207
  }

private:
  SOCKET fd;
208
};
209 210 211 212
#else  // _WIN32
typedef kj::FdOutputStream SocketOutputStream;
typedef kj::FdInputStream SocketInputStream;
#endif  // _WIN32, else
213

214 215
TEST(SerializeAsyncTest, ParseAsync) {
  PipeWithSmallBuffer fds;
216 217
  auto ioContext = kj::setupAsyncIo();
  auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
218
  SocketOutputStream rawOutput(fds[1]);
219 220 221 222 223 224 225 226 227
  FragmentingOutputStream output(rawOutput);

  TestMessageBuilder message(1);
  initTestMessage(message.getRoot<TestAllTypes>());

  kj::Thread thread([&]() {
    writeMessage(output, message);
  });

228
  auto received = readMessage(*input).wait(ioContext.waitScope);
229

230 231 232
  checkTestMessage(received->getRoot<TestAllTypes>());
}

233 234
TEST(SerializeAsyncTest, ParseAsyncOddSegmentCount) {
  PipeWithSmallBuffer fds;
235 236
  auto ioContext = kj::setupAsyncIo();
  auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
237
  SocketOutputStream rawOutput(fds[1]);
238 239 240 241 242 243 244 245 246
  FragmentingOutputStream output(rawOutput);

  TestMessageBuilder message(7);
  initTestMessage(message.getRoot<TestAllTypes>());

  kj::Thread thread([&]() {
    writeMessage(output, message);
  });

247
  auto received = readMessage(*input).wait(ioContext.waitScope);
248

249 250 251
  checkTestMessage(received->getRoot<TestAllTypes>());
}

252 253
TEST(SerializeAsyncTest, ParseAsyncEvenSegmentCount) {
  PipeWithSmallBuffer fds;
254 255
  auto ioContext = kj::setupAsyncIo();
  auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
256
  SocketOutputStream rawOutput(fds[1]);
257 258 259 260 261 262 263 264 265
  FragmentingOutputStream output(rawOutput);

  TestMessageBuilder message(10);
  initTestMessage(message.getRoot<TestAllTypes>());

  kj::Thread thread([&]() {
    writeMessage(output, message);
  });

266
  auto received = readMessage(*input).wait(ioContext.waitScope);
267

268 269 270
  checkTestMessage(received->getRoot<TestAllTypes>());
}

271 272
TEST(SerializeAsyncTest, WriteAsync) {
  PipeWithSmallBuffer fds;
273 274
  auto ioContext = kj::setupAsyncIo();
  auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
275 276 277 278 279 280 281 282 283

  TestMessageBuilder message(1);
  auto root = message.getRoot<TestAllTypes>();
  auto list = root.initStructList(16);
  for (auto element: list) {
    initTestMessage(element);
  }

  kj::Thread thread([&]() {
284 285
    SocketInputStream input(fds[0]);
    InputStreamMessageReader reader(input);
286 287 288 289 290 291 292
    auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    EXPECT_EQ(list.size(), listReader.size());
    for (auto element: listReader) {
      checkTestMessage(element);
    }
  });

293
  writeMessage(*output, message).wait(ioContext.waitScope);
294 295
}

296 297
TEST(SerializeAsyncTest, WriteAsyncOddSegmentCount) {
  PipeWithSmallBuffer fds;
298 299
  auto ioContext = kj::setupAsyncIo();
  auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
300 301 302 303 304 305 306 307 308

  TestMessageBuilder message(7);
  auto root = message.getRoot<TestAllTypes>();
  auto list = root.initStructList(16);
  for (auto element: list) {
    initTestMessage(element);
  }

  kj::Thread thread([&]() {
309 310
    SocketInputStream input(fds[0]);
    InputStreamMessageReader reader(input);
311 312 313 314 315 316 317
    auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    EXPECT_EQ(list.size(), listReader.size());
    for (auto element: listReader) {
      checkTestMessage(element);
    }
  });

318
  writeMessage(*output, message).wait(ioContext.waitScope);
319 320
}

321 322
TEST(SerializeAsyncTest, WriteAsyncEvenSegmentCount) {
  PipeWithSmallBuffer fds;
323 324
  auto ioContext = kj::setupAsyncIo();
  auto output = ioContext.lowLevelProvider->wrapOutputFd(fds[1]);
325 326 327 328 329 330 331 332 333

  TestMessageBuilder message(10);
  auto root = message.getRoot<TestAllTypes>();
  auto list = root.initStructList(16);
  for (auto element: list) {
    initTestMessage(element);
  }

  kj::Thread thread([&]() {
334 335
    SocketInputStream input(fds[0]);
    InputStreamMessageReader reader(input);
336 337 338 339 340 341 342
    auto listReader = reader.getRoot<TestAllTypes>().getStructList();
    EXPECT_EQ(list.size(), listReader.size());
    for (auto element: listReader) {
      checkTestMessage(element);
    }
  });

343
  writeMessage(*output, message).wait(ioContext.waitScope);
344 345 346 347 348
}

}  // namespace
}  // namespace _ (private)
}  // namespace capnp