// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#ifndef CAPNP_BENCHMARK_CAPNP_COMMON_H_
#define CAPNP_BENCHMARK_CAPNP_COMMON_H_

#include "common.h"
#include <capnp/serialize.h>
#include <capnp/serialize-packed.h>
#include <kj/debug.h>
#if HAVE_SNAPPY
#include <capnp/serialize-snappy.h>
#endif  // HAVE_SNAPPY
#include <thread>

namespace capnp {
namespace benchmark {
namespace capnp {

class CountingOutputStream: public kj::FdOutputStream {
Kenton Varda's avatar
Kenton Varda committed
41 42 43 44 45 46 47 48 49 50
public:
  CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {}

  uint64_t throughput;

  void write(const void* buffer, size_t size) override {
    FdOutputStream::write(buffer, size);
    throughput += size;
  }

51
  void write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
Kenton Varda's avatar
Kenton Varda committed
52 53 54 55 56 57 58 59 60 61
    FdOutputStream::write(pieces);
    for (auto& piece: pieces) {
      throughput += piece.size();
    }
  }
};

// =======================================================================================

struct Uncompressed {
62
  typedef kj::FdInputStream& BufferedInput;
63 64 65 66
  typedef InputStreamMessageReader MessageReader;

  class ArrayMessageReader: public FlatArrayMessageReader {
  public:
67
    ArrayMessageReader(kj::ArrayPtr<const byte> array,
68
                       ReaderOptions options = ReaderOptions(),
69 70
                       kj::ArrayPtr<word> scratchSpace = nullptr)
      : FlatArrayMessageReader(kj::arrayPtr(
71 72 73
          reinterpret_cast<const word*>(array.begin()),
          reinterpret_cast<const word*>(array.end())), options) {}
  };
Kenton Varda's avatar
Kenton Varda committed
74

75
  static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
Kenton Varda's avatar
Kenton Varda committed
76 77 78 79
    writeMessage(output, builder);
  }
};

struct Packed {
81
  typedef kj::BufferedInputStreamWrapper BufferedInput;
82 83
  typedef PackedMessageReader MessageReader;

84
  class ArrayMessageReader: private kj::ArrayInputStream, public PackedMessageReader {
85
  public:
86
    ArrayMessageReader(kj::ArrayPtr<const byte> array,
87
                       ReaderOptions options = ReaderOptions(),
88
                       kj::ArrayPtr<word> scratchSpace = nullptr)
89 90 91 92
      : ArrayInputStream(array),
        PackedMessageReader(*this, options, scratchSpace) {}
  };

93
  static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
94 95 96
    writePackedMessage(output, builder);
  }

97
  static inline void write(kj::BufferedOutputStream& output, MessageBuilder& builder) {
98 99 100 101
    writePackedMessage(output, builder);
  }
};

#if HAVE_SNAPPY
// Statically allocated working buffers passed to the Snappy reader and writer below.
static byte snappyReadBuffer[SNAPPY_BUFFER_SIZE];
static byte snappyWriteBuffer[SNAPPY_BUFFER_SIZE];
static byte snappyCompressedBuffer[SNAPPY_COMPRESSED_BUFFER_SIZE];

// Compression policy that writes and reads messages with the Snappy-compressed packed
// serialization.  The stream types are spelled with an explicit `kj::` qualifier, matching the
// other Compression policies in this file.
struct SnappyCompressed {
  typedef kj::BufferedInputStreamWrapper BufferedInput;
  typedef SnappyPackedMessageReader MessageReader;

  // Reads a Snappy-packed message out of a byte array.  The private ArrayInputStream base is
  // listed first so that it is constructed before the reader base that consumes it.
  class ArrayMessageReader: private kj::ArrayInputStream, public SnappyPackedMessageReader {
  public:
    ArrayMessageReader(kj::ArrayPtr<const byte> array,
                       ReaderOptions options = ReaderOptions(),
                       kj::ArrayPtr<word> scratchSpace = nullptr)
      : ArrayInputStream(array),
        // The cast explicitly selects the ArrayInputStream base as the input stream.
        // NOTE(review): presumably needed because SnappyPackedMessageReader is itself a stream,
        // which would make a bare `*this` ambiguous -- confirm against serialize-snappy.h.
        SnappyPackedMessageReader(static_cast<kj::ArrayInputStream&>(*this),
                                  options, scratchSpace,
                                  kj::arrayPtr(snappyReadBuffer, SNAPPY_BUFFER_SIZE)) {}
  };

  // Serializes `builder` to `output`, using the static write and compressed buffers above.
  static inline void write(kj::OutputStream& output, MessageBuilder& builder) {
    writeSnappyPackedMessage(output, builder,
        kj::arrayPtr(snappyWriteBuffer, SNAPPY_BUFFER_SIZE),
        kj::arrayPtr(snappyCompressedBuffer, SNAPPY_COMPRESSED_BUFFER_SIZE));
  }
};
#endif  // HAVE_SNAPPY

// =======================================================================================

struct NoScratch {
  struct ScratchSpace {};

134
  template <typename Compression>
Kenton Varda's avatar
Kenton Varda committed
135
  class MessageReader: public Compression::MessageReader {
136
  public:
137 138 139 140 141 142 143
    inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
        : Compression::MessageReader(input) {}
  };

  template <typename Compression>
  class ArrayMessageReader: public Compression::ArrayMessageReader {
  public:
144
    inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
145
        : Compression::ArrayMessageReader(input) {}
146 147 148 149 150 151
  };

  class MessageBuilder: public MallocMessageBuilder {
  public:
    inline MessageBuilder(ScratchSpace& scratch): MallocMessageBuilder() {}
  };
Kenton Varda's avatar
Kenton Varda committed
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

  class ObjectSizeCounter {
  public:
    ObjectSizeCounter(uint64_t iters): counter(0) {}

    template <typename RequestBuilder, typename ResponseBuilder>
    void add(RequestBuilder& request, ResponseBuilder& response) {
      for (auto segment: request.getSegmentsForOutput()) {
        counter += segment.size() * sizeof(word);
      }
      for (auto segment: response.getSegmentsForOutput()) {
        counter += segment.size() * sizeof(word);
      }
    }

    uint64_t get() { return counter; }

  private:
    uint64_t counter;
  };
172 173
};

constexpr size_t SCRATCH_SIZE = 128 * 1024;
175
word scratchSpace[6 * SCRATCH_SIZE];
Kenton Varda's avatar
Kenton Varda committed
176 177
int scratchCounter = 0;

struct UseScratch {
  struct ScratchSpace {
Kenton Varda's avatar
Kenton Varda committed
180 181 182
    word* words;

    ScratchSpace() {
183
      KJ_REQUIRE(scratchCounter < 6, "Too many scratch spaces needed at once.");
Kenton Varda's avatar
Kenton Varda committed
184 185
      words = scratchSpace + scratchCounter++ * SCRATCH_SIZE;
    }
186
    ~ScratchSpace() noexcept {
Kenton Varda's avatar
Kenton Varda committed
187 188
      --scratchCounter;
    }
189 190
  };

191
  template <typename Compression>
Kenton Varda's avatar
Kenton Varda committed
192
  class MessageReader: public Compression::MessageReader {
193
  public:
194 195
    inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
        : Compression::MessageReader(
196
            input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
197 198 199 200 201
  };

  template <typename Compression>
  class ArrayMessageReader: public Compression::ArrayMessageReader {
  public:
202
    inline ArrayMessageReader(kj::ArrayPtr<const byte> input, ScratchSpace& scratch)
203
        : Compression::ArrayMessageReader(
204
            input, ReaderOptions(), kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
205 206 207 208 209
  };

  class MessageBuilder: public MallocMessageBuilder {
  public:
    inline MessageBuilder(ScratchSpace& scratch)
210
        : MallocMessageBuilder(kj::arrayPtr(scratch.words, SCRATCH_SIZE)) {}
Kenton Varda's avatar
Kenton Varda committed
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
  };

  class ObjectSizeCounter {
  public:
    ObjectSizeCounter(uint64_t iters): iters(iters), maxSize(0) {}

    template <typename RequestBuilder, typename ResponseBuilder>
    void add(RequestBuilder& request, ResponseBuilder& response) {
      size_t counter = 0;
      for (auto segment: request.getSegmentsForOutput()) {
        counter += segment.size() * sizeof(word);
      }
      for (auto segment: response.getSegmentsForOutput()) {
        counter += segment.size() * sizeof(word);
      }
      maxSize = std::max(counter, maxSize);
    }

    uint64_t get() { return iters * maxSize; }

  private:
    uint64_t iters;
    size_t maxSize;
234 235 236 237 238
  };
};

// =======================================================================================

template <typename TestCase, typename ReuseStrategy, typename Compression>
240 241
struct BenchmarkMethods {
  static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
242
    kj::FdInputStream inputStream(inputFd);
243 244
    typename Compression::BufferedInput bufferedInput(inputStream);

245
    CountingOutputStream output(outputFd);
246 247
    typename ReuseStrategy::ScratchSpace builderScratch;
    typename ReuseStrategy::ScratchSpace readerScratch;
248 249 250 251

    for (; iters > 0; --iters) {
      typename TestCase::Expectation expected;
      {
252
        typename ReuseStrategy::MessageBuilder builder(builderScratch);
253 254 255 256 257 258
        expected = TestCase::setupRequest(
            builder.template initRoot<typename TestCase::Request>());
        Compression::write(output, builder);
      }

      {
259 260
        typename ReuseStrategy::template MessageReader<Compression> reader(
            bufferedInput, readerScratch);
261 262 263 264 265 266
        if (!TestCase::checkResponse(
            reader.template getRoot<typename TestCase::Response>(), expected)) {
          throw std::logic_error("Incorrect response.");
        }
      }
    }
267

268 269 270 271 272 273 274 275 276 277
    return output.throughput;
  }

  static uint64_t asyncClientSender(
      int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
      uint64_t iters) {
    CountingOutputStream output(outputFd);
    typename ReuseStrategy::ScratchSpace scratch;

    for (; iters > 0; --iters) {
278
      typename ReuseStrategy::MessageBuilder builder(scratch);
279 280
      expectations->post(TestCase::setupRequest(
          builder.template initRoot<typename TestCase::Request>()));
Kenton Varda's avatar
Kenton Varda committed
281
      Compression::write(output, builder);
282
    }
283

284 285 286 287 288 289
    return output.throughput;
  }

  static void asyncClientReceiver(
      int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
      uint64_t iters) {
290
    kj::FdInputStream inputStream(inputFd);
291 292
    typename Compression::BufferedInput bufferedInput(inputStream);

293 294 295 296
    typename ReuseStrategy::ScratchSpace scratch;

    for (; iters > 0; --iters) {
      typename TestCase::Expectation expected = expectations->next();
297
      typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch);
298 299 300 301
      if (!TestCase::checkResponse(
          reader.template getRoot<typename TestCase::Response>(), expected)) {
        throw std::logic_error("Incorrect response.");
      }
302 303
    }
  }
Kenton Varda's avatar
Kenton Varda committed
304

305 306 307 308 309 310
  static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
    ProducerConsumerQueue<typename TestCase::Expectation> expectations;
    std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);
    uint64_t throughput = asyncClientSender(outputFd, &expectations, iters);
    receiverThread.join();
    return throughput;
311
  }
Kenton Varda's avatar
Kenton Varda committed
312

313
  static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
314
    kj::FdInputStream inputStream(inputFd);
315 316
    typename Compression::BufferedInput bufferedInput(inputStream);

317 318 319
    CountingOutputStream output(outputFd);
    typename ReuseStrategy::ScratchSpace builderScratch;
    typename ReuseStrategy::ScratchSpace readerScratch;
320

321 322
    for (; iters > 0; --iters) {
      typename ReuseStrategy::MessageBuilder builder(builderScratch);
323 324
      typename ReuseStrategy::template MessageReader<Compression> reader(
          bufferedInput, readerScratch);
325 326 327
      TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(),
                              builder.template initRoot<typename TestCase::Response>());
      Compression::write(output, builder);
328 329
    }

330
    return output.throughput;
331
  }
Kenton Varda's avatar
Kenton Varda committed
332

333 334 335
  static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
    typename ReuseStrategy::ScratchSpace requestScratch;
    typename ReuseStrategy::ScratchSpace responseScratch;
336

337
    typename ReuseStrategy::ObjectSizeCounter counter(iters);
338

339 340 341 342
    for (; iters > 0; --iters) {
      typename ReuseStrategy::MessageBuilder requestMessage(requestScratch);
      auto request = requestMessage.template initRoot<typename TestCase::Request>();
      typename TestCase::Expectation expected = TestCase::setupRequest(request);
Kenton Varda's avatar
Kenton Varda committed
343

344 345 346
      typename ReuseStrategy::MessageBuilder responseMessage(responseScratch);
      auto response = responseMessage.template initRoot<typename TestCase::Response>();
      TestCase::handleRequest(request.asReader(), response);
347

348 349 350
      if (!TestCase::checkResponse(response.asReader(), expected)) {
        throw std::logic_error("Incorrect response.");
      }
351

352 353 354
      if (countObjectSize) {
        counter.add(requestMessage, responseMessage);
      }
355
    }
Kenton Varda's avatar
Kenton Varda committed
356

357
    return counter.get();
358
  }
Kenton Varda's avatar
Kenton Varda committed
359

360 361
  static uint64_t passByBytes(uint64_t iters) {
    uint64_t throughput = 0;
362 363 364 365 366 367
    typename ReuseStrategy::ScratchSpace clientRequestScratch;
    UseScratch::ScratchSpace requestBytesScratch;
    typename ReuseStrategy::ScratchSpace serverRequestScratch;
    typename ReuseStrategy::ScratchSpace serverResponseScratch;
    UseScratch::ScratchSpace responseBytesScratch;
    typename ReuseStrategy::ScratchSpace clientResponseScratch;
Kenton Varda's avatar
Kenton Varda committed
368

369
    for (; iters > 0; --iters) {
370
      typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch);
371 372
      typename TestCase::Expectation expected = TestCase::setupRequest(
          requestBuilder.template initRoot<typename TestCase::Request>());
373

374
      kj::ArrayOutputStream requestOutput(kj::arrayPtr(
375
          reinterpret_cast<byte*>(requestBytesScratch.words), SCRATCH_SIZE * sizeof(word)));
376 377 378 379 380 381
      Compression::write(requestOutput, requestBuilder);
      throughput += requestOutput.getArray().size();
      typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader(
          requestOutput.getArray(), serverRequestScratch);

      typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch);
382 383
      TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(),
                              responseBuilder.template initRoot<typename TestCase::Response>());
384

385
      kj::ArrayOutputStream responseOutput(
386 387
          kj::arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words),
                       SCRATCH_SIZE * sizeof(word)));
388 389 390 391 392
      Compression::write(responseOutput, responseBuilder);
      throughput += responseOutput.getArray().size();
      typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader(
          responseOutput.getArray(), clientResponseScratch);

393 394 395 396 397
      if (!TestCase::checkResponse(
          responseReader.template getRoot<typename TestCase::Response>(), expected)) {
        throw std::logic_error("Incorrect response.");
      }
     }
Kenton Varda's avatar
Kenton Varda committed
398 399

    return throughput;
400
  }
401
};

struct BenchmarkTypes {
  typedef capnp::Uncompressed Uncompressed;
405
  typedef capnp::Packed Packed;
406
#if HAVE_SNAPPY
407
  typedef capnp::SnappyCompressed SnappyCompressed;
408
#endif  // HAVE_SNAPPY
409

410 411
  typedef capnp::UseScratch ReusableResources;
  typedef capnp::NoScratch SingleUseResources;
Kenton Varda's avatar
Kenton Varda committed
412

413 414 415
  template <typename TestCase, typename ReuseStrategy, typename Compression>
  struct BenchmarkMethods: public capnp::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
};

}  // namespace capnp
}  // namespace benchmark
}  // namespace capnp

#endif  // CAPNP_BENCHMARK_CAPNP_COMMON_H_