Commit d7f59d26 authored by gejun

Port svn CI r34915 r34921 r34922

Change-Id: I5597926701445b7e2b75bbfd2f1d9320f5cdb9bd
parent a613c32d
......@@ -237,17 +237,19 @@ void Controller::InternalReset(bool in_constructor) {
_stream_creator = NULL;
}
Controller::Call::Call(Controller::Call& rhs)
: nretry(rhs.nretry)
, need_feedback(rhs.need_feedback)
, peer_id(rhs.peer_id)
, begin_time_us(rhs.begin_time_us)
, sending_sock(rhs.sending_sock.release()) {
Controller::Call::Call(Controller::Call* rhs)
: nretry(rhs->nretry)
, need_feedback(rhs->need_feedback)
, touched_by_stream_creator(rhs->touched_by_stream_creator)
, peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us)
, sending_sock(rhs->sending_sock.release()) {
    // NOTE: fields in rhs should be reset because the RPC could fail before
    // all fields of the next call are set, in which case _current_call.OnComplete
    // would behave incorrectly.
rhs.need_feedback = false;
rhs.peer_id = (SocketId)-1;
rhs->need_feedback = false;
rhs->touched_by_stream_creator = false;
rhs->peer_id = (SocketId)-1;
}
Controller::Call::~Call() {
......@@ -257,6 +259,7 @@ Controller::Call::~Call() {
void Controller::Call::Reset() {
nretry = 0;
need_feedback = false;
touched_by_stream_creator = false;
peer_id = (SocketId)-1;
begin_time_us = 0;
sending_sock.reset(NULL);
......@@ -513,7 +516,8 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
CHECK_EQ(0, bthread_id_unlock(info.id));
return;
}
if (!_error_code || _current_call.nretry >= _max_retry) {
if ((!_error_code && _retry_policy == NULL) ||
_current_call.nretry >= _max_retry) {
goto END_OF_RPC;
}
if (_error_code == EBACKUPREQUEST) {
......@@ -542,7 +546,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
}
// _current_call does not end yet.
CHECK(_unfinished_call == NULL); // only one backup request now.
_unfinished_call = new (std::nothrow) Call(_current_call);
_unfinished_call = new (std::nothrow) Call(&_current_call);
if (_unfinished_call == NULL) {
SetFailed(ENOMEM, "Fail to new Call");
goto END_OF_RPC;
......@@ -693,9 +697,15 @@ void Controller::Call::OnComplete(Controller* c, int error_code/*note*/,
sock->SetLogOff();
}
}
if (touched_by_stream_creator) {
touched_by_stream_creator = false;
CHECK(c->stream_creator());
c->stream_creator()->CleanupSocketForStream(
sending_sock.get(), c, error_code);
}
// Release the `Socket' we used to send/receive data
sending_sock.reset(NULL);
if (need_feedback) {
LoadBalancer::CallInfo info;
info.in.begin_time_us = begin_time_us;
......@@ -727,6 +737,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
_current_call.sending_sock, this);
}
_current_call.OnComplete(this, _error_code, info.responded);
if (_unfinished_call != NULL) {
// When _current_call is successful, mark _unfinished_call as
// EBACKUPREQUEST, we can't use 0 because the server possibly
......@@ -790,7 +801,18 @@ void Controller::EndRPC(const CompletionInfo& info) {
// No need to retry or can't retry, just call user's `done'.
const CallId saved_cid = _correlation_id;
if (_done) {
if (!FLAGS_usercode_in_pthread) {
if (!FLAGS_usercode_in_pthread || _done == DoNothing()/*Note*/) {
// Note: no need to run DoNothing in backup thread when pthread
// mode is on. Otherwise there's a tricky deadlock:
// void SomeService::CallMethod(...) { // -usercode_in_pthread=true
// ...
// channel.CallMethod(...., baidu::rpc::DoNothing());
// baidu::rpc::Join(cntl.call_id());
// ...
// }
            // Join() is not signalled until the done has Run(), and the done
            // can't Run() because all backup threads are blocked in Join().
            // Call OnRPCEnd for async RPC. The one for sync RPC is called in
            // Channel::CallMethod to account for the latency of the context switch.
OnRPCEnd(base::gettimeofday_us());
......@@ -807,6 +829,7 @@ void Controller::EndRPC(const CompletionInfo& info) {
RunUserCode(RunDoneInBackupThread, this);
}
} else {
// OnRPCEnd() of sync RPC is called in the caller's thread.
// FIXME: We're assuming the calling thread is about to quit.
bthread_about_to_quit();
add_flag(FLAGS_DESTROYED_CID);
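
The deadlock described in the comment above arises from the semi-synchronous calling pattern: issue several RPCs with DoNothing() as the done, then Join() them. Below is a minimal sketch of that pattern; the EchoService stub and the header paths are assumptions, only DoNothing() and Join() are taken from this commit:

#include <baidu/rpc/channel.h>      // assumed header path
#include <baidu/rpc/controller.h>   // assumed header path
#include "echo.pb.h"                // hypothetical protobuf service

// Issue two RPCs concurrently and wait for both without writing a callback.
void SemiSyncEcho(example::EchoService_Stub& stub) {
    baidu::rpc::Controller cntl1, cntl2;
    example::EchoRequest req1, req2;
    example::EchoResponse res1, res2;
    req1.set_message("hello");
    req2.set_message("world");
    // DoNothing() returns a closure that does nothing when Run().
    stub.Echo(&cntl1, &req1, &res1, baidu::rpc::DoNothing());
    stub.Echo(&cntl2, &req2, &res2, baidu::rpc::DoNothing());
    // Join() blocks until the corresponding done has been Run(). With
    // -usercode_in_pthread=true, queueing DoNothing to the backup threads
    // could deadlock when all of them were blocked in Join(); the change
    // above runs DoNothing in place instead.
    baidu::rpc::Join(cntl1.call_id());
    baidu::rpc::Join(cntl2.call_id());
}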
......@@ -860,7 +883,6 @@ void Controller::HandleSendFailed() {
void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.begin_time_us = start_realtime_us;
    // Clear the last error. Don't clear _error_text because we append to it.
const int prev_error_code = _error_code;
_error_code = 0;
// Make versioned correlation_id.
......@@ -919,7 +941,8 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_remote_side = tmp_sock->remote_side();
}
if (_stream_creator) {
_stream_creator->ReplaceSocketForStream(&tmp_sock, this, prev_error_code);
_current_call.touched_by_stream_creator = true;
_stream_creator->ReplaceSocketForStream(&tmp_sock, this);
if (FailedInline()) {
return HandleSendFailed();
}
......
......@@ -506,13 +506,14 @@ private:
// One RPC to a channel may send several requests due to retrying.
struct Call {
Call() { Reset(); }
Call(Call&);
        Call(Call*); // move semantics
~Call();
void Reset();
void OnComplete(Controller*, int error_code, bool responded);
void OnComplete(Controller* c, int error_code, bool responded);
int nretry; // sent in nretry-th retry.
bool need_feedback; // The LB needs feedback.
bool touched_by_stream_creator;
SocketId peer_id; // main server id
int64_t begin_time_us; // sent real time.
        // The actual `Socket' for sending RPC. Its socket id will be
......
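
The pointer-taking constructor declared above emulates move semantics in pre-C++11 code: it steals the in-flight Socket from the source Call and resets the source's flags so that the source's later OnComplete() is harmless. A self-contained sketch of the idiom with illustrative names (not the actual brpc types):

#include <cstddef>
#include <cstdio>

struct PendingCall {
    bool need_feedback;
    int* sending_sock;   // stands in for the uniquely-owned Socket

    PendingCall() : need_feedback(true), sending_sock(new int(1)) {}

    // "Move" via pointer: take over the socket and neutralize the source so
    // that completing the source later has no side effects.
    explicit PendingCall(PendingCall* rhs)
        : need_feedback(rhs->need_feedback)
        , sending_sock(rhs->sending_sock) {
        rhs->sending_sock = NULL;
        rhs->need_feedback = false;
    }

    ~PendingCall() { delete sending_sock; }
};

int main() {
    PendingCall current;                 // the call in flight
    PendingCall unfinished(&current);    // steal it when issuing a backup request
    std::printf("current still owns a socket: %s\n",
                current.sending_sock ? "yes" : "no");   // prints "no"
    return 0;
}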
......@@ -15,6 +15,9 @@ class RpcRetryPolicy : public RetryPolicy {
public:
bool DoRetry(const Controller* controller) const {
const int error_code = controller->ErrorCode();
if (!error_code) {
return false;
}
return (EFAILEDSOCKET == error_code
|| EEOF == error_code
|| EHOSTDOWN == error_code
......
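
For reference, a user-defined RetryPolicy is plugged into a channel roughly as follows. This is a sketch only: the header paths and the ChannelOptions::retry_policy field name are assumptions, and the choice of error code to retry on is illustrative:

#include <errno.h>
#include <baidu/rpc/retry_policy.h>   // assumed header path
#include <baidu/rpc/channel.h>        // assumed header path

// Retry only on EHOSTDOWN. With this commit, the policy is consulted even
// when ErrorCode() is 0, so returning false on success (as the default
// RpcRetryPolicy above now does explicitly) is important.
class MyRetryPolicy : public baidu::rpc::RetryPolicy {
public:
    bool DoRetry(const baidu::rpc::Controller* cntl) const {
        if (!cntl->ErrorCode()) {
            return false;
        }
        return cntl->ErrorCode() == EHOSTDOWN;
    }
};

// The policy must outlive every channel that uses it.
static MyRetryPolicy g_my_retry_policy;

bool InitChannelWithRetryPolicy(baidu::rpc::Channel* channel, const char* server) {
    baidu::rpc::ChannelOptions options;
    options.retry_policy = &g_my_retry_policy;   // assumed field name
    options.max_retry = 3;
    return channel->Init(server, &options) == 0;
}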
......@@ -1475,8 +1475,20 @@ void RtmpClientStream::SignalError() {
}
}
void RtmpClientStream::CleanupSocketForStream(
Socket* prev_sock, Controller*, int /*error_code*/) {
if (prev_sock) {
if (_from_socketmap) {
_client_impl->socket_map().Remove(prev_sock->remote_side(),
prev_sock->id());
} else {
prev_sock->SetFailed(); // not necessary, already failed.
}
}
}
void RtmpClientStream::ReplaceSocketForStream(
SocketUniquePtr* inout, Controller* cntl, int /*prev_error_code*/) {
SocketUniquePtr* inout, Controller* cntl) {
SocketId esid;
if (cntl->connection_type() == CONNECTION_TYPE_SHORT) {
if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) {
......@@ -1507,6 +1519,8 @@ int RtmpClientStream::RunOnFailed(bthread_id_t id, void* data, int) {
base::intrusive_ptr<RtmpClientStream> stream(
static_cast<RtmpClientStream*>(data), false);
CHECK(stream->_rtmpsock);
    // Must happen after NotifyOnFailed(), which runs after all other call
    // sites of OnStopInternal().
stream->OnStopInternal();
bthread_id_unlock_and_destroy(id);
return 0;
......@@ -1535,8 +1549,15 @@ void RtmpClientStream::OnFailedToCreateStream() {
void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
Controller* cntl) {
// Always move sending_sock into _rtmpsock.
    // - If the RPC succeeded, moving sending_sock prevents it from being
    //   SetFailed by the Controller after this method returns.
    // - If the RPC failed, OnStopInternal() can clean up the socket_map
    //   entry inserted in ReplaceSocketForStream().
_rtmpsock.swap(sending_sock);
if (cntl->Failed()) {
if (sending_sock != NULL &&
if (_rtmpsock != NULL &&
            // ^ If sending_sock is NULL, the RPC failed before _pack_request
            // (which calls AddTransaction); in other words, RemoveTransaction
            // is not needed.
......@@ -1546,7 +1567,7 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
CHECK_LT(cntl->log_id(), (uint64_t)std::numeric_limits<uint32_t>::max());
const uint32_t transaction_id = cntl->log_id();
policy::RtmpContext* rtmp_ctx =
static_cast<policy::RtmpContext*>(sending_sock->parsing_context());
static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
if (rtmp_ctx == NULL) {
LOG(FATAL) << "RtmpContext must be created";
} else {
......@@ -1563,11 +1584,7 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
// NOTE: set _rtmpsock before any error checking to make sure that
// deleteStream command will be sent over _rtmpsock to release the
// server-side stream on this stream's stop.
CHECK(sending_sock);
_from_socketmap = (cntl->connection_type() == CONNECTION_TYPE_SINGLE);
// move sending_sock to prevent it from being setfailed by default
// behaviors in Controller (when connection_type is short).
_rtmpsock.swap(sending_sock);
CHECK(_rtmpsock);
const int rc = bthread_id_create(&_onfail_id, this, RunOnFailed);
if (rc) {
......@@ -1607,50 +1624,51 @@ void RtmpClientStream::OnStopInternal() {
return CallOnStop();
}
// SRS requires closeStream which is sent over this stream.
base::IOBuf req_buf1;
{
base::IOBufAsZeroCopyOutputStream zc_stream(&req_buf1);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_CLOSE_STREAM, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
CHECK(ostream.good());
}
SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
msg1->header.message_length = req_buf1.size();
msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
msg1->header.stream_id = _message_stream_id;
msg1->chunk_stream_id = _chunk_stream_id;
msg1->body = req_buf1;
if (!_rtmpsock->Failed()) {
// SRS requires closeStream which is sent over this stream.
base::IOBuf req_buf1;
{
base::IOBufAsZeroCopyOutputStream zc_stream(&req_buf1);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_CLOSE_STREAM, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
CHECK(ostream.good());
}
SocketMessagePtr<policy::RtmpUnsentMessage> msg1(new policy::RtmpUnsentMessage);
msg1->header.message_length = req_buf1.size();
msg1->header.message_type = policy::RTMP_MESSAGE_COMMAND_AMF0;
msg1->header.stream_id = _message_stream_id;
msg1->chunk_stream_id = _chunk_stream_id;
msg1->body = req_buf1;
// Send deleteStream over the control stream.
base::IOBuf req_buf2;
{
base::IOBufAsZeroCopyOutputStream zc_stream(&req_buf2);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_DELETE_STREAM, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
WriteAMFUint32(_message_stream_id, &ostream);
CHECK(ostream.good());
}
policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf2);
msg1->next.reset(msg2);
// Send deleteStream over the control stream.
base::IOBuf req_buf2;
{
base::IOBufAsZeroCopyOutputStream zc_stream(&req_buf2);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_COMMAND_DELETE_STREAM, &ostream);
WriteAMFUint32(0, &ostream);
WriteAMFNull(&ostream);
WriteAMFUint32(_message_stream_id, &ostream);
CHECK(ostream.good());
}
policy::RtmpUnsentMessage* msg2 = policy::MakeUnsentControlMessage(
policy::RTMP_MESSAGE_COMMAND_AMF0, req_buf2);
msg1->next.reset(msg2);
if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg1) != 0) {
if (errno != EFAILEDSOCKET) {
PLOG(WARNING) << "Fail to send closeStream/deleteStream to "
<< _rtmpsock->remote_side() << "["
<< _message_stream_id << "]";
            // Close the connection to make sure the server side knows about
            // the closing event; however, this may terminate other streams
            // over the connection as well.
_rtmpsock->SetFailed(EFAILEDSOCKET, "Fail to send closeStream/deleteStream");
if (policy::WriteWithoutOvercrowded(_rtmpsock.get(), msg1) != 0) {
if (errno != EFAILEDSOCKET) {
PLOG(WARNING) << "Fail to send closeStream/deleteStream to "
<< _rtmpsock->remote_side() << "["
<< _message_stream_id << "]";
                // Close the connection to make sure the server side knows about
                // the closing event; however, this may terminate other streams
                // over the connection as well.
_rtmpsock->SetFailed(EFAILEDSOCKET, "Fail to send closeStream/deleteStream");
}
}
}
policy::RtmpContext* ctx =
static_cast<policy::RtmpContext*>(_rtmpsock->parsing_context());
if (ctx != NULL) {
......@@ -1909,6 +1927,7 @@ void RtmpClientStream::Init(const RtmpClient* client,
done->cntl.set_connection_type(_options.share_connection ?
CONNECTION_TYPE_SINGLE :
CONNECTION_TYPE_SHORT);
_from_socketmap = (done->cntl.connection_type() == CONNECTION_TYPE_SINGLE);
done->cntl.set_max_retry(_options.create_stream_max_retry);
if (_options.hash_code.has_been_set()) {
done->cntl.set_request_code(_options.hash_code);
......@@ -2085,13 +2104,26 @@ void InitSubStream::Run(const RtmpClient* client) {
}
void RtmpRetryingClientStream::Recreate() {
_last_creation_time_us = base::gettimeofday_us();
base::intrusive_ptr<SubStream> sub_stream(new SubStream(this));
bool destroying = false;
{
BAIDU_SCOPED_LOCK(_stream_mutex);
_using_sub_stream = sub_stream;
_changed_stream = true;
        // Check _destroying to avoid installing the new sub_stream into a
        // retrying stream that is being destroyed.
        // Note: the load of _destroying and the assignment of _using_sub_stream
        // must happen under the same lock, otherwise the current bthread may be
        // scheduled out and Destroy() may be called in between, leaking the new
        // sub_stream.
destroying = _destroying.load(base::memory_order_relaxed);
if (!destroying) {
_using_sub_stream = sub_stream;
_changed_stream = true;
}
}
if (destroying) {
sub_stream->Destroy();
return;
}
_last_creation_time_us = base::gettimeofday_us();
RtmpClientStreamOptions modified_options = _options;
if (_options.stream_name_manipulator) {
if (!modified_options.play_name.empty()) {
......
......@@ -750,9 +750,9 @@ friend class RtmpRetryingClientStream;
int Publish(const base::StringPiece& name, RtmpPublishType type);
// @StreamCreator
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl,
int prev_error_code);
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller*, int error_code);
void OnFailedToCreateStream();
......
......@@ -747,11 +747,14 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
&_id_wait_list_mutex));
ResetAllStreams();
if (_app_connect) {
AppConnect* const saved_app_connect = _app_connect;
_app_connect = NULL;
saved_app_connect->StopConnect(this);
}
    // _app_connect shouldn't be set to NULL in SetFailed, otherwise
    // HC(health checking) would never be supported.
// FIXME: Design a better interface for AppConnect
// if (_app_connect) {
// AppConnect* const saved_app_connect = _app_connect;
// _app_connect = NULL;
// saved_app_connect->StopConnect(this);
// }
    // Deref the additional reference added at creation so that this
    // Socket's refcount hits 0 (and the Socket is recycled) when no one
    // addresses it.
......
......@@ -14,6 +14,8 @@ namespace brpc {
class Controller;
// Abstract creation of "user-level connection" over a RPC-like process.
// The lifetime of this object should be guaranteed by the user during the
// RPC; generally it is created before the RPC and destroyed after it.
class StreamCreator {
public:
// Replace the socket in `inout' with another one (or keep as it is).
......@@ -26,18 +28,26 @@ public:
    // since the framework sends the request over the replaced socket directly
    // when a stream_creator is present.
virtual void ReplaceSocketForStream(SocketUniquePtr* inout,
Controller* cntl,
int previous_error_code) = 0;
Controller* cntl) = 0;
    // Called when one interaction with the server completes. An RPC for
    // creating a stream may interact with servers more than once.
    // `cntl' contains necessary information about the call. `sending_sock'
    // is the socket to the server interacted with. If the RPC failed,
    // `sending_sock' is possibly NULL (failed before choosing a server). User
    // can own `sending_sock' and set the unique pointer to NULL, otherwise
    // the socket is cleaned by framework when connection_type is non-single.
    // the socket is cleaned up by the framework and CleanupSocketForStream()
    // is called as well.
virtual void OnStreamCreationDone(SocketUniquePtr& sending_sock,
Controller* cntl) = 0;
    // Called when one interaction with the server completes. An RPC for
    // creating a stream may interact with servers more than once.
    // This method is paired with _each_ ReplaceSocketForStream().
    // OnStreamCreationDone() is called _before_ the last CleanupSocketForStream().
    // If OnStreamCreationDone() moved away `sending_sock', the `prev_sock'
    // passed to this method is NULL.
    // Use `error_code' instead of cntl->ErrorCode().
virtual void CleanupSocketForStream(Socket* prev_sock,
Controller* cntl,
int error_code) = 0;
};
} // namespace brpc
......
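
To make the revised interface concrete, here is a do-nothing StreamCreator skeleton showing the three hooks with their new signatures. It is a sketch only: the header paths are assumptions, and the namespace is written as baidu::rpc to match other code in this commit even though the hunk above shows namespace brpc:

#include <baidu/rpc/stream_creator.h>   // assumed header path
#include <baidu/rpc/controller.h>       // assumed header path

// A minimal StreamCreator that keeps the framework's socket choice and,
// on success, takes ownership of the socket used by the last try.
class MyStreamCreator : public baidu::rpc::StreamCreator {
public:
    // Called before sending each try; *inout may be swapped with another
    // socket (e.g. a freshly created short connection). Keeping it as-is
    // still marks the Call as touched_by_stream_creator.
    void ReplaceSocketForStream(baidu::rpc::SocketUniquePtr* inout,
                                baidu::rpc::Controller* cntl) {
        (void)inout;
        (void)cntl;
    }

    // Called when one interaction with the server completes. Moving
    // sending_sock out takes ownership away from the framework.
    void OnStreamCreationDone(baidu::rpc::SocketUniquePtr& sending_sock,
                              baidu::rpc::Controller* cntl) {
        if (!cntl->Failed()) {
            _sock.swap(sending_sock);
        }
    }

    // Paired with each ReplaceSocketForStream(); prev_sock is NULL if
    // OnStreamCreationDone() already moved the socket away.
    void CleanupSocketForStream(baidu::rpc::Socket* prev_sock,
                                baidu::rpc::Controller* cntl,
                                int error_code) {
        (void)prev_sock;
        (void)cntl;
        (void)error_code;
    }

private:
    baidu::rpc::SocketUniquePtr _sock;
};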