Commit d7edf05a authored by wangyao02's avatar wangyao02

- Add cntl->set_pb_jsonify_empty_array(bool). Convert the repeated field that…

- Add cntl->set_pb_jsonify_empty_array(bool). Convert the repeated field that has no entry to a empty array of json in HTTP response.
parent b332e0b7
......@@ -130,6 +130,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
public:
Controller();
......@@ -278,6 +279,11 @@ public:
void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); }
bool has_pb_bytes_to_base64() const { return has_flag(FLAGS_PB_BYTES_TO_BASE64); }
// Set if convert the repeated field that has no entry to a empty array
// of json in HTTP response.
void set_pb_jsonify_empty_array(bool f) { set_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY, f); }
bool has_pb_jsonify_empty_array() const { return has_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY); }
// Tell RPC that done of the RPC can be run in the same thread where
// the RPC is issued, otherwise done is always run in a different thread.
// In current implementation, this option only affects RPC that fails
......@@ -457,6 +463,9 @@ public:
const std::string& thrift_method_name() { return _thrift_method_name; }
// Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......@@ -592,9 +601,6 @@ private:
void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }
// Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
private:
// NOTE: align and group fields to make Controller as compact as possible.
......
......@@ -678,6 +678,8 @@ void ParallelChannel::CallMethod(
for (int i = 0, j = 0; i < nchan; ++i) {
if (!aps[i].is_skip()) {
ParallelChannelDone::SubDone* sd = d->sub_done(j++);
// Forward the attachment to each sub call
sd->cntl.request_attachment().append(cntl->request_attachment());
_chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
sd->ap.request, sd->ap.response, sd);
}
......
......@@ -400,6 +400,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
......@@ -604,6 +605,7 @@ static void SendHttpResponse(Controller *cntl,
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
......
......@@ -49,7 +49,7 @@ namespace policy {
// 3. Use service->name() (rather than service->full_name()) + method_index
// to locate method defined in .proto file
// 4. 'user_message_size' is the size of protobuf request,
// and should be set iff request/response has attachment
// and should be set if request/response has attachment
// 5. Not supported:
// chunk_info - hulu doesn't support either
// TalkType - nobody has use this so far in hulu
......@@ -355,7 +355,12 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
sample->set_method_index(meta.method_index());
sample->set_compress_type(req_cmp_type);
sample->set_protocol_type(PROTOCOL_HULU_PBRPC);
sample->set_attachment_size(meta.user_message_size());
sample->set_user_data(meta.user_data());
if (meta.has_user_message_size()
&& static_cast<size_t>(meta.user_message_size()) < msg->payload.size()) {
size_t attachment_size = msg->payload.size() - meta.user_message_size();
sample->set_attachment_size(attachment_size);
}
sample->request = msg->payload;
sample->submit(start_parse_us);
}
......@@ -645,6 +650,7 @@ void PackHuluRequest(butil::IOBuf* req_buf,
meta.set_method_index(cntl->rpc_dump_meta()->method_index());
meta.set_compress_type(
CompressType2Hulu(cntl->rpc_dump_meta()->compress_type()));
meta.set_user_data(cntl->rpc_dump_meta()->user_data());
} else {
return cntl->SetFailed(ENOMETHOD, "method is NULL");
}
......
......@@ -22,4 +22,7 @@ message RpcDumpMeta {
// baidu_std
optional bytes authentication_data = 7;
// hulu_pbrpc
optional bytes user_data = 8;
}
......@@ -321,6 +321,8 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
sub_cntl->set_request_compress_type(_main_cntl->request_compress_type());
sub_cntl->set_log_id(_main_cntl->log_id());
sub_cntl->set_request_code(_main_cntl->request_code());
// Forward request attachment to the subcall
sub_cntl->request_attachment().append(_main_cntl->request_attachment());
sel_out.channel()->CallMethod(_main_cntl->_method,
&r.sub_done->_cntl,
......
......@@ -276,7 +276,8 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
butil::IOBuf copied_data(data);
const int rc = _fake_socket_weak_ref->Write(&copied_data);
if (rc != 0) {
CHECK_EQ(0, rc) << "Fail to write to _fake_socket, " << berror();
// Stream may be closed by peer before
LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
BAIDU_SCOPED_LOCK(_congestion_control_mutex);
_produced -= data.length();
return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment