Commit dbbbc4f5 authored by gejun's avatar gejun

patch svn r34926

Change-Id: Id2f0ed5939650dbf968ad4a3c19378d8fe73a976
parent c6c700b6
...@@ -661,7 +661,7 @@ void RtmpContext::SetState(const base::EndPoint& remote_side, State new_state) { ...@@ -661,7 +661,7 @@ void RtmpContext::SetState(const base::EndPoint& remote_side, State new_state) {
RPC_VLOG << remote_side << ": " << state2str(old_state) RPC_VLOG << remote_side << ": " << state2str(old_state)
<< " -> " << state2str(new_state); << " -> " << state2str(new_state);
} }
RtmpUnsentMessage* MakeUnsentControlMessage( RtmpUnsentMessage* MakeUnsentControlMessage(
uint8_t message_type, uint32_t chunk_stream_id, uint8_t message_type, uint32_t chunk_stream_id,
const void* data, size_t n) { const void* data, size_t n) {
......
...@@ -1489,6 +1489,13 @@ void RtmpClientStream::CleanupSocketForStream( ...@@ -1489,6 +1489,13 @@ void RtmpClientStream::CleanupSocketForStream(
void RtmpClientStream::ReplaceSocketForStream( void RtmpClientStream::ReplaceSocketForStream(
SocketUniquePtr* inout, Controller* cntl) { SocketUniquePtr* inout, Controller* cntl) {
{
std::unique_lock<pthread_mutex_t> mu(_state_mutex);
if (_state == STATE_ERROR || _state == STATE_DESTROYING) {
cntl->SetFailed(EINVAL, "Fail to replace socket for stream, _state is error or destroying");
return;
}
}
SocketId esid; SocketId esid;
if (cntl->connection_type() == CONNECTION_TYPE_SHORT) { if (cntl->connection_type() == CONNECTION_TYPE_SHORT) {
if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) { if (_client_impl->CreateSocket((*inout)->remote_side(), &esid) != 0) {
...@@ -1581,28 +1588,23 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock, ...@@ -1581,28 +1588,23 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
return OnFailedToCreateStream(); return OnFailedToCreateStream();
} }
// NOTE: set _rtmpsock before any error checking to make sure that int rc = 0;
// deleteStream command will be sent over _rtmpsock to release the bthread_id_t onfail_id = INVALID_BTHREAD_ID;
// server-side stream on this stream's stop.
CHECK(_rtmpsock);
const int rc = bthread_id_create(&_onfail_id, this, RunOnFailed);
if (rc) {
cntl->SetFailed(ENOMEM, "Fail to create _onfail_id: %s", berror(rc));
// no need to care about management of socket_map or setting
// sending_sock to be failed. The socket is OK to be kept in socket_map
// or let Controller::Call::OnComplete close it.
return OnFailedToCreateStream();
}
// Add a ref for RunOnFailed.
base::intrusive_ptr<RtmpClientStream>(this).detach();
// Check _state
{ {
std::unique_lock<pthread_mutex_t> mu(_state_mutex); std::unique_lock<pthread_mutex_t> mu(_state_mutex);
switch (_state) { switch (_state) {
case STATE_CREATING: case STATE_CREATING:
CHECK(_rtmpsock);
rc = bthread_id_create(&onfail_id, this, RunOnFailed);
if (rc) {
cntl->SetFailed(ENOMEM, "Fail to create _onfail_id: %s", berror(rc));
mu.unlock();
return OnFailedToCreateStream();
}
// Add a ref for RunOnFailed.
base::intrusive_ptr<RtmpClientStream>(this).detach();
_state = STATE_CREATED; _state = STATE_CREATED;
_onfail_id = onfail_id;
break; break;
case STATE_UNINITIALIZED: case STATE_UNINITIALIZED:
case STATE_CREATED: case STATE_CREATED:
...@@ -1616,7 +1618,9 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock, ...@@ -1616,7 +1618,9 @@ void RtmpClientStream::OnStreamCreationDone(SocketUniquePtr& sending_sock,
return OnStopInternal(); return OnStopInternal();
} }
} }
_rtmpsock->NotifyOnFailed(_onfail_id); if (onfail_id != INVALID_BTHREAD_ID) {
_rtmpsock->NotifyOnFailed(onfail_id);
}
} }
void RtmpClientStream::OnStopInternal() { void RtmpClientStream::OnStopInternal() {
...@@ -1913,11 +1917,14 @@ void RtmpClientStream::Init(const RtmpClient* client, ...@@ -1913,11 +1917,14 @@ void RtmpClientStream::Init(const RtmpClient* client,
LOG(FATAL) << "RtmpClient is not initialized"; LOG(FATAL) << "RtmpClient is not initialized";
return OnStopInternal(); return OnStopInternal();
} }
if (_state == STATE_DESTROYING || _state == STATE_ERROR) { {
// already Destroy()-ed or SignalError()-ed std::unique_lock<pthread_mutex_t> mu(_state_mutex);
LOG(WARNING) << "RtmpClientStream=" << this << " was already " if (_state == STATE_DESTROYING || _state == STATE_ERROR) {
"Destroy()-ed, stop Init()"; // already Destroy()-ed or SignalError()-ed
return; LOG(WARNING) << "RtmpClientStream=" << this << " was already "
"Destroy()-ed, stop Init()";
return;
}
} }
_client_impl = client->_impl; _client_impl = client->_impl;
_options = options; _options = options;
...@@ -2105,6 +2112,7 @@ void InitSubStream::Run(const RtmpClient* client) { ...@@ -2105,6 +2112,7 @@ void InitSubStream::Run(const RtmpClient* client) {
void RtmpRetryingClientStream::Recreate() { void RtmpRetryingClientStream::Recreate() {
base::intrusive_ptr<SubStream> sub_stream(new SubStream(this)); base::intrusive_ptr<SubStream> sub_stream(new SubStream(this));
base::intrusive_ptr<SubStream> old_sub_stream;
bool destroying = false; bool destroying = false;
{ {
BAIDU_SCOPED_LOCK(_stream_mutex); BAIDU_SCOPED_LOCK(_stream_mutex);
...@@ -2115,10 +2123,14 @@ void RtmpRetryingClientStream::Recreate() { ...@@ -2115,10 +2123,14 @@ void RtmpRetryingClientStream::Recreate() {
// and Destroy() may be called, making new sub_stream leaked. // and Destroy() may be called, making new sub_stream leaked.
destroying = _destroying.load(base::memory_order_relaxed); destroying = _destroying.load(base::memory_order_relaxed);
if (!destroying) { if (!destroying) {
_using_sub_stream.swap(old_sub_stream);
_using_sub_stream = sub_stream; _using_sub_stream = sub_stream;
_changed_stream = true; _changed_stream = true;
} }
} }
if (old_sub_stream) {
old_sub_stream->Destroy();
}
if (destroying) { if (destroying) {
sub_stream->Destroy(); sub_stream->Destroy();
return; return;
......
...@@ -2308,7 +2308,7 @@ void Socket::ListPooledSockets(std::vector<SocketId>* out, size_t max_count) { ...@@ -2308,7 +2308,7 @@ void Socket::ListPooledSockets(std::vector<SocketId>* out, size_t max_count) {
} }
int Socket::GetShortSocket(Socket* main_socket, int Socket::GetShortSocket(Socket* main_socket,
SocketUniquePtr* short_socket) { SocketUniquePtr* short_socket) {
if (main_socket == NULL || short_socket == NULL) { if (main_socket == NULL || short_socket == NULL) {
LOG(ERROR) << "main_socket or short_socket is NULL"; LOG(ERROR) << "main_socket or short_socket is NULL";
return -1; return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment