io.h 11.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

24 25
#ifndef KJ_IO_H_
#define KJ_IO_H_
26

Kenton Varda's avatar
Kenton Varda committed
27
#include <stddef.h>
Kenton Varda's avatar
Kenton Varda committed
28
#include "common.h"
29
#include "array.h"
30
#include "exception.h"
31

32
namespace kj {
33 34 35 36 37 38

// =======================================================================================
// Abstract interfaces

class InputStream {
public:
39
  virtual ~InputStream() noexcept(false);
40

41
  size_t read(void* buffer, size_t minBytes, size_t maxBytes);
42
  // Reads at least minBytes and at most maxBytes, copying them into the given buffer.  Returns
43
  // the size read.  Throws an exception on errors.  Implemented in terms of tryRead().
44 45 46 47 48 49 50
  //
  // maxBytes is the number of bytes the caller really wants, but minBytes is the minimum amount
  // needed by the caller before it can start doing useful processing.  If the stream returns less
  // than maxBytes, the caller will usually call read() again later to get the rest.  Returning
  // less than maxBytes is useful when it makes sense for the caller to parallelize processing
  // with I/O.
  //
51 52 53 54
  // Never blocks if minBytes is zero.  If minBytes is zero and maxBytes is non-zero, this may
  // attempt a non-blocking read or may just return zero.  To force a read, use a non-zero minBytes.
  // To detect EOF without throwing an exception, use tryRead().
  //
55 56 57 58 59 60
  // Cap'n Proto never asks for more bytes than it knows are part of the message.  Therefore, if
  // the InputStream happens to know that the stream will never reach maxBytes -- even if it has
  // reached minBytes -- it should throw an exception to avoid wasting time processing an incomplete
  // message.  If it can't even reach minBytes, it MUST throw an exception, as the caller is not
  // expected to understand how to deal with partial reads.

61 62 63
  virtual size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
  // Like read(), but may return fewer than minBytes on EOF.

64 65 66 67 68 69 70 71 72 73
  inline void read(void* buffer, size_t bytes) { read(buffer, bytes, bytes); }
  // Convenience method for reading an exact number of bytes.

  virtual void skip(size_t bytes);
  // Skips past the given number of bytes, discarding them.  The default implementation read()s
  // into a scratch buffer.
};

class OutputStream {
public:
74
  virtual ~OutputStream() noexcept(false);
75 76 77 78

  virtual void write(const void* buffer, size_t size) = 0;
  // Always writes the full size.  Throws exception on error.

79
  virtual void write(ArrayPtr<const ArrayPtr<const byte>> pieces);
80 81 82 83 84 85 86 87 88 89 90 91
  // Equivalent to write()ing each byte array in sequence, which is what the default implementation
  // does.  Override if you can do something better, e.g. use writev() to do the write in a single
  // syscall.
};

class BufferedInputStream: public InputStream {
  // An input stream which buffers some bytes in memory to reduce system call overhead.
  // - OR -
  // An input stream that actually reads from some in-memory data structure and wants to give its
  // caller a direct pointer to that memory to potentially avoid a copy.

public:
92
  virtual ~BufferedInputStream() noexcept(false);
93

94
  ArrayPtr<const byte> getReadBuffer();
95 96
  // Get a direct pointer into the read buffer, which contains the next bytes in the input.  If the
  // caller consumes any bytes, it should then call skip() to indicate this.  This always returns a
97 98 99 100
  // non-empty buffer or throws an exception.  Implemented in terms of tryGetReadBuffer().

  virtual ArrayPtr<const byte> tryGetReadBuffer() = 0;
  // Like getReadBuffer() but may return an empty buffer on EOF.
101 102 103 104 105 106 107 108 109
};

class BufferedOutputStream: public OutputStream {
  // An output stream which buffers some bytes in memory to reduce system call overhead.
  // - OR -
  // An output stream that actually writes into some in-memory data structure and wants to give its
  // caller a direct pointer to that memory to potentially avoid a copy.

public:
110
  virtual ~BufferedOutputStream() noexcept(false);
111

112
  virtual ArrayPtr<byte> getWriteBuffer() = 0;
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
  // Get a direct pointer into the write buffer.  The caller may choose to fill in some prefix of
  // this buffer and then pass it to write(), in which case write() may avoid a copy.  It is
  // incorrect to pass to write any slice of this buffer which is not a prefix.
};

// =======================================================================================
// Buffered streams implemented as wrappers around regular streams

class BufferedInputStreamWrapper: public BufferedInputStream {
  // Implements BufferedInputStream in terms of an InputStream.
  //
  // Note that the underlying stream's position is unpredictable once the wrapper is destroyed,
  // unless the entire stream was consumed.  To read a predictable number of bytes in a buffered
  // way without going over, you'd need this wrapper to wrap some other wrapper which itself
  // implements an artificial EOF at the desired point.  Such a stream should be trivial to write
  // but is not provided by the library at this time.

public:
131
  explicit BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer = nullptr);
132 133 134 135 136 137 138
  // Creates a buffered stream wrapping the given non-buffered stream.  No guarantee is made about
  // the position of the inner stream after a buffered wrapper has been created unless the entire
  // input is read.
  //
  // If the second parameter is non-null, the stream uses the given buffer instead of allocating
  // its own.  This may improve performance if the buffer can be reused.

139
  KJ_DISALLOW_COPY(BufferedInputStreamWrapper);
140
  ~BufferedInputStreamWrapper() noexcept(false);
141 142

  // implements BufferedInputStream ----------------------------------
143 144
  ArrayPtr<const byte> tryGetReadBuffer() override;
  size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
145 146 147 148
  void skip(size_t bytes) override;

private:
  InputStream& inner;
149 150 151
  Array<byte> ownedBuffer;
  ArrayPtr<byte> buffer;
  ArrayPtr<byte> bufferAvailable;
152 153 154 155 156 157 158
};

class BufferedOutputStreamWrapper: public BufferedOutputStream {
  // Implements BufferedOutputStream in terms of an OutputStream.  Note that writes to the
  // underlying stream may be delayed until flush() is called or the wrapper is destroyed.

public:
159
  explicit BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer = nullptr);
160 161 162 163 164
  // Creates a buffered stream wrapping the given non-buffered stream.
  //
  // If the second parameter is non-null, the stream uses the given buffer instead of allocating
  // its own.  This may improve performance if the buffer can be reused.

165
  KJ_DISALLOW_COPY(BufferedOutputStreamWrapper);
166
  ~BufferedOutputStreamWrapper() noexcept(false);
167 168 169 170 171 172 173

  void flush();
  // Force the wrapper to write any remaining bytes in its buffer to the inner stream.  Note that
  // this only flushes this object's buffer; this object has no idea how to flush any other buffers
  // that may be present in the underlying stream.

  // implements BufferedOutputStream ---------------------------------
174
  ArrayPtr<byte> getWriteBuffer() override;
175 176 177 178
  void write(const void* buffer, size_t size) override;

private:
  OutputStream& inner;
179 180
  Array<byte> ownedBuffer;
  ArrayPtr<byte> buffer;
181
  byte* bufferPos;
182
  UnwindDetector unwindDetector;
183 184 185 186 187 188 189
};

// =======================================================================================
// Array I/O

class ArrayInputStream: public BufferedInputStream {
public:
190
  explicit ArrayInputStream(ArrayPtr<const byte> array);
191
  KJ_DISALLOW_COPY(ArrayInputStream);
192
  ~ArrayInputStream() noexcept(false);
193 194

  // implements BufferedInputStream ----------------------------------
195 196
  ArrayPtr<const byte> tryGetReadBuffer() override;
  size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
197 198 199
  void skip(size_t bytes) override;

private:
200
  ArrayPtr<const byte> array;
201 202 203 204
};

class ArrayOutputStream: public BufferedOutputStream {
public:
205
  explicit ArrayOutputStream(ArrayPtr<byte> array);
206
  KJ_DISALLOW_COPY(ArrayOutputStream);
207
  ~ArrayOutputStream() noexcept(false);
208

209
  ArrayPtr<byte> getArray() {
210
    // Get the portion of the array which has been filled in.
211
    return arrayPtr(array.begin(), fillPos);
212 213 214
  }

  // implements BufferedInputStream ----------------------------------
215
  ArrayPtr<byte> getWriteBuffer() override;
216 217 218
  void write(const void* buffer, size_t size) override;

private:
219
  ArrayPtr<byte> array;
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
  byte* fillPos;
};

// =======================================================================================
// File descriptor I/O

class AutoCloseFd {
  // A wrapper around a file descriptor which automatically closes the descriptor when destroyed.
  // The wrapper supports move construction for transferring ownership of the descriptor.  If
  // close() returns an error, the destructor throws an exception, UNLESS the destructor is being
  // called during unwind from another exception, in which case the close error is ignored.
  //
  // If your code is not exception-safe, you should not use AutoCloseFd.  In this case you will
  // have to call close() yourself and handle errors appropriately.

public:
  inline AutoCloseFd(): fd(-1) {}
Kenton Varda's avatar
Kenton Varda committed
237
  inline AutoCloseFd(decltype(nullptr)): fd(-1) {}
238 239
  inline explicit AutoCloseFd(int fd): fd(fd) {}
  inline AutoCloseFd(AutoCloseFd&& other): fd(other.fd) { other.fd = -1; }
240
  KJ_DISALLOW_COPY(AutoCloseFd);
241
  ~AutoCloseFd() noexcept(false);
242 243 244 245

  inline operator int() { return fd; }
  inline int get() { return fd; }

Kenton Varda's avatar
Kenton Varda committed
246 247
  inline bool operator==(decltype(nullptr)) { return fd < 0; }
  inline bool operator!=(decltype(nullptr)) { return fd >= 0; }
248 249 250

private:
  int fd;
251
  UnwindDetector unwindDetector;
252 253 254 255 256 257
};

class FdInputStream: public InputStream {
  // An InputStream wrapping a file descriptor.

public:
258
  explicit FdInputStream(int fd): fd(fd) {}
Kenton Varda's avatar
Kenton Varda committed
259
  explicit FdInputStream(AutoCloseFd fd): fd(fd), autoclose(mv(fd)) {}
260
  KJ_DISALLOW_COPY(FdInputStream);
261
  ~FdInputStream() noexcept(false);
262

263
  size_t tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
264 265 266 267 268 269 270 271 272 273

private:
  int fd;
  AutoCloseFd autoclose;
};

class FdOutputStream: public OutputStream {
  // An OutputStream wrapping a file descriptor.

public:
274
  explicit FdOutputStream(int fd): fd(fd) {}
Kenton Varda's avatar
Kenton Varda committed
275
  explicit FdOutputStream(AutoCloseFd fd): fd(fd), autoclose(mv(fd)) {}
276
  KJ_DISALLOW_COPY(FdOutputStream);
277
  ~FdOutputStream() noexcept(false);
278 279

  void write(const void* buffer, size_t size) override;
280
  void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
281 282 283 284 285 286

private:
  int fd;
  AutoCloseFd autoclose;
};

287
}  // namespace kj
288

289
#endif  // KJ_IO_H_