Commit 40f48d66 authored by Kenton Varda's avatar Kenton Varda

Fix RPC tests to pass on Windows.

parent 2b4f46a3
......@@ -28,6 +28,12 @@
#include <kj/miniposix.h>
#include "test-util.h"
#if !_WIN32
// This test is super-slow on Windows seemingly due to generating exception stack traces being
// expensive.
//
// TODO(perf): Maybe create an API to disable stack traces, and use it here.
namespace capnp {
namespace _ { // private
namespace {
......@@ -257,3 +263,5 @@ KJ_TEST("fuzz-test double-far pointer") {
} // namespace
} // namespace _ (private)
} // namespace capnp
#endif
......@@ -22,7 +22,6 @@
#include "rpc-twoparty.h"
#include "test-util.h"
#include <capnp/rpc.capnp.h>
#include <kj/async-unix.h>
#include <kj/debug.h>
#include <kj/thread.h>
#include <kj/compat/gtest.h>
......
......@@ -27,10 +27,22 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "test-util.h"
#include <kj/compat/gtest.h>
#if _WIN32
#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <kj/windows-sanity.h>
namespace kj {
namespace _ {
int win32Socketpair(SOCKET socks[2]);
}
}
#else
#include <sys/socket.h>
#endif
namespace capnp {
namespace _ { // private
namespace {
......@@ -86,11 +98,21 @@ private:
class PipeWithSmallBuffer {
public:
#ifdef _WIN32
#define KJ_SOCKCALL KJ_WINSOCK
#ifndef SHUT_WR
#define SHUT_WR SD_SEND
#endif
#define socketpair(family, type, flags, fds) kj::_::win32Socketpair(fds)
#else
#define KJ_SOCKCALL KJ_SYSCALL
#endif
PipeWithSmallBuffer() {
// Use a socketpair rather than a pipe so that we can set the buffer size extremely small.
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
KJ_SOCKCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
KJ_SYSCALL(shutdown(fds[0], SHUT_WR));
KJ_SOCKCALL(shutdown(fds[0], SHUT_WR));
// Note: OSX reports ENOTCONN if we also try to shutdown(fds[1], SHUT_RD).
// Request that the buffer size be as small as possible, to force the event loop to kick in.
......@@ -106,25 +128,82 @@ public:
// Anyway, we now use 127 to avoid these issues (but also to screw around with non-word-boundary
// writes).
uint small = 127;
KJ_SYSCALL(setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, &small, sizeof(small)));
KJ_SYSCALL(setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, &small, sizeof(small)));
KJ_SOCKCALL(setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (const char*)&small, sizeof(small)));
KJ_SOCKCALL(setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (const char*)&small, sizeof(small)));
}
~PipeWithSmallBuffer() {
#if _WIN32
closesocket(fds[0]);
closesocket(fds[1]);
#else
close(fds[0]);
close(fds[1]);
#endif
}
inline int operator[](uint index) { return fds[index]; }
private:
#ifdef _WIN32
SOCKET fds[2];
#else
int fds[2];
#endif
};
#if _WIN32
// Sockets on win32 are not file descriptors. Ugh.
//
// TODO(cleanup): Maybe put these somewhere reusable? kj/io.h is inappropriate since we don't
// really want to link against winsock.
class SocketOutputStream: public kj::OutputStream {
public:
explicit SocketOutputStream(SOCKET fd): fd(fd) {}
void write(const void* buffer, size_t size) override {
const char* ptr = reinterpret_cast<const char*>(buffer);
while (size > 0) {
ssize_t n;
KJ_SOCKCALL(n = send(fd, ptr, size, 0));
size -= n;
ptr += n;
}
}
private:
SOCKET fd;
};
class SocketInputStream: public kj::InputStream {
public:
explicit SocketInputStream(SOCKET fd): fd(fd) {}
size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
char* ptr = reinterpret_cast<char*>(buffer);
size_t total = 0;
while (total < minBytes) {
ssize_t n;
KJ_SOCKCALL(n = recv(fd, ptr, maxBytes, 0));
total += n;
maxBytes -= n;
ptr += n;
}
}
private:
SOCKET fd;
};
#else // _WIN32
typedef kj::FdOutputStream SocketOutputStream;
typedef kj::FdInputStream SocketInputStream;
#endif // _WIN32, else
TEST(SerializeAsyncTest, ParseAsync) {
PipeWithSmallBuffer fds;
auto ioContext = kj::setupAsyncIo();
auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
kj::FdOutputStream rawOutput(fds[1]);
SocketOutputStream rawOutput(fds[1]);
FragmentingOutputStream output(rawOutput);
TestMessageBuilder message(1);
......@@ -143,7 +222,7 @@ TEST(SerializeAsyncTest, ParseAsyncOddSegmentCount) {
PipeWithSmallBuffer fds;
auto ioContext = kj::setupAsyncIo();
auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
kj::FdOutputStream rawOutput(fds[1]);
SocketOutputStream rawOutput(fds[1]);
FragmentingOutputStream output(rawOutput);
TestMessageBuilder message(7);
......@@ -162,7 +241,7 @@ TEST(SerializeAsyncTest, ParseAsyncEvenSegmentCount) {
PipeWithSmallBuffer fds;
auto ioContext = kj::setupAsyncIo();
auto input = ioContext.lowLevelProvider->wrapInputFd(fds[0]);
kj::FdOutputStream rawOutput(fds[1]);
SocketOutputStream rawOutput(fds[1]);
FragmentingOutputStream output(rawOutput);
TestMessageBuilder message(10);
......@@ -190,7 +269,8 @@ TEST(SerializeAsyncTest, WriteAsync) {
}
kj::Thread thread([&]() {
StreamFdMessageReader reader(fds[0]);
SocketInputStream input(fds[0]);
InputStreamMessageReader reader(input);
auto listReader = reader.getRoot<TestAllTypes>().getStructList();
EXPECT_EQ(list.size(), listReader.size());
for (auto element: listReader) {
......@@ -214,7 +294,8 @@ TEST(SerializeAsyncTest, WriteAsyncOddSegmentCount) {
}
kj::Thread thread([&]() {
StreamFdMessageReader reader(fds[0]);
SocketInputStream input(fds[0]);
InputStreamMessageReader reader(input);
auto listReader = reader.getRoot<TestAllTypes>().getStructList();
EXPECT_EQ(list.size(), listReader.size());
for (auto element: listReader) {
......@@ -238,7 +319,8 @@ TEST(SerializeAsyncTest, WriteAsyncEvenSegmentCount) {
}
kj::Thread thread([&]() {
StreamFdMessageReader reader(fds[0]);
SocketInputStream input(fds[0]);
InputStreamMessageReader reader(input);
auto listReader = reader.getRoot<TestAllTypes>().getStructList();
EXPECT_EQ(list.size(), listReader.size());
for (auto element: listReader) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment