// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include "io.h"
#include "debug.h"
#include "miniposix.h"
#include <algorithm>
#include <errno.h>
#include <limits.h>
#include <string.h>

#if !_WIN32
#include <sys/uio.h>
#endif

33
namespace kj {
34

35 36 37 38
// Out-of-line definitions of the stream interface destructors. Each is declared noexcept(false),
// so a destructor in these hierarchies is permitted to throw.
InputStream::~InputStream() noexcept(false) {}
OutputStream::~OutputStream() noexcept(false) {}
BufferedInputStream::~BufferedInputStream() noexcept(false) {}
BufferedOutputStream::~BufferedOutputStream() noexcept(false) {}
39

40 41 42 43 44 45 46 47 48 49
// Reads between `minBytes` and `maxBytes` bytes into `buffer` by delegating to tryRead() and
// returns the number of bytes read.
//
// If tryRead() delivers fewer than `minBytes`, the "Premature EOF" requirement fails. When
// execution continues through the recovery block, the unread tail of the required range is
// zero-filled and `minBytes` is reported as the result.
size_t InputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
  size_t n = tryRead(buffer, minBytes, maxBytes);
  KJ_REQUIRE(n >= minBytes, "Premature EOF") {
    // Pretend we read zeros from the input: clear bytes [n, minBytes) of the caller's buffer.
    memset(reinterpret_cast<byte*>(buffer) + n, 0, minBytes - n);
    return minBytes;
  }
  return n;
}

50 51 52 53 54 55 56 57 58
// Discards `bytes` bytes of input by reading them, at most 8192 at a time, into a stack buffer
// whose contents are thrown away.
void InputStream::skip(size_t bytes) {
  char discard[8192];
  for (size_t remaining = bytes; remaining > 0;) {
    // Never request more than the discard buffer can hold.
    size_t chunk = std::min(remaining, sizeof(discard));
    read(discard, chunk);
    remaining -= chunk;
  }
}

59
// Gather-write fallback: writes each piece in order by forwarding it to
// write(const void*, size_t).
void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
  for (auto piece: pieces) {
    write(piece.begin(), piece.size());
  }
}

65 66 67 68 69 70
// Returns the buffer obtained from tryGetReadBuffer(), requiring it to be non-empty; an empty
// buffer fails the "Premature EOF" requirement.
ArrayPtr<const byte> BufferedInputStream::getReadBuffer() {
  auto result = tryGetReadBuffer();
  KJ_REQUIRE(result.size() > 0, "Premature EOF");
  return result;
}

71 72
// =======================================================================================

73
// Wraps `inner` with a read buffer. When `buffer` is null, an 8192-byte heap array is allocated
// (held in `ownedBuffer`) and used as the buffer; otherwise the caller-supplied `buffer` is used
// and `ownedBuffer` stays null.
BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
    : inner(inner), ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
      buffer(buffer == nullptr ? ownedBuffer : buffer) {}

77
// Nothing to do explicitly; declared noexcept(false) like the other stream destructors here.
BufferedInputStreamWrapper::~BufferedInputStreamWrapper() noexcept(false) {}
78

79
// Returns the unread portion of the internal buffer. When nothing is buffered, first refills
// the buffer from `inner` (asking for at least one byte, at most a full buffer). The result is
// empty only if that refill returned zero bytes.
ArrayPtr<const byte> BufferedInputStreamWrapper::tryGetReadBuffer() {
  if (bufferAvailable.size() == 0) {
    size_t n = inner.tryRead(buffer.begin(), 1, buffer.size());
    bufferAvailable = buffer.slice(0, n);
  }

  return bufferAvailable;
}

88
// Reads between `minBytes` and `maxBytes` bytes into `dst`, serving from the internal buffer
// where possible, and returns the number of bytes copied.
//
// The underlying stream is consulted through tryRead() rather than read(), so a short
// underlying stream yields a short count here instead of a "Premature EOF" failure; callers
// that need the failure get it from read().
size_t BufferedInputStreamWrapper::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
  if (minBytes <= bufferAvailable.size()) {
    // Serve from current buffer.
    size_t n = std::min(bufferAvailable.size(), maxBytes);
    memcpy(dst, bufferAvailable.begin(), n);
    bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
    return n;
  } else {
    // Copy current available into destination.
    memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
    size_t fromFirstBuffer = bufferAvailable.size();

    // Account for what was just delivered before asking the underlying stream for the rest.
    dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
    minBytes -= fromFirstBuffer;
    maxBytes -= fromFirstBuffer;

    if (maxBytes <= buffer.size()) {
      // Read the next buffer-full. Anything beyond what the caller can accept stays buffered.
      size_t n = inner.tryRead(buffer.begin(), minBytes, buffer.size());
      size_t fromSecondBuffer = std::min(n, maxBytes);
      memcpy(dst, buffer.begin(), fromSecondBuffer);
      bufferAvailable = buffer.slice(fromSecondBuffer, n);
      return fromFirstBuffer + fromSecondBuffer;
    } else {
      // Forward large read to the underlying stream, bypassing the buffer entirely.
      bufferAvailable = nullptr;
      return fromFirstBuffer + inner.tryRead(dst, minBytes, maxBytes);
    }
  }
}

// Discards `bytes` bytes of input, consuming buffered data first and then either refilling the
// buffer (small remainder) or delegating to the underlying stream's skip() (large remainder).
void BufferedInputStreamWrapper::skip(size_t bytes) {
  if (bytes <= bufferAvailable.size()) {
    bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
  } else {
    // Everything buffered is consumed; from here on `bytes` counts only what must still come
    // from the underlying stream.
    bytes -= bufferAvailable.size();
    if (bytes <= buffer.size()) {
      // Read the next buffer-full and keep whatever lies beyond the skipped prefix.
      size_t n = inner.read(buffer.begin(), bytes, buffer.size());
      bufferAvailable = buffer.slice(bytes, n);
    } else {
      // Forward large skip to the underlying stream. `bytes` was already reduced by the
      // buffered amount above, so it is passed through unchanged.
      bufferAvailable = nullptr;
      inner.skip(bytes);
    }
  }
}

// -------------------------------------------------------------------

138
// Wraps `inner` with a write buffer. When `buffer` is null, an 8192-byte heap array is allocated
// (held in `ownedBuffer`) and used as the buffer; otherwise the caller-supplied `buffer` is
// used. `bufferPos` starts at the beginning of whichever buffer was selected.
BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
    : inner(inner),
      ownedBuffer(buffer == nullptr ? heapArray<byte>(8192) : nullptr),
      buffer(buffer == nullptr ? ownedBuffer : buffer),
      bufferPos(this->buffer.begin()) {}

144 145 146 147
// Flushes any buffered data on destruction. The flush is routed through
// unwindDetector.catchExceptionsIfUnwinding(); presumably this keeps a flush failure from
// propagating while another exception is already unwinding -- confirm against UnwindDetector.
BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() noexcept(false) {
  unwindDetector.catchExceptionsIfUnwinding([&]() {
    flush();
  });
}

// Writes any bytes accumulated in the buffer to the underlying stream and marks the buffer
// empty. Does nothing when the buffer holds no data.
void BufferedOutputStreamWrapper::flush() {
  if (bufferPos <= buffer.begin()) {
    return;  // Nothing buffered.
  }

  inner.write(buffer.begin(), bufferPos - buffer.begin());
  bufferPos = buffer.begin();
}

157 158
// Returns the unused tail of the internal buffer, from the current fill position to its end.
// write() recognizes a `src` equal to the start of this range as data already placed in the
// buffer.
ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
  return arrayPtr(bufferPos, buffer.end());
}

void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
  if (src == bufferPos) {
    // Oh goody, the caller wrote directly into our buffer.
    bufferPos += size;
  } else {
    size_t available = buffer.end() - bufferPos;

    if (size <= available) {
      memcpy(bufferPos, src, size);
      bufferPos += size;
    } else if (size <= buffer.size()) {
      // Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
      memcpy(bufferPos, src, available);
      inner.write(buffer.begin(), buffer.size());

      size -= available;
      src = reinterpret_cast<const byte*>(src) + available;

      memcpy(buffer.begin(), src, size);
      bufferPos = buffer.begin() + size;
    } else {
      // Writing so much data that we might as well write directly to avoid a copy.
      inner.write(buffer.begin(), bufferPos - buffer.begin());
      bufferPos = buffer.begin();
      inner.write(src, size);
    }
  }
}

// =======================================================================================

192
// Creates a stream that reads from `array`; only the ArrayPtr is stored, not a copy of the bytes.
ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
ArrayInputStream::~ArrayInputStream() noexcept(false) {}
194

195
// Returns the not-yet-consumed remainder of the array.
ArrayPtr<const byte> ArrayInputStream::tryGetReadBuffer() {
  return array;
}

199
// Copies up to `maxBytes` bytes from the remaining array into `dst`, advances past them, and
// returns the number copied. `minBytes` is not consulted: the result is simply limited by how
// much of the array remains.
size_t ArrayInputStream::tryRead(void* dst, size_t minBytes, size_t maxBytes) {
  size_t n = std::min(maxBytes, array.size());
  memcpy(dst, array.begin(), n);
  array = array.slice(n, array.size());
  return n;
}

void ArrayInputStream::skip(size_t bytes) {
207
  KJ_REQUIRE(array.size() >= bytes, "ArrayInputStream ended prematurely.") {
208
    bytes = array.size();
209
    break;
210 211 212 213 214 215
  }
  array = array.slice(bytes, array.size());
}

// -------------------------------------------------------------------

216
// Creates a stream that writes into `array`, with the fill position starting at its beginning.
ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
ArrayOutputStream::~ArrayOutputStream() noexcept(false) {}
218

219 220
// Returns the unfilled tail of the backing array, from the current fill position to its end.
// write() recognizes a `src` equal to the start of this range as data already in place.
ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
  return arrayPtr(fillPos, array.end());
}

void ArrayOutputStream::write(const void* src, size_t size) {
  if (src == fillPos) {
    // Oh goody, the caller wrote directly into our buffer.
    fillPos += size;
  } else {
228
    KJ_REQUIRE(size <= (size_t)(array.end() - fillPos),
229
            "ArrayOutputStream's backing array was not large enough for the data written.");
230 231 232 233 234 235 236
    memcpy(fillPos, src, size);
    fillPos += size;
  }
}

// =======================================================================================

237
// Closes the descriptor if one is held (fd >= 0). A failing close() is reported through
// KJ_FAIL_SYSCALL with a recovery block, and the whole operation runs inside
// unwindDetector.catchExceptionsIfUnwinding().
AutoCloseFd::~AutoCloseFd() noexcept(false) {
  if (fd >= 0) {
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // Don't use SYSCALL() here because close() should not be repeated on EINTR.
      if (miniposix::close(fd) < 0) {
        KJ_FAIL_SYSCALL("close", errno, fd) {
          break;
        }
      }
    });
  }
}

250
// Nothing to do explicitly; declared noexcept(false) like the other stream destructors here.
FdInputStream::~FdInputStream() noexcept(false) {}
251

252
// Reads from the file descriptor into `buffer` until at least `minBytes` bytes have arrived or
// read() reports end-of-file (returns 0). Each call asks for everything up to `maxBytes`.
// Returns the total number of bytes read, which is below `minBytes` only on end-of-file.
size_t FdInputStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
  byte* pos = reinterpret_cast<byte*>(buffer);
  byte* min = pos + minBytes;
  byte* max = pos + maxBytes;

  while (pos < min) {
    miniposix::ssize_t n;
    KJ_SYSCALL(n = miniposix::read(fd, pos, max - pos), fd);
    if (n == 0) {
      // End of file: stop and report the short count.
      break;
    }
    pos += n;
  }

  return pos - reinterpret_cast<byte*>(buffer);
}

269
// Nothing to do explicitly; declared noexcept(false) like the other stream destructors here.
FdOutputStream::~FdOutputStream() noexcept(false) {}
270 271 272 273 274

void FdOutputStream::write(const void* buffer, size_t size) {
  const char* pos = reinterpret_cast<const char*>(buffer);

  while (size > 0) {
275 276
    miniposix::ssize_t n;
    KJ_SYSCALL(n = miniposix::write(fd, pos, size), fd);
277
    KJ_ASSERT(n > 0, "write() returned zero.");
278 279 280 281 282
    pos += n;
    size -= n;
  }
}

283
// Writes every piece, in order, to the file descriptor.
//
// On Windows each piece goes through write(const void*, size_t). Elsewhere the pieces are sent
// with writev(), in batches of at most IOV_MAX, and the call is re-issued after partial writes
// until all bytes have been written.
void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
#if _WIN32
  // Windows has no reasonable writev(). It has WriteFileGather, but this call has the unreasonable
  // restriction that each segment must be page-aligned. So, fall back to write().

  for (auto piece: pieces) {
    write(piece.begin(), piece.size());
  }

#else
  // Apparently, there is a maximum number of iovecs allowed per call.  I don't understand why.
  // Also, most platforms define IOV_MAX but Linux defines only UIO_MAXIOV.  Unfortunately, Solaris
  // defines a constant UIO_MAXIOV with a different meaning, so we check for IOV_MAX first.
#if !defined(IOV_MAX) && defined(UIO_MAXIOV)
#define IOV_MAX UIO_MAXIOV
#endif
  // Peel off full IOV_MAX-sized batches; the final (possibly partial) batch is handled below.
  while (pieces.size() > IOV_MAX) {
    write(pieces.slice(0, IOV_MAX));
    pieces = pieces.slice(IOV_MAX, pieces.size());
  }

  KJ_STACK_ARRAY(struct iovec, iov, pieces.size(), 16, 128);

  for (uint i = 0; i < pieces.size(); i++) {
    // writev() interface is not const-correct.  :(
    iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
    iov[i].iov_len = pieces[i].size();
  }

  struct iovec* current = iov.begin();

  // Advance past any leading empty buffers so that a write full of only empty buffers does not
  // cause a syscall at all.
  while (current < iov.end() && current->iov_len == 0) {
    ++current;
  }

  while (current < iov.end()) {
    // Issue the write.
    ssize_t n = 0;
    KJ_SYSCALL(n = ::writev(fd, current, iov.end() - current), fd);
    KJ_ASSERT(n > 0, "writev() returned zero.");

    // Advance past all buffers that were fully-written.
    while (current < iov.end() && static_cast<size_t>(n) >= current->iov_len) {
      n -= current->iov_len;
      ++current;
    }

    // If we only partially-wrote one of the buffers, adjust the pointer and size to include only
    // the unwritten part.
    if (n > 0) {
      current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
      current->iov_len -= n;
    }
  }
#endif
}

342
}  // namespace kj