1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// recordio - A binary format to transport data from end to end.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Date: Thu Nov 22 13:57:56 CST 2012
#ifndef BUTIL_RECORDIO_H
#define BUTIL_RECORDIO_H
#include "butil/iobuf.h"
#include <memory>
namespace butil {
// A Record holds at most one payload plus zero or more named metas.
// Payload and metas are usually the serialized form of protobuf messages.
// Consequently the implementation is not optimized for very small blobs;
// such blobs should be batched properly before being inserted (e.g. by
// using a repeated field in pb).
class Record {
public:
    // One meta entry: the meta's name paired with the buffer of its data.
    struct NamedMeta {
        std::string name;
        std::shared_ptr<butil::IOBuf> data;
    };

    // Number of metas in this record. May be 0.
    size_t MetaCount() const {
        return _metas.size();
    }

    // Get the meta at index |i|. The index is not range-checked here, so
    // out-of-range accesses may crash.
    // Mainly meant for iterating over all metas.
    const NamedMeta& MetaAt(size_t i) const {
        return _metas[i];
    }

    // Look up a meta by |name|. Returns NULL when no such meta exists.
    const butil::IOBuf* Meta(const char* name) const;

    // Get a mutable pointer to the meta called |name|, adding the meta first
    // if it does not exist yet.
    // When |null_on_found| is true and a meta called |name| is already
    // present, NULL is returned instead. Some scenarios use this to detect
    // whether meta names are unique.
    // NOTE: The implementation assumes that metas are few and tests presence
    // by scanning all fields, which may perform badly when there are a lot
    // of metas.
    butil::IOBuf* MutableMeta(const char* name, bool null_on_found = false);
    butil::IOBuf* MutableMeta(const std::string& name, bool null_on_found = false);

    // Remove the meta called |name|. The implementation may scan all fields.
    // Returns true if a meta was erased, false if it was absent.
    bool RemoveMeta(const butil::StringPiece& name);

    // Read-only access to the payload, which is empty by default.
    const butil::IOBuf& Payload() const {
        return _payload;
    }

    // Mutable pointer to the payload.
    butil::IOBuf* MutablePayload() {
        return &_payload;
    }

    // Clear the payload and remove all metas.
    void Clear();

    // Serialized size of this record.
    size_t ByteSize() const;

private:
    butil::IOBuf _payload;
    std::vector<NamedMeta> _metas;
};
// Parses records out of an IReader. Corrupted records are skipped.
// Example:
//   RecordReader rd(...);
//   Record rec;
//   while (rd.ReadNext(&rec)) {
//       // Handle the rec
//   }
//   if (rd.last_error() != RecordReader::END_OF_READER) {
//       LOG(FATAL) << "Critical error occurred";
//   }
class RecordReader {
public:
    // Special error code marking the end of the input data.
    static const int END_OF_READER = -1;

    explicit RecordReader(IReader* reader);

    // On success, returns true and |out| is overwritten by the record.
    // Otherwise returns false, and last_error() holds the error, which is
    // treated as permanent.
    bool ReadNext(Record* out);

    // The last error. 0 means no error.
    // END_OF_READER means that all data in the IReader were successfully
    // consumed.
    int last_error() const {
        return _last_error;
    }

    // Total number of bytes consumed.
    // NOTE: because of parsing errors, this value may not be equal to the
    // number of bytes read from the IReader, even if the reader runs out.
    size_t offset() const {
        return _ncut;
    }

private:
    // NOTE(review): undocumented helpers; judging by the names only, the
    // first skips input up to the next possible record start and the second
    // cuts one record into |rec| -- confirm against the implementation.
    bool CutUntilNextRecordCandidate();
    int CutRecord(Record* rec);

    IReader* _reader;
    IOPortal _portal;
    IOBufCutter _cutter;
    size_t _ncut;     // reported by offset()
    int _last_error;  // reported by last_error()
};
// Writes records into an IWriter.
class RecordWriter {
public:
    explicit RecordWriter(IWriter* writer);

    // Serializes |record| into the internal buffer WITHOUT flushing it into
    // the IWriter.
    // NOTE(review): the return convention is not documented here; presumably
    // 0 on success and an error code otherwise, like Flush() -- confirm.
    int WriteWithoutFlush(const Record& record);

    // Serializes |record| into the internal buffer and flushes it into the
    // IWriter.
    // NOTE(review): the return convention is not documented here; presumably
    // 0 on success and an error code otherwise, like Flush() -- confirm.
    int Write(const Record& record);

    // Flushes the internal buffer into the IWriter.
    // Returns 0 on success, an error code otherwise.
    int Flush();

private:
    IOBuf _buf;
    IWriter* _writer;
};
} // namespace butil
#endif // BUTIL_RECORDIO_H