Unverified Commit b72be50f authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #472 from ipconfigme/master

merge some bugfix from baidu
parents cc49eff7 a62f7946
......@@ -129,6 +129,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
public:
Controller();
......@@ -277,6 +278,11 @@ public:
void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); }
bool has_pb_bytes_to_base64() const { return has_flag(FLAGS_PB_BYTES_TO_BASE64); }
// Set if convert the repeated field that has no entry to a empty array
// of json in HTTP response.
void set_pb_jsonify_empty_array(bool f) { set_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY, f); }
bool has_pb_jsonify_empty_array() const { return has_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY); }
// Tell RPC that done of the RPC can be run in the same thread where
// the RPC is issued, otherwise done is always run in a different thread.
// In current implementation, this option only affects RPC that fails
......@@ -456,6 +462,9 @@ public:
const std::string& thrift_method_name() { return _thrift_method_name; }
// Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......@@ -591,9 +600,6 @@ private:
void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }
// Get sock option. .e.g get vip info through ttm kernel module hook,
int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
private:
// NOTE: align and group fields to make Controller as compact as possible.
......
......@@ -678,6 +678,8 @@ void ParallelChannel::CallMethod(
for (int i = 0, j = 0; i < nchan; ++i) {
if (!aps[i].is_skip()) {
ParallelChannelDone::SubDone* sd = d->sub_done(j++);
// Forward the attachment to each sub call
sd->cntl.request_attachment().append(cntl->request_attachment());
_chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
sd->ap.request, sd->ap.response, sd);
}
......
......@@ -400,6 +400,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
......@@ -603,6 +604,7 @@ static void SendHttpResponse(Controller *cntl,
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
......
......@@ -49,7 +49,7 @@ namespace policy {
// 3. Use service->name() (rather than service->full_name()) + method_index
// to locate method defined in .proto file
// 4. 'user_message_size' is the size of protobuf request,
// and should be set iff request/response has attachment
// and should be set if request/response has attachment
// 5. Not supported:
// chunk_info - hulu doesn't support either
// TalkType - nobody has use this so far in hulu
......@@ -350,7 +350,12 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
sample->set_method_index(meta.method_index());
sample->set_compress_type(req_cmp_type);
sample->set_protocol_type(PROTOCOL_HULU_PBRPC);
sample->set_attachment_size(meta.user_message_size());
sample->set_user_data(meta.user_data());
if (meta.has_user_message_size()
&& static_cast<size_t>(meta.user_message_size()) < msg->payload.size()) {
size_t attachment_size = msg->payload.size() - meta.user_message_size();
sample->set_attachment_size(attachment_size);
}
sample->request = msg->payload;
sample->submit(start_parse_us);
}
......@@ -640,6 +645,7 @@ void PackHuluRequest(butil::IOBuf* req_buf,
meta.set_method_index(cntl->rpc_dump_meta()->method_index());
meta.set_compress_type(
CompressType2Hulu(cntl->rpc_dump_meta()->compress_type()));
meta.set_user_data(cntl->rpc_dump_meta()->user_data());
} else {
return cntl->SetFailed(ENOMETHOD, "method is NULL");
}
......
......@@ -22,4 +22,7 @@ message RpcDumpMeta {
// baidu_std
optional bytes authentication_data = 7;
// hulu_pbrpc
optional bytes user_data = 8;
}
......@@ -321,6 +321,8 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
sub_cntl->set_request_compress_type(_main_cntl->request_compress_type());
sub_cntl->set_log_id(_main_cntl->log_id());
sub_cntl->set_request_code(_main_cntl->request_code());
// Forward request attachment to the subcall
sub_cntl->request_attachment().append(_main_cntl->request_attachment());
sel_out.channel()->CallMethod(_main_cntl->_method,
&r.sub_done->_cntl,
......
......@@ -276,7 +276,8 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
butil::IOBuf copied_data(data);
const int rc = _fake_socket_weak_ref->Write(&copied_data);
if (rc != 0) {
CHECK_EQ(0, rc) << "Fail to write to _fake_socket, " << berror();
// Stream may be closed by peer before
LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
BAIDU_SCOPED_LOCK(_congestion_control_mutex);
_produced -= data.length();
return -1;
......
......@@ -402,13 +402,13 @@ bool JsonValueToProtoMessage(const BUTIL_RAPIDJSON_NAMESPACE::Value& json_value,
google::protobuf::Message* message,
const Json2PbOptions& options,
std::string* err) {
const google::protobuf::Descriptor* descriptor = message->GetDescriptor();
if (!json_value.IsObject()) {
J2PERROR(err, "`json_value' is not a json object");
J2PERROR(err, "`json_value' is not a json object. %s", descriptor->name().c_str());
return false;
}
const google::protobuf::Reflection* reflection = message->GetReflection();
const google::protobuf::Descriptor* descriptor = message->GetDescriptor();
std::vector<const google::protobuf::FieldDescriptor*> fields;
fields.reserve(64);
......@@ -491,6 +491,10 @@ inline bool JsonToProtoMessageInline(const std::string& json_string,
}
BUTIL_RAPIDJSON_NAMESPACE::Document d;
d.Parse<0>(json_string.c_str());
if (d.HasParseError()) {
J2PERROR(error, "Invalid json format");
return false;
}
return json2pb::JsonValueToProtoMessage(d, message, options, error);
}
......
......@@ -20,10 +20,11 @@ Pb2JsonOptions::Pb2JsonOptions()
, pretty_json(false)
, enable_protobuf_map(true)
#ifdef BAIDU_INTERNAL
, bytes_to_base64(false) {
, bytes_to_base64(false)
#else
, bytes_to_base64(true) {
, bytes_to_base64(true)
#endif
, jsonify_empty_array(false) {
}
class PbToJsonConverter {
......@@ -89,7 +90,8 @@ bool PbToJsonConverter::Convert(const google::protobuf::Message& message, Handle
}
continue;
} else if (field->is_repeated()
&& reflection->FieldSize(message, field) == 0) {
&& reflection->FieldSize(message, field) == 0
&& !_option.jsonify_empty_array) {
// Repeated field that has no entry
continue;
}
......
......@@ -38,6 +38,11 @@ struct Pb2JsonOptions {
// encoding when this option is turned on.
// Default: false for baidu-internal, true otherwise.
bool bytes_to_base64;
// Convert the repeated field that has no entry
// to a empty array of json when this option is turned on.
// Default: false
bool jsonify_empty_array;
};
// Convert protobuf `messge' to `json' according to `options'.
......
......@@ -13,6 +13,9 @@ bool IsProtobufMap(const FieldDescriptor* field) {
return false;
}
const Descriptor* entry_desc = field->message_type();
if (entry_desc == NULL) {
return false;
}
if (entry_desc->field_count() != 2) {
return false;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment