io.c++ 13.1 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22

#include "io.h"
Kenton Varda's avatar
Kenton Varda committed
23
#include "debug.h"
24
#include "miniposix.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include <algorithm>
26
#include <errno.h>
27

28 29 30 31 32 33 34 35
#if _WIN32
#ifndef NOMINMAX
#define NOMINMAX 1
#endif
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include "windows-sanity.h"
#else
36 37 38
#include <sys/uio.h>
#endif

39
namespace kj {
40

41 42 43 44
InputStream::~InputStream() noexcept(false) {}
OutputStream::~OutputStream() noexcept(false) {}
BufferedInputStream::~BufferedInputStream() noexcept(false) {}
BufferedOutputStream::~BufferedOutputStream() noexcept(false) {}
45

46 47 48 49 50 51 52 53 54 55
size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
  size_t n = tryRead(buffer, minBytes, maxBytes);
  KJ_REQUIRE(n >= minBytes, "Premature EOF") {
    // Pretend we read zeros from the input.
    memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n);
    return minBytes;
  }
  return n;
}

56 57 58 59 60 61 62 63 64
void InputStream::skip(size_t bytes) {
  char scratch[8192];
  while (bytes > 0) {
    size_t amount = std::min(bytes, sizeof(scratch));
    read(scratch, amount);
    bytes -= amount;
  }
}

65
void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
66 67 68 69 70
  for (auto piece: pieces) {
    write(piece.begin(), piece.size());
  }
}

71 72 73 74 75 76
ArrayPtr<const byte> BufferedInputStream::getReadBuffer() {
  auto result = tryGetReadBuffer();
  KJ_REQUIRE(result.size() > 0, "Premature EOF");
  return result;
}

77 78
// =======================================================================================

79
BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
Kenton Varda's avatar
Kenton Varda committed
80
    : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
81 82
      buffer(buffer == nullptr ? ownedBuffer : buffer) {}

83
BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {}
84

85
ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() {
86
  if (bufferAvailable.size() == 0) {
87
    size_t n = inner.tryRead(buffer.begin(), 1, buffer.size());
88 89 90 91 92 93
    bufferAvailable = buffer.slice(0, n);
  }

  return bufferAvailable;
}

94
size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
  if (minBytes <= bufferAvailable.size()) {
    // Serve from current buffer.
    size_t n = std::min(bufferAvailable.size(), maxBytes);
    memcpy(dst, bufferAvailable.begin(), n);
    bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
    return n;
  } else {
    // Copy current available into destination.
    memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
    size_t fromFirstBuffer = bufferAvailable.size();

    dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
    minBytes -= fromFirstBuffer;
    maxBytes -= fromFirstBuffer;

    if (maxBytes <= buffer.size()) {
      // Read the next buffer-full.
      size_t n = inner.read(buffer.begin(), minBytes, buffer.size());
      size_t fromSecondBuffer = std::min(n, maxBytes);
      memcpy(dst, buffer.begin(), fromSecondBuffer);
      bufferAvailable = buffer.slice(fromSecondBuffer, n);
      return fromFirstBuffer + fromSecondBuffer;
    } else {
      // Forward large read to the underlying stream.
      bufferAvailable = nullptr;
      return fromFirstBuffer + inner.read(dst, minBytes, maxBytes);
    }
  }
}

void BufferedInputStreamWrapper::skip(size_t bytes) {
  if (bytes <= bufferAvailable.size()) {
    bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
  } else {
    bytes -= bufferAvailable.size();
    if (bytes <= buffer.size()) {
      // Read the next buffer-full.
      size_t n = inner.read(buffer.begin(), bytes, buffer.size());
      bufferAvailable = buffer.slice(bytes, n);
    } else {
      // Forward large skip to the underlying stream.
      bufferAvailable = nullptr;
137
      inner.skip(bytes);
138 139 140 141 142 143
    }
  }
}

// -------------------------------------------------------------------

144
BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
145
    : inner(inner),
Kenton Varda's avatar
Kenton Varda committed
146
      ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
147 148 149
      buffer(buffer == nullptr ? ownedBuffer : buffer),
      bufferPos(this->buffer.begin()) {}

150 151 152 153
BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) {
  unwindDetector.catchExceptionsIfUnwinding([&]() {
    flush();
  });
154 155 156 157 158 159 160 161 162
}

void BufferedOutputStreamWrapper::flush() {
  if (bufferPos > buffer.begin()) {
    inner.write(buffer.begin(), bufferPos - buffer.begin());
    bufferPos = buffer.begin();
  }
}

163 164
ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
  return arrayPtr(bufferPos, buffer.end());
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
}

void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
  if (src == bufferPos) {
    // Oh goody, the caller wrote directly into our buffer.
    bufferPos += size;
  } else {
    size_t available = buffer.end() - bufferPos;

    if (size <= available) {
      memcpy(bufferPos, src, size);
      bufferPos += size;
    } else if (size <= buffer.size()) {
      // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
      memcpy(bufferPos, src, available);
      inner.write(buffer.begin(), buffer.size());

      size -= available;
      src = reinterpret_cast<const byte*>(src) + available;

      memcpy(buffer.begin(), src, size);
      bufferPos = buffer.begin() + size;
    } else {
      // Writing so much data that we might as well write directly to avoid a copy.
      inner.write(buffer.begin(), bufferPos - buffer.begin());
      bufferPos = buffer.begin();
      inner.write(src, size);
    }
  }
}

// =======================================================================================

198
ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
199
ArrayInputStream::~ArrayInputStream() noexcept(false) {}
200

201
ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() {
202 203 204
  return array;
}

205
size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
206 207 208
  size_t n = std::min(maxBytes, array.size());
  memcpy(dst, array.begin(), n);
  array = array.slice(n, array.size());
209
  return n;
210 211 212
}

void ArrayInputStream::skip(size_t bytes) {
213
  KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") {
214
    bytes = array.size();
215
    break;
216 217 218 219 220 221
  }
  array = array.slice(bytes, array.size());
}

// -------------------------------------------------------------------

222
ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
223
ArrayOutputStream::~ArrayOutputStream() noexcept(false) {}
224

225 226
ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
  return arrayPtr(fillPos, array.end());
227 228 229 230 231
}

void ArrayOutputStream::write(const void* src, size_t size) {
  if (src == fillPos) {
    // Oh goody, the caller wrote directly into our buffer.
232
    KJ_REQUIRE(size <= array.end() - fillPos);
233 234
    fillPos += size;
  } else {
235
    KJ_REQUIRE(size <= (size_t)(array.end() - fillPos),
236
            "ArrayOutputStream's backing array was not large enough for the data written.");
237 238 239 240 241
    memcpy(fillPos, src, size);
    fillPos += size;
  }
}

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
// -------------------------------------------------------------------

VectorOutputStream::VectorOutputStream(size_t initialCapacity)
    : vector(heapArray<byte>(initialCapacity)), fillPos(vector.begin()) {}
VectorOutputStream::~VectorOutputStream() noexcept(false) {}

ArrayPtr<byte> VectorOutputStream::getWriteBuffer() {
  // Grow if needed.
  if (fillPos == vector.end()) {
    grow(vector.size() + 1);
  }

  return arrayPtr(fillPos, vector.end());
}

void VectorOutputStream::write(const void* src, size_t size) {
  if (src == fillPos) {
    // Oh goody, the caller wrote directly into our buffer.
    KJ_REQUIRE(size <= vector.end() - fillPos);
    fillPos += size;
  } else {
    if (vector.end() - fillPos < size) {
      grow(fillPos - vector.begin() + size);
    }

    memcpy(fillPos, src, size);
    fillPos += size;
  }
}

void VectorOutputStream::grow(size_t minSize) {
  size_t newSize = vector.size() * 2;
  while (newSize < minSize) newSize *= 2;
  auto newVector = heapArray<byte>(newSize);
  memcpy(newVector.begin(), vector.begin(), fillPos - vector.begin());
  fillPos = fillPos - vector.begin() + newVector.begin();
  vector = kj::mv(newVector);
}

281 282
// =======================================================================================

283
AutoCloseFd::~AutoCloseFd() noexcept(false) {
284 285 286
  if (fd >= 0) {
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // Don't use SYSCALL() here because close() should not be repeated on EINTR.
287
      if (miniposix::close(fd) < 0) {
288 289 290
        KJ_FAIL_SYSCALL("close", errno, fd) {
          break;
        }
291
      }
292 293
    });
  }
294 295
}

296
FdInputStream::~FdInputStream() noexcept(false) {}
297

298
size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
299 300 301 302 303
  byte* pos = reinterpret_cast<byte*>(buffer);
  byte* min = pos + minBytes;
  byte* max = pos + maxBytes;

  while (pos < min) {
304 305
    miniposix::ssize_t n;
    KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd);
306 307
    if (n == 0) {
      break;
308 309 310 311 312 313 314
    }
    pos += n;
  }

  return pos - reinterpret_cast<byte*>(buffer);
}

315
FdOutputStream::~FdOutputStream() noexcept(false) {}
316 317 318 319 320

void FdOutputStream::write(const void* buffer, size_t size) {
  const char* pos = reinterpret_cast<const char*>(buffer);

  while (size > 0) {
321 322
    miniposix::ssize_t n;
    KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd);
323
    KJ_ASSERT(n > 0, "write() returned zero.");
324 325 326 327 328
    pos += n;
    size -= n;
  }
}

329
void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
330 331
#if _WIN32
  // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable
332
  // restriction that each segment must be page-aligned. So, fall back to the default implementation
333

334
  OutputStream::write(pieces);
335 336

#else
Tom Lee's avatar
Tom Lee committed
337
  const size_t iovmax = miniposix::iovMax(pieces.size());
338 339 340
  while (pieces.size() > iovmax) {
    write(pieces.slice(0, iovmax));
    pieces = pieces.slice(iovmax, pieces.size());
341 342
  }

343
  KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128);
344 345 346 347 348 349 350 351 352

  for (uint i = 0; i < pieces.size(); i++) {
    // writev() interface is not const-correct.  :(
    iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
    iov[i].iov_len = pieces[i].size();
  }

  struct iovec* current = iov.begin();

353 354
  // Advance past any leading empty buffers so that a write full of only empty buffers does not
  // cause a syscall at all.
355 356 357 358 359
  while (current < iov.end() && current->iov_len == 0) {
    ++current;
  }

  while (current < iov.end()) {
360
    // Issue the write.
361
    ssize_t n = 0;
362
    KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd);
363
    KJ_ASSERT(n > 0, "writev() returned zero.");
364

365 366
    // Advance past all buffers that were fully-written.
    while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) {
367 368 369 370
      n -= current->iov_len;
      ++current;
    }

371 372
    // If we only partially-wrote one of the buffers, adjust the pointer and size to include only
    // the unwritten part.
373 374 375 376 377
    if (n > 0) {
      current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
      current->iov_len -= n;
    }
  }
378
#endif
379 380
}

381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
// =======================================================================================

#if _WIN32

AutoCloseHandle::~AutoCloseHandle() noexcept(false) {
  if (handle != (void*)-1) {
    KJ_WIN32(CloseHandle(handle));
  }
}

HandleInputStream::~HandleInputStream() noexcept(false) {}

size_t HandleInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
  byte* pos = reinterpret_cast<byte*>(buffer);
  byte* min = pos + minBytes;
  byte* max = pos + maxBytes;

  while (pos < min) {
    DWORD n;
    KJ_WIN32(ReadFile(handle, pos, kj::min(max - pos, DWORD(kj::maxValue)), &n, nullptr));
    if (n == 0) {
      break;
    }
    pos += n;
  }

  return pos - reinterpret_cast<byte*>(buffer);
}

HandleOutputStream::~HandleOutputStream() noexcept(false) {}

void HandleOutputStream::write(const void* buffer, size_t size) {
  const char* pos = reinterpret_cast<const char*>(buffer);

  while (size > 0) {
    DWORD n;
    KJ_WIN32(WriteFile(handle, pos, kj::min(size, DWORD(kj::maxValue)), &n, nullptr));
    KJ_ASSERT(n > 0, "write() returned zero.");
    pos += n;
    size -= n;
  }
}

#endif  // _WIN32

426
}  // namespace kj