protobuf-common.h 12.1 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22
#include "common.h"
23
#include <google/protobuf/io/zero_copy_stream_impl.h>
Kenton Varda's avatar
Kenton Varda committed
24
#include <google/protobuf/io/coded_stream.h>
25
#include <thread>
26
#if HAVE_SNAPPY
Kenton Varda's avatar
Kenton Varda committed
27 28
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
29
#endif  // HAVE_SNAPPY
30

31
namespace capnp {
32 33 34 35 36
namespace benchmark {
namespace protobuf {

// =======================================================================================

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
// Reuse strategy in which nothing is recycled: the "reusable" types are empty placeholders and
// each "single-use" object is a freshly constructed message or string.
struct SingleUseMessages {
  template <typename MessageType>
  struct Message {
    // Empty stand-in; holds no message state.
    struct Reusable {};

    // The real message.  The placeholder argument is accepted and ignored.
    struct SingleUse : MessageType {
      SingleUse(Reusable&) {}
    };
  };

  // Empty stand-in for a string buffer.
  struct ReusableString {};

  // The real string.  The placeholder argument is accepted and ignored.
  struct SingleUseString : std::string {
    SingleUseString(ReusableString&) {}
  };

  // Intentionally a no-op: single-use objects are not cleared.
  template <typename MessageType>
  static void doneWith(MessageType&) {}
};

// Reuse strategy in which one long-lived object is recycled: the "reusable" type owns the
// message or string, and each "single-use" handle is just a reference to it.
struct ReusableMessages {
  template <typename MessageType>
  struct Message {
    // Owns the message; a SingleUse reference binds directly to it.
    struct Reusable : MessageType {};
    using SingleUse = MessageType&;
  };

  using ReusableString = std::string;
  using SingleUseString = std::string&;

  // Resets the message via its Clear() method so the same object can be used again.
  template <typename MessageType>
  static void doneWith(MessageType& message) { message.Clear(); }
};

// =======================================================================================
// The protobuf Java library defines a format for writing multiple protobufs to a stream, in which
// each message is prefixed by a varint size.  This was never added to the C++ library.  It's easy
// to do naively, but tricky to implement without accidentally losing various optimizations.  These
// two functions should be optimal.

Kenton Varda's avatar
Kenton Varda committed
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
struct Uncompressed {
  typedef google::protobuf::io::FileInputStream InputStream;
  typedef google::protobuf::io::FileOutputStream OutputStream;

  static uint64_t write(const google::protobuf::MessageLite& message,
                        google::protobuf::io::FileOutputStream* rawOutput) {
    google::protobuf::io::CodedOutputStream output(rawOutput);
    const int size = message.ByteSize();
    output.WriteVarint32(size);
    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL) {
      message.SerializeWithCachedSizesToArray(buffer);
    } else {
      message.SerializeWithCachedSizes(&output);
      if (output.HadError()) {
        throw OsException(rawOutput->GetErrno());
      }
96
    }
Kenton Varda's avatar
Kenton Varda committed
97 98

    return size;
99 100
  }

Kenton Varda's avatar
Kenton Varda committed
101
  static void read(google::protobuf::io::ZeroCopyInputStream* rawInput,
102
                   google::protobuf::MessageLite* message) {
Kenton Varda's avatar
Kenton Varda committed
103 104 105 106 107 108 109 110 111 112
    google::protobuf::io::CodedInputStream input(rawInput);
    uint32_t size;
    GOOGLE_CHECK(input.ReadVarint32(&size));

    auto limit = input.PushLimit(size);

    GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) &&
                 input.ConsumedEntireMessage());

    input.PopLimit(limit);
113 114
  }

Kenton Varda's avatar
Kenton Varda committed
115 116 117 118
  static void flush(google::protobuf::io::FileOutputStream* output) {
    if (!output->Flush()) throw OsException(output->GetErrno());
  }
};
119

Kenton Varda's avatar
Kenton Varda committed
120 121 122 123 124
// =======================================================================================
// The Snappy interface is really obnoxious.  I gave up here and am just reading/writing flat
// arrays in some static scratch space.  This probably gives protobufs an edge that it doesn't
// deserve.

125 126
#if HAVE_SNAPPY

Kenton Varda's avatar
Kenton Varda committed
127 128
// Fixed 1 MiB (1 << 20 bytes) scratch buffers used by SnappyCompressed to stage serialized and
// compressed bytes.  Both are plain statics with no locking visible in this file.
// NOTE(review): if this file is included from several translation units, each one gets its own
// copy of these arrays -- confirm that is intended.
static char scratch[1 << 20];
static char scratch2[1 << 20];
Kenton Varda's avatar
Kenton Varda committed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163

struct SnappyCompressed {
  typedef int InputStream;
  typedef int OutputStream;

  static uint64_t write(const google::protobuf::MessageLite& message, int* output) {
    size_t size = message.ByteSize();
    GOOGLE_CHECK_LE(size, sizeof(scratch));

    message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch));

    size_t compressedSize = 0;
    snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize);
    uint32_t tag = compressedSize;
    memcpy(scratch2, &tag, sizeof(tag));

    writeAll(*output, scratch2, compressedSize + sizeof(tag));
    return compressedSize + sizeof(tag);
  }

  static void read(int* input, google::protobuf::MessageLite* message) {
    uint32_t size;
    readAll(*input, &size, sizeof(size));
    readAll(*input, scratch, size);

    size_t uncompressedSize;
    GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize));
    GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2));

    GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize));
  }

  static void flush(OutputStream*) {}
};

164 165
#endif  // HAVE_SNAPPY

166 167 168 169 170 171 172
// =======================================================================================

// Shorthand for the message types selected by a reuse strategy.  Both macros expand to
// dependent type names, so they only work where template parameters named `TestCase` and
// `ReuseStrategy` are in scope, and `type` must name a member type of TestCase.
//   REUSABLE(type)   -> ReuseStrategy::Message<TestCase::type>::Reusable
//   SINGLE_USE(type) -> ReuseStrategy::Message<TestCase::type>::SingleUse
#define REUSABLE(type) \
  typename ReuseStrategy::template Message<typename TestCase::type>::Reusable
#define SINGLE_USE(type) \
  typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse

Kenton Varda's avatar
Kenton Varda committed
173
template <typename TestCase, typename ReuseStrategy, typename Compression>
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
struct BenchmarkMethods {
  static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
    uint64_t throughput = 0;

    typename Compression::OutputStream output(outputFd);
    typename Compression::InputStream input(inputFd);

    REUSABLE(Request) reusableRequest;
    REUSABLE(Response) reusableResponse;

    for (; iters > 0; --iters) {
      SINGLE_USE(Request) request(reusableRequest);
      typename TestCase::Expectation expected = TestCase::setupRequest(&request);
      throughput += Compression::write(request, &output);
      Compression::flush(&output);
      ReuseStrategy::doneWith(request);

      SINGLE_USE(Response) response(reusableResponse);
      Compression::read(&input, &response);
      if (!TestCase::checkResponse(response, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(response);
197
    }
198 199

    return throughput;
200
  }
Kenton Varda's avatar
Kenton Varda committed
201

202 203 204 205
  static uint64_t asyncClientSender(
      int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
      uint64_t iters) {
    uint64_t throughput = 0;
206

207 208
    typename Compression::OutputStream output(outputFd);
    REUSABLE(Request) reusableRequest;
Kenton Varda's avatar
Kenton Varda committed
209

210 211 212 213 214 215 216
    for (; iters > 0; --iters) {
      SINGLE_USE(Request) request(reusableRequest);
      expectations->post(TestCase::setupRequest(&request));
      throughput += Compression::write(request, &output);
      Compression::flush(&output);
      ReuseStrategy::doneWith(request);
    }
217

218
    return throughput;
219 220
  }

221 222 223 224 225
  static void asyncClientReceiver(
      int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
      uint64_t iters) {
    typename Compression::InputStream input(inputFd);
    REUSABLE(Response) reusableResponse;
226

227 228 229 230 231 232 233 234
    for (; iters > 0; --iters) {
      typename TestCase::Expectation expected = expectations->next();
      SINGLE_USE(Response) response(reusableResponse);
      Compression::read(&input, &response);
      if (!TestCase::checkResponse(response, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(response);
235 236
    }
  }
Kenton Varda's avatar
Kenton Varda committed
237

238 239 240 241 242
  static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
    ProducerConsumerQueue<typename TestCase::Expectation> expectations;
    std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters);
    uint64_t throughput = asyncClientSender(outputFd, &expectations, iters);
    receiverThread.join();
243

244
    return throughput;
245
  }
Kenton Varda's avatar
Kenton Varda committed
246

247 248
  static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
    uint64_t throughput = 0;
249

250 251
    typename Compression::OutputStream output(outputFd);
    typename Compression::InputStream input(inputFd);
Kenton Varda's avatar
Kenton Varda committed
252

253 254
    REUSABLE(Request) reusableRequest;
    REUSABLE(Response) reusableResponse;
255

256 257 258
    for (; iters > 0; --iters) {
      SINGLE_USE(Request) request(reusableRequest);
      Compression::read(&input, &request);
259

260 261 262
      SINGLE_USE(Response) response(reusableResponse);
      TestCase::handleRequest(request, &response);
      ReuseStrategy::doneWith(request);
Kenton Varda's avatar
Kenton Varda committed
263

264 265 266
      throughput += Compression::write(response, &output);
      Compression::flush(&output);
      ReuseStrategy::doneWith(response);
Kenton Varda's avatar
Kenton Varda committed
267
    }
Kenton Varda's avatar
Kenton Varda committed
268

269 270
    return throughput;
  }
271

272 273
  static uint64_t passByObject(uint64_t iters, bool countObjectSize) {
    uint64_t throughput = 0;
274

275 276
    REUSABLE(Request) reusableRequest;
    REUSABLE(Response) reusableResponse;
277

278 279 280
    for (; iters > 0; --iters) {
      SINGLE_USE(Request) request(reusableRequest);
      typename TestCase::Expectation expected = TestCase::setupRequest(&request);
281

282 283 284 285 286 287 288
      SINGLE_USE(Response) response(reusableResponse);
      TestCase::handleRequest(request, &response);
      ReuseStrategy::doneWith(request);
      if (!TestCase::checkResponse(response, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(response);
289

290 291 292 293
      if (countObjectSize) {
        throughput += request.SpaceUsed();
        throughput += response.SpaceUsed();
      }
294
    }
295 296

    return throughput;
297
  }
Kenton Varda's avatar
Kenton Varda committed
298

299 300
  static uint64_t passByBytes(uint64_t iters) {
    uint64_t throughput = 0;
301

302 303 304 305 306
    REUSABLE(Request) reusableClientRequest;
    REUSABLE(Request) reusableServerRequest;
    REUSABLE(Response) reusableServerResponse;
    REUSABLE(Response) reusableClientResponse;
    typename ReuseStrategy::ReusableString reusableRequestString, reusableResponseString;
307

308 309 310
    for (; iters > 0; --iters) {
      SINGLE_USE(Request) clientRequest(reusableClientRequest);
      typename TestCase::Expectation expected = TestCase::setupRequest(&clientRequest);
311

312 313 314 315
      typename ReuseStrategy::SingleUseString requestString(reusableRequestString);
      clientRequest.SerializePartialToString(&requestString);
      throughput += requestString.size();
      ReuseStrategy::doneWith(clientRequest);
Kenton Varda's avatar
Kenton Varda committed
316

317 318
      SINGLE_USE(Request) serverRequest(reusableServerRequest);
      serverRequest.ParsePartialFromString(requestString);
319

320 321 322
      SINGLE_USE(Response) serverResponse(reusableServerResponse);
      TestCase::handleRequest(serverRequest, &serverResponse);
      ReuseStrategy::doneWith(serverRequest);
Kenton Varda's avatar
Kenton Varda committed
323

324 325 326 327
      typename ReuseStrategy::SingleUseString responseString(reusableResponseString);
      serverResponse.SerializePartialToString(&responseString);
      throughput += responseString.size();
      ReuseStrategy::doneWith(serverResponse);
328

329 330 331 332 333 334 335
      SINGLE_USE(Response) clientResponse(reusableClientResponse);
      clientResponse.ParsePartialFromString(responseString);

      if (!TestCase::checkResponse(clientResponse, expected)) {
        throw std::logic_error("Incorrect response.");
      }
      ReuseStrategy::doneWith(clientResponse);
336
    }
Kenton Varda's avatar
Kenton Varda committed
337 338

    return throughput;
339
  }
340
};
341

342 343
struct BenchmarkTypes {
  typedef protobuf::Uncompressed Uncompressed;
344
  typedef protobuf::Uncompressed Packed;
345
#if HAVE_SNAPPY
346
  typedef protobuf::SnappyCompressed SnappyCompressed;
347
#endif  // HAVE_SNAPPY
348

349 350
  typedef protobuf::ReusableMessages ReusableResources;
  typedef protobuf::SingleUseMessages SingleUseResources;
Kenton Varda's avatar
Kenton Varda committed
351

352 353 354 355
  template <typename TestCase, typename ReuseStrategy, typename Compression>
  struct BenchmarkMethods
      : public protobuf::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {};
};
356 357 358

}  // namespace protobuf
}  // namespace benchmark
359
}  // namespace capnp