io.c++ 10.7 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22

#include "io.h"
Kenton Varda's avatar
Kenton Varda committed
23
#include "debug.h"
24
#include "miniposix.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include <algorithm>
26
#include <errno.h>
27

28 29 30 31
#if !_WIN32
#include <sys/uio.h>
#endif

32
namespace kj {
33

34 35 36 37
InputStream::~InputStream() noexcept(false) {}
OutputStream::~OutputStream() noexcept(false) {}
BufferedInputStream::~BufferedInputStream() noexcept(false) {}
BufferedOutputStream::~BufferedOutputStream() noexcept(false) {}
38

39 40 41 42 43 44 45 46 47 48
size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
  size_t n = tryRead(buffer, minBytes, maxBytes);
  KJ_REQUIRE(n >= minBytes, "Premature EOF") {
    // Pretend we read zeros from the input.
    memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n);
    return minBytes;
  }
  return n;
}

49 50 51 52 53 54 55 56 57
void InputStream::skip(size_t bytes) {
  char scratch[8192];
  while (bytes > 0) {
    size_t amount = std::min(bytes, sizeof(scratch));
    read(scratch, amount);
    bytes -= amount;
  }
}

58
void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
59 60 61 62 63
  for (auto piece: pieces) {
    write(piece.begin(), piece.size());
  }
}

64 65 66 67 68 69
ArrayPtr<const byte> BufferedInputStream::getReadBuffer() {
  auto result = tryGetReadBuffer();
  KJ_REQUIRE(result.size() > 0, "Premature EOF");
  return result;
}

70 71
// =======================================================================================

72
BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
Kenton Varda's avatar
Kenton Varda committed
73
    : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
74 75
      buffer(buffer == nullptr ? ownedBuffer : buffer) {}

76
BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {}
77

78
ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() {
79
  if (bufferAvailable.size() == 0) {
80
    size_t n = inner.tryRead(buffer.begin(), 1, buffer.size());
81 82 83 84 85 86
    bufferAvailable = buffer.slice(0, n);
  }

  return bufferAvailable;
}

87
size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
  if (minBytes <= bufferAvailable.size()) {
    // Serve from current buffer.
    size_t n = std::min(bufferAvailable.size(), maxBytes);
    memcpy(dst, bufferAvailable.begin(), n);
    bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
    return n;
  } else {
    // Copy current available into destination.
    memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
    size_t fromFirstBuffer = bufferAvailable.size();

    dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
    minBytes -= fromFirstBuffer;
    maxBytes -= fromFirstBuffer;

    if (maxBytes <= buffer.size()) {
      // Read the next buffer-full.
      size_t n = inner.read(buffer.begin(), minBytes, buffer.size());
      size_t fromSecondBuffer = std::min(n, maxBytes);
      memcpy(dst, buffer.begin(), fromSecondBuffer);
      bufferAvailable = buffer.slice(fromSecondBuffer, n);
      return fromFirstBuffer + fromSecondBuffer;
    } else {
      // Forward large read to the underlying stream.
      bufferAvailable = nullptr;
      return fromFirstBuffer + inner.read(dst, minBytes, maxBytes);
    }
  }
}

void BufferedInputStreamWrapper::skip(size_t bytes) {
  if (bytes <= bufferAvailable.size()) {
    bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
  } else {
    bytes -= bufferAvailable.size();
    if (bytes <= buffer.size()) {
      // Read the next buffer-full.
      size_t n = inner.read(buffer.begin(), bytes, buffer.size());
      bufferAvailable = buffer.slice(bytes, n);
    } else {
      // Forward large skip to the underlying stream.
      bufferAvailable = nullptr;
130
      inner.skip(bytes);
131 132 133 134 135 136
    }
  }
}

// -------------------------------------------------------------------

137
BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
138
    : inner(inner),
Kenton Varda's avatar
Kenton Varda committed
139
      ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
140 141 142
      buffer(buffer == nullptr ? ownedBuffer : buffer),
      bufferPos(this->buffer.begin()) {}

143 144 145 146
BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) {
  unwindDetector.catchExceptionsIfUnwinding([&]() {
    flush();
  });
147 148 149 150 151 152 153 154 155
}

void BufferedOutputStreamWrapper::flush() {
  if (bufferPos > buffer.begin()) {
    inner.write(buffer.begin(), bufferPos - buffer.begin());
    bufferPos = buffer.begin();
  }
}

156 157
ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
  return arrayPtr(bufferPos, buffer.end());
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
}

void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
  if (src == bufferPos) {
    // Oh goody, the caller wrote directly into our buffer.
    bufferPos += size;
  } else {
    size_t available = buffer.end() - bufferPos;

    if (size <= available) {
      memcpy(bufferPos, src, size);
      bufferPos += size;
    } else if (size <= buffer.size()) {
      // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
      memcpy(bufferPos, src, available);
      inner.write(buffer.begin(), buffer.size());

      size -= available;
      src = reinterpret_cast<const byte*>(src) + available;

      memcpy(buffer.begin(), src, size);
      bufferPos = buffer.begin() + size;
    } else {
      // Writing so much data that we might as well write directly to avoid a copy.
      inner.write(buffer.begin(), bufferPos - buffer.begin());
      bufferPos = buffer.begin();
      inner.write(src, size);
    }
  }
}

// =======================================================================================

191
ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
192
ArrayInputStream::~ArrayInputStream() noexcept(false) {}
193

194
ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() {
195 196 197
  return array;
}

198
size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
199 200 201
  size_t n = std::min(maxBytes, array.size());
  memcpy(dst, array.begin(), n);
  array = array.slice(n, array.size());
202
  return n;
203 204 205
}

void ArrayInputStream::skip(size_t bytes) {
206
  KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") {
207
    bytes = array.size();
208
    break;
209 210 211 212 213 214
  }
  array = array.slice(bytes, array.size());
}

// -------------------------------------------------------------------

215
ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
216
ArrayOutputStream::~ArrayOutputStream() noexcept(false) {}
217

218 219
ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
  return arrayPtr(fillPos, array.end());
220 221 222 223 224 225 226
}

void ArrayOutputStream::write(const void* src, size_t size) {
  if (src == fillPos) {
    // Oh goody, the caller wrote directly into our buffer.
    fillPos += size;
  } else {
227
    KJ_REQUIRE(size <= (size_t)(array.end() - fillPos),
228
            "ArrayOutputStream's backing array was not large enough for the data written.");
229 230 231 232 233 234 235
    memcpy(fillPos, src, size);
    fillPos += size;
  }
}

// =======================================================================================

236
AutoCloseFd::~AutoCloseFd() noexcept(false) {
237 238 239
  if (fd >= 0) {
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // Don't use SYSCALL() here because close() should not be repeated on EINTR.
240
      if (miniposix::close(fd) < 0) {
241 242 243
        KJ_FAIL_SYSCALL("close", errno, fd) {
          break;
        }
244
      }
245 246
    });
  }
247 248
}

249
FdInputStream::~FdInputStream() noexcept(false) {}
250

251
size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
252 253 254 255 256
  byte* pos = reinterpret_cast<byte*>(buffer);
  byte* min = pos + minBytes;
  byte* max = pos + maxBytes;

  while (pos < min) {
257 258
    miniposix::ssize_t n;
    KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd);
259 260
    if (n == 0) {
      break;
261 262 263 264 265 266 267
    }
    pos += n;
  }

  return pos - reinterpret_cast<byte*>(buffer);
}

268
FdOutputStream::~FdOutputStream() noexcept(false) {}
269 270 271 272 273

void FdOutputStream::write(const void* buffer, size_t size) {
  const char* pos = reinterpret_cast<const char*>(buffer);

  while (size > 0) {
274 275
    miniposix::ssize_t n;
    KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd);
276
    KJ_ASSERT(n > 0, "write() returned zero.");
277 278 279 280 281
    pos += n;
    size -= n;
  }
}

282
void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
283 284 285 286 287 288 289 290 291
#if _WIN32
  // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable
  // restriction that each segment must be page-aligned. So, fall back to write().

  for (auto piece: pieces) {
    write(piece.begin(), piece.size());
  }

#else
Tom Lee's avatar
Tom Lee committed
292
  const size_t iovmax = miniposix::iovMax(pieces.size());
293 294 295
  while (pieces.size() > iovmax) {
    write(pieces.slice(0, iovmax));
    pieces = pieces.slice(iovmax, pieces.size());
296 297
  }

298
  KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128);
299 300 301 302 303 304 305 306 307

  for (uint i = 0; i < pieces.size(); i++) {
    // writev() interface is not const-correct.  :(
    iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
    iov[i].iov_len = pieces[i].size();
  }

  struct iovec* current = iov.begin();

308 309
  // Advance past any leading empty buffers so that a write full of only empty buffers does not
  // cause a syscall at all.
310 311 312 313 314
  while (current < iov.end() && current->iov_len == 0) {
    ++current;
  }

  while (current < iov.end()) {
315
    // Issue the write.
316
    ssize_t n = 0;
317
    KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd);
318
    KJ_ASSERT(n > 0, "writev() returned zero.");
319

320 321
    // Advance past all buffers that were fully-written.
    while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) {
322 323 324 325
      n -= current->iov_len;
      ++current;
    }

326 327
    // If we only partially-wrote one of the buffers, adjust the pointer and size to include only
    // the unwritten part.
328 329 330 331 332
    if (n > 0) {
      current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
      current->iov_len -= n;
    }
  }
333
#endif
334 335
}

336
}  // namespace kj