serialize-snappy.c++ 7.05 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "serialize-snappy.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include <kj/debug.h>
26
#include "layout.h"
Kenton Varda's avatar
Kenton Varda committed
27 28 29 30
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
#include <vector>

31
namespace capnp {
Kenton Varda's avatar
Kenton Varda committed
32

33
// Adapts a BufferedInputStream to the snappy::Source interface. refill() hands an instance of
// this class to snappy::RawUncompress() so the compressed bytes are read directly out of the
// wrapped stream's read buffer.
class SnappyInputStream::InputStreamSnappySource: public snappy::Source {
public:
  // Only a reference to `inputStream` is stored, so the stream must outlive this adapter.
  // Marked `explicit` so a BufferedInputStream never converts to an adapter implicitly.
  inline explicit InputStreamSnappySource(BufferedInputStream& inputStream)
      : inputStream(inputStream) {}
  inline ~InputStreamSnappySource() noexcept {}

  // Returns true when the wrapped stream currently reports an empty read buffer.
  bool atEnd() {
    return inputStream.getReadBuffer().size() == 0;
  }

  // implements snappy::Source ---------------------------------------

  // Not supported: fails an assertion if it is ever reached.
  size_t Available() const override {
    KJ_FAIL_ASSERT("Snappy doesn't actually call this.");
    return 0;
  }

  // Exposes the wrapped stream's current read buffer without consuming it. `*len` receives
  // the number of bytes available at the returned pointer.
  const char* Peek(size_t* len) override {
    kj::ArrayPtr<const byte> buffer = inputStream.getReadBuffer();
    *len = buffer.size();
    return reinterpret_cast<const char*>(buffer.begin());
  }

  // Consumes `n` bytes from the wrapped stream.
  void Skip(size_t n) override {
    inputStream.skip(n);
  }

private:
  BufferedInputStream& inputStream;
};

64
// Wraps `inner`, which supplies the compressed bytes. `buffer` is the space that decompressed
// blocks are written into; if it is smaller than SNAPPY_BUFFER_SIZE it is ignored and a
// heap-allocated buffer of SNAPPY_BUFFER_SIZE bytes owned by this object is used instead.
SnappyInputStream::SnappyInputStream(BufferedInputStream& inner, kj::ArrayPtr<byte> buffer)
    : inner(inner) {
  kj::ArrayPtr<byte> decompressSpace = buffer;

  const bool callerBufferTooSmall = decompressSpace.size() < SNAPPY_BUFFER_SIZE;
  if (callerBufferTooSmall) {
    // Fall back to our own allocation; ownedBuffer keeps it alive for our lifetime.
    ownedBuffer = kj::heapArray<byte>(SNAPPY_BUFFER_SIZE);
    decompressSpace = ownedBuffer;
  }

  this->buffer = decompressSpace;
}

73
// Destructor with an empty body. It is declared noexcept(false), i.e. it is not implicitly
// marked as non-throwing.
SnappyInputStream::~SnappyInputStream() noexcept(false) {}
Kenton Varda's avatar
Kenton Varda committed
74

75
// Returns the not-yet-consumed part of the current decompressed block. When nothing is left,
// one refill() is attempted first; its result is not inspected, the window is simply returned
// as it stands afterwards.
kj::ArrayPtr<const byte> SnappyInputStream::tryGetReadBuffer() {
  const bool windowExhausted = bufferAvailable.size() == 0;
  if (windowExhausted) {
    refill();
  }
  return bufferAvailable;
}
Kenton Varda's avatar
Kenton Varda committed
82

83 84
size_t SnappyInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
  size_t total = 0;
85 86
  while (minBytes > bufferAvailable.size()) {
    memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
Kenton Varda's avatar
Kenton Varda committed
87

88
    dst = reinterpret_cast<byte*>(dst) + bufferAvailable.size();
89
    total += bufferAvailable.size();
90 91
    minBytes -= bufferAvailable.size();
    maxBytes -= bufferAvailable.size();
Kenton Varda's avatar
Kenton Varda committed
92

93 94 95
    if (!refill()) {
      return total;
    }
Kenton Varda's avatar
Kenton Varda committed
96 97
  }

98 99 100 101
  // Serve from current buffer.
  size_t n = std::min(bufferAvailable.size(), maxBytes);
  memcpy(dst, bufferAvailable.begin(), n);
  bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
102
  return total + n;
103
}
Kenton Varda's avatar
Kenton Varda committed
104

105 106 107
void SnappyInputStream::skip(size_t bytes) {
  while (bytes > bufferAvailable.size()) {
    bytes -= bufferAvailable.size();
108
    KJ_REQUIRE(refill(), "Premature EOF");
Kenton Varda's avatar
Kenton Varda committed
109
  }
110 111
  bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
}
Kenton Varda's avatar
Kenton Varda committed
112

113
bool SnappyInputStream::refill() {
114 115
  uint32_t length = 0;
  InputStreamSnappySource snappySource(inner);
116 117 118 119 120

  if (snappySource.atEnd()) {
    return false;
  }

121
  KJ_REQUIRE(
122 123
      snappy::RawUncompress(
          &snappySource, reinterpret_cast<char*>(buffer.begin()), buffer.size(), &length),
124
      "Snappy decompression failed.") {
125
    return false;
126
  }
Kenton Varda's avatar
Kenton Varda committed
127

128
  bufferAvailable = buffer.slice(0, length);
129
  return true;
130
}
Kenton Varda's avatar
Kenton Varda committed
131

132
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
133

134
// Wraps `inner`, which receives the compressed blocks. `buffer` collects uncompressed bytes
// and `compressedBuffer` receives the compressor's output. Either one is replaced by a
// heap-allocated buffer owned by this object when the caller's is smaller than
// SNAPPY_BUFFER_SIZE / SNAPPY_COMPRESSED_BUFFER_SIZE respectively.
SnappyOutputStream::SnappyOutputStream(
    OutputStream& inner, kj::ArrayPtr<byte> buffer, kj::ArrayPtr<byte> compressedBuffer)
    : inner(inner) {
  KJ_DASSERT(SNAPPY_COMPRESSED_BUFFER_SIZE >= snappy::MaxCompressedLength(snappy::kBlockSize),
      "snappy::MaxCompressedLength() changed?");

  // Uncompressed staging space; writing starts at its beginning.
  kj::ArrayPtr<byte> staging = buffer;
  if (staging.size() < SNAPPY_BUFFER_SIZE) {
    ownedBuffer = kj::heapArray<byte>(SNAPPY_BUFFER_SIZE);
    staging = ownedBuffer;
  }
  this->buffer = staging;
  bufferPos = staging.begin();

  // Space for the compressor's output.
  kj::ArrayPtr<byte> packed = compressedBuffer;
  if (packed.size() < SNAPPY_COMPRESSED_BUFFER_SIZE) {
    ownedCompressedBuffer = kj::heapArray<byte>(SNAPPY_COMPRESSED_BUFFER_SIZE);
    packed = ownedCompressedBuffer;
  }
  this->compressedBuffer = packed;
}
Kenton Varda's avatar
Kenton Varda committed
153

154
// Flushes any bytes still sitting in the buffer. The flush is routed through
// unwindDetector.catchExceptionsIfUnwinding(), and the destructor is declared noexcept(false).
SnappyOutputStream::~SnappyOutputStream() noexcept(false) {
  const bool nothingPending = !(bufferPos > buffer.begin());
  if (nothingPending) {
    return;
  }

  unwindDetector.catchExceptionsIfUnwinding([this]() { flush(); });
}
Kenton Varda's avatar
Kenton Varda committed
161

162 163 164 165 166
void SnappyOutputStream::flush() {
  if (bufferPos > buffer.begin()) {
    snappy::ByteArraySource source(
        reinterpret_cast<char*>(buffer.begin()), bufferPos - buffer.begin());
    snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(compressedBuffer.begin()));
Kenton Varda's avatar
Kenton Varda committed
167

168
    size_t n = snappy::Compress(&source, &sink);
169
    KJ_ASSERT(n <= compressedBuffer.size(),
170 171
        "Critical security bug:  Snappy compression overran its output buffer.");
    inner.write(compressedBuffer.begin(), n);
Kenton Varda's avatar
Kenton Varda committed
172

173
    bufferPos = buffer.begin();
Kenton Varda's avatar
Kenton Varda committed
174
  }
175
}
Kenton Varda's avatar
Kenton Varda committed
176

177 178
// Returns the unused tail of the staging buffer, from the current write position to its end.
// write() recognises data that was placed directly at the start of this range.
kj::ArrayPtr<byte> SnappyOutputStream::getWriteBuffer() {
  kj::ArrayPtr<byte> unusedTail = kj::arrayPtr(bufferPos, buffer.end());
  return unusedTail;
}
Kenton Varda's avatar
Kenton Varda committed
180

181 182 183 184 185 186 187 188 189 190
void SnappyOutputStream::write(const void* src, size_t size) {
  if (src == bufferPos) {
    // Oh goody, the caller wrote directly into our buffer.
    bufferPos += size;
  } else {
    for (;;) {
      size_t available = buffer.end() - bufferPos;
      if (size < available) break;
      memcpy(bufferPos, src, available);
      size -= available;
191
      src = reinterpret_cast<const byte*>(src) + available;
192 193 194
      bufferPos = buffer.end();
      flush();
    }
Kenton Varda's avatar
Kenton Varda committed
195

196 197
    memcpy(bufferPos, src, size);
    bufferPos += size;
Kenton Varda's avatar
Kenton Varda committed
198
  }
199
}
Kenton Varda's avatar
Kenton Varda committed
200

201
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
202

203 204
// Builds a reader from two bases: the SnappyInputStream base is constructed from
// `inputStream` and `buffer`, and the PackedMessageReader base is given this object (viewed
// as a SnappyInputStream) as its input, together with `options` and `scratchSpace`.
SnappyPackedMessageReader::SnappyPackedMessageReader(
    BufferedInputStream& inputStream,
    ReaderOptions options,
    kj::ArrayPtr<word> scratchSpace,
    kj::ArrayPtr<byte> buffer)
    : SnappyInputStream(inputStream, buffer),
      PackedMessageReader(static_cast<SnappyInputStream&>(*this), options, scratchSpace) {
}
Kenton Varda's avatar
Kenton Varda committed
208

209
// Destructor with an empty body. It is declared noexcept(false), i.e. it is not implicitly
// marked as non-throwing.
SnappyPackedMessageReader::~SnappyPackedMessageReader() noexcept(false) {}
Kenton Varda's avatar
Kenton Varda committed
210

211
void writeSnappyPackedMessage(kj::OutputStream& output,
212 213
                              kj::ArrayPtr<const kj::ArrayPtr<const word>> segments,
                              kj::ArrayPtr<byte> buffer, kj::ArrayPtr<byte> compressedBuffer) {
214 215
  SnappyOutputStream snappyOut(output, buffer, compressedBuffer);
  writePackedMessage(snappyOut, segments);
Kenton Varda's avatar
Kenton Varda committed
216 217
}

218
}  // namespace capnp