// recordio - A binary format to transport data from end to end.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (jge666@gmail.com)
// Date: Thu Nov 22 13:57:56 CST 2012

#ifndef BUTIL_RECORDIO_H
#define BUTIL_RECORDIO_H

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

#include "butil/iobuf.h"

namespace butil {

gejun's avatar
gejun committed
26
// 0-or-1 Payload + 0-or-multiple Metas.
gejun's avatar
gejun committed
27 28 29 30
// Payload and metas are often serialized form of protobuf messages. As a
// correspondence, the implementation is not optimized for very small blobs,
// which should be batched properly before inserting(e.g. using repeated
// field in pb)
31 32 33 34 35 36 37
class Record {
public:
    struct NamedMeta {
        std::string name;
        std::shared_ptr<butil::IOBuf> data;
    };

gejun's avatar
gejun committed
38
    // Number of metas. Could be 0.
39 40 41
    size_t MetaCount() const { return _metas.size(); }

    // Get i-th Meta, out-of-range accesses may crash.
gejun's avatar
gejun committed
42
    // This method is mainly for iterating all metas.
43 44
    const NamedMeta& MetaAt(size_t i) const { return _metas[i]; }

gejun's avatar
gejun committed
45
    // Get meta by |name|. NULL on not found.
46 47
    const butil::IOBuf* Meta(const char* name) const;

gejun's avatar
gejun committed
48 49 50 51 52 53 54 55
    // Returns a mutable pointer to the meta with |name|. If the meta does
    // not exist, add it first.
    // If |null_on_found| is true and meta with |name| is present, NULL is
    // returned. This is useful for detecting uniqueness of meta names in some
    // scenarios.
    // NOTE: With the assumption that there won't be many metas, the impl.
    // tests presence by scaning all fields, which may perform badly when metas
    // are a lot.
56 57 58
    butil::IOBuf* MutableMeta(const char* name, bool null_on_found = false);
    butil::IOBuf* MutableMeta(const std::string& name, bool null_on_found = false);

gejun's avatar
gejun committed
59 60
    // Remove meta with the name. The impl. may scan all fields.
    // Returns true on erased, false on absent.
61 62
    bool RemoveMeta(const butil::StringPiece& name);

gejun's avatar
gejun committed
63
    // Get the payload. Empty by default.
64 65
    const butil::IOBuf& Payload() const { return _payload; }

gejun's avatar
gejun committed
66
    // Get a mutable pointer to the payload.
67 68 69 70 71
    butil::IOBuf* MutablePayload() { return &_payload; }

    // Clear payload and remove all meta.
    void Clear();

gejun's avatar
gejun committed
72
    // Serialized size of this record.
73 74 75 76 77 78 79 80 81
    size_t ByteSize() const;

private:
    butil::IOBuf _payload;
    std::vector<NamedMeta> _metas;
};

// Parse records from the IReader, corrupted records will be skipped.
// Example:
82
//    RecordReader rd(...);
83 84
//    Record rec;
//    while (rd.ReadNext(&rec)) {
gejun's avatar
gejun committed
85
//        // Handle the rec
86 87 88 89 90 91
//    }
//    if (rd.last_error() != RecordReader::END_OF_READER) {
//        LOG(FATAL) << "Critical error occurred";
//    }
class RecordReader {
public:
gejun's avatar
gejun committed
92
    // A special error code to mark end of input data.
93 94 95
    static const int END_OF_READER = -1;

    explicit RecordReader(IReader* reader);
gejun's avatar
gejun committed
96 97 98

    // Returns true on success and |out| is overwritten by the record.
    // False otherwise and last_error() is the error which is treated as permanent.
99 100 101
    bool ReadNext(Record* out);

    // 0 means no error.
gejun's avatar
gejun committed
102
    // END_OF_READER means all data in the IReader are successfully consumed.
103 104
    int last_error() const { return _last_error; }

105 106 107 108
    // Total bytes consumed.
    // NOTE: this value may not equal to read bytes from the IReader even if
    // the reader runs out, due to parsing errors.
    size_t offset() const { return _ncut; }
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126

private:
    bool CutUntilNextRecordCandidate();
    int CutRecord(Record* rec);

private:
    IReader* _reader;
    IOPortal _portal;
    IOBufCutter _cutter;
    size_t _ncut;
    int _last_error;
};

// Write records into the IWriter.
class RecordWriter {
public:
    // NOTE(review): |writer| is presumably kept as a non-owning pointer
    // (see _writer) and so must outlive this object -- confirm against the
    // implementation.
    explicit RecordWriter(IWriter* writer);

    // Serialize |record| into internal buffer and NOT flush into the IWriter.
    // NOTE(review): the return value is presumably 0 on success and an error
    // code otherwise, like Flush() -- confirm against the implementation.
    int WriteWithoutFlush(const Record& record);

    // Serialize |record| into internal buffer and flush into the IWriter.
    // NOTE(review): the return value is presumably 0 on success and an error
    // code otherwise, like Flush() -- confirm against the implementation.
    int Write(const Record& record);

    // Flush internal buffer into the IWriter.
    // Returns 0 on success, error code otherwise.
    int Flush();

private:
    IOBuf _buf;
    IWriter* _writer;
};

} // namespace butil

#endif  // BUTIL_RECORDIO_H