Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
a403ced6
Commit
a403ced6
authored
Aug 28, 2017
by
zhujiashun
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Patch r35170
Change-Id: I9f8a4b282ea9f8c69605cb6706d3d4b3dc46e45d
parent
389bc006
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
171 additions
and
155 deletions
+171
-155
rtmp.cpp
src/brpc/rtmp.cpp
+83
-94
rtmp.h
src/brpc/rtmp.h
+88
-61
No files found.
src/brpc/rtmp.cpp
View file @
a403ced6
...
...
@@ -1205,12 +1205,17 @@ RtmpStreamBase::RtmpStreamBase(bool is_client)
,
_has_data_ever
(
false
)
,
_message_stream_id
(
0
)
,
_chunk_stream_id
(
0
)
,
_create_realtime_us
(
base
::
gettimeofday_us
())
{
,
_create_realtime_us
(
base
::
gettimeofday_us
())
,
_is_server_accepted
(
false
)
{
}
RtmpStreamBase
::~
RtmpStreamBase
()
{
}
void
RtmpStreamBase
::
Destroy
()
{
return
;
}
int
RtmpStreamBase
::
SendMessage
(
uint32_t
timestamp
,
uint8_t
message_type
,
const
base
::
IOBuf
&
body
)
{
...
...
@@ -1328,6 +1333,11 @@ int RtmpStreamBase::SendAACMessage(const RtmpAACMessage& msg) {
return
_rtmpsock
->
Write
(
msg2
);
}
int
RtmpStreamBase
::
SendUserMessage
(
void
*
msg
)
{
CHECK
(
false
)
<<
"You should implement your own SendUserMessage"
;
return
0
;
}
int
RtmpStreamBase
::
SendVideoMessage
(
const
RtmpVideoMessage
&
msg
)
{
if
(
_rtmpsock
==
NULL
)
{
errno
=
EPERM
;
...
...
@@ -1413,8 +1423,17 @@ const char* RtmpObjectEncoding2Str(RtmpObjectEncoding e) {
return
"Unknown RtmpObjectEncoding"
;
}
void
RtmpStreamBase
::
SignalError
()
{
return
;
}
void
RtmpStreamBase
::
OnFirstMessage
()
{}
void
RtmpStreamBase
::
OnUserData
(
void
*
data
)
{
LOG
(
INFO
)
<<
remote_side
()
<<
'['
<<
stream_id
()
<<
"] ignored UserData{}"
;
}
void
RtmpStreamBase
::
OnMetaData
(
AMFObject
*
metadata
,
const
base
::
StringPiece
&
name
)
{
LOG
(
INFO
)
<<
remote_side
()
<<
'['
<<
stream_id
()
<<
"] ignored MetaData{"
<<
*
metadata
<<
'}'
...
...
@@ -1467,6 +1486,13 @@ void RtmpStreamBase::EndProcessingMessage() {
}
}
void
RtmpStreamBase
::
CallOnUserData
(
void
*
data
)
{
if
(
BeginProcessingMessage
(
"OnUserData()"
))
{
OnUserData
(
data
);
EndProcessingMessage
();
}
}
void
RtmpStreamBase
::
CallOnMetaData
(
AMFObject
*
obj
,
const
base
::
StringPiece
&
name
)
{
if
(
BeginProcessingMessage
(
"OnMetaData()"
))
{
OnMetaData
(
obj
,
name
);
...
...
@@ -1526,7 +1552,6 @@ RtmpClientStream::RtmpClientStream()
,
_create_stream_rpc_id
(
INVALID_BTHREAD_ID
)
,
_from_socketmap
(
true
)
,
_created_stream_with_play_or_publish
(
false
)
,
_is_server_accepted
(
false
)
,
_state
(
STATE_UNINITIALIZED
)
{
get_rtmp_bvars
()
->
client_stream_count
<<
1
;
_self_ref
.
reset
(
this
);
...
...
@@ -2128,14 +2153,14 @@ RtmpRetryingClientStream::RtmpRetryingClientStream()
,
_last_creation_time_us
(
0
)
,
_last_retry_start_time_us
(
0
)
,
_create_timer_id
(
0
)
,
_
client_selec
tor
(
NULL
)
{
,
_
sub_stream_crea
tor
(
NULL
)
{
get_rtmp_bvars
()
->
retrying_client_stream_count
<<
1
;
_self_ref
.
reset
(
this
);
}
RtmpRetryingClientStream
::~
RtmpRetryingClientStream
()
{
delete
_
client_selec
tor
;
_
client_selec
tor
=
NULL
;
delete
_
sub_stream_crea
tor
;
_
sub_stream_crea
tor
=
NULL
;
get_rtmp_bvars
()
->
retrying_client_stream_count
<<
-
1
;
}
...
...
@@ -2160,7 +2185,7 @@ void RtmpRetryingClientStream::Destroy() {
base
::
intrusive_ptr
<
RtmpRetryingClientStream
>
self_ref
;
_self_ref
.
swap
(
self_ref
);
base
::
intrusive_ptr
<
SubStream
>
old_sub_stream
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
old_sub_stream
;
{
BAIDU_SCOPED_LOCK
(
_stream_mutex
);
// swap instead of reset(NULL) to make the stream destructed
...
...
@@ -2182,33 +2207,18 @@ void RtmpRetryingClientStream::Destroy() {
}
void
RtmpRetryingClientStream
::
Init
(
const
RtmpClient
*
client
,
const
RtmpRetryingClientStreamOptions
&
options
)
{
if
(
_destroying
.
load
(
base
::
memory_order_relaxed
))
{
LOG
(
WARNING
)
<<
"RtmpRetryingClientStream="
<<
this
<<
" was already "
"Destroy()-ed, stop Init()"
;
return
;
}
_client_copy
=
*
client
;
_options
=
options
;
// retrying stream does not support this option.
_options
.
wait_until_play_or_publish_is_sent
=
false
;
_last_retry_start_time_us
=
base
::
gettimeofday_us
();
Recreate
();
}
void
RtmpRetryingClientStream
::
Init
(
RtmpClientSelector
*
client_selector
,
SubStreamCreator
*
sub_stream_creator
,
const
RtmpRetryingClientStreamOptions
&
options
)
{
if
(
client_selec
tor
==
NULL
)
{
LOG
(
ERROR
)
<<
"
client_selec
tor is NULL"
;
if
(
sub_stream_crea
tor
==
NULL
)
{
LOG
(
ERROR
)
<<
"
sub_stream_crea
tor is NULL"
;
return
CallOnStopIfNeeded
();
}
_sub_stream_creator
=
sub_stream_creator
;
if
(
_destroying
.
load
(
base
::
memory_order_relaxed
))
{
LOG
(
WARNING
)
<<
"RtmpRetryingClientStream="
<<
this
<<
" was already "
"Destroy()-ed, stop Init()"
;
return
;
}
_client_selector
=
client_selector
;
_options
=
options
;
// retrying stream does not support this option.
_options
.
wait_until_play_or_publish_is_sent
=
false
;
...
...
@@ -2216,26 +2226,41 @@ void RtmpRetryingClientStream::Init(
Recreate
();
}
class
InitSubStream
:
public
OnGetRtmpClient
{
public
:
void
Run
(
const
RtmpClient
*
);
public
:
base
::
intrusive_ptr
<
RtmpRetryingClientStream
::
SubStream
>
sub_stream
;
RtmpClientStreamOptions
options
;
};
void
RetryingClientMessageHandler
::
OnPlayable
()
{
_parent
->
OnPlayable
();
}
void
InitSubStream
::
Run
(
const
RtmpClient
*
client
)
{
std
::
unique_ptr
<
InitSubStream
>
delete_self
(
this
);
if
(
client
==
NULL
)
{
sub_stream
->
Destroy
();
}
else
{
sub_stream
->
Init
(
client
,
options
);
}
void
RetryingClientMessageHandler
::
OnUserData
(
void
*
msg
)
{
_parent
->
CallOnUserData
(
msg
);
}
void
RetryingClientMessageHandler
::
OnMetaData
(
brpc
::
AMFObject
*
metadata
,
const
base
::
StringPiece
&
name
)
{
_parent
->
CallOnMetaData
(
metadata
,
name
);
}
void
RetryingClientMessageHandler
::
OnAudioMessage
(
brpc
::
RtmpAudioMessage
*
msg
)
{
_parent
->
CallOnAudioMessage
(
msg
);
}
void
RetryingClientMessageHandler
::
OnVideoMessage
(
brpc
::
RtmpVideoMessage
*
msg
)
{
_parent
->
CallOnVideoMessage
(
msg
);
}
void
RetryingClientMessageHandler
::
OnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
)
{
_parent
->
CallOnSharedObjectMessage
(
msg
);
}
void
RetryingClientMessageHandler
::
OnSubStreamStop
(
RtmpStreamBase
*
sub_stream
)
{
_parent
->
OnSubStreamStop
(
sub_stream
);
}
RetryingClientMessageHandler
::
RetryingClientMessageHandler
(
RtmpRetryingClientStream
*
parent
)
:
_parent
(
parent
)
{}
void
RtmpRetryingClientStream
::
Recreate
()
{
base
::
intrusive_ptr
<
SubStream
>
sub_stream
(
new
SubStream
(
this
));
base
::
intrusive_ptr
<
SubStream
>
old_sub_stream
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
sub_stream
;
_sub_stream_creator
->
NewSubStream
(
new
RetryingClientMessageHandler
(
this
),
&
sub_stream
);
base
::
intrusive_ptr
<
RtmpStreamBase
>
old_sub_stream
;
bool
destroying
=
false
;
{
BAIDU_SCOPED_LOCK
(
_stream_mutex
);
...
...
@@ -2259,28 +2284,11 @@ void RtmpRetryingClientStream::Recreate() {
return
;
}
_last_creation_time_us
=
base
::
gettimeofday_us
();
RtmpClientStreamOptions
modified_options
=
_options
;
if
(
_options
.
stream_name_manipulator
)
{
if
(
!
modified_options
.
play_name
.
empty
())
{
_options
.
stream_name_manipulator
->
ModifyStreamName
(
&
modified_options
.
play_name
);
}
if
(
!
modified_options
.
publish_name
.
empty
())
{
_options
.
stream_name_manipulator
->
ModifyStreamName
(
&
modified_options
.
publish_name
);
}
}
// If Init() of sub_stream is called before setting _using_sub_stream,
// OnStop() may happen before _using_sub_stream is set and the stopped
// stream is wrongly left in the variable.
if
(
_client_selector
)
{
InitSubStream
*
done
=
new
InitSubStream
;
done
->
sub_stream
=
sub_stream
;
done
->
options
=
modified_options
;
_client_selector
->
StartGettingRtmpClient
(
done
);
}
else
{
sub_stream
->
Init
(
&
_client_copy
,
modified_options
);
}
_sub_stream_creator
->
LaunchSubStream
(
sub_stream
.
get
(),
&
_options
);
}
void
RtmpRetryingClientStream
::
OnRecreateTimer
(
void
*
arg
)
{
...
...
@@ -2290,11 +2298,11 @@ void RtmpRetryingClientStream::OnRecreateTimer(void* arg) {
ptr
->
Recreate
();
}
void
RtmpRetryingClientStream
::
OnSubStreamStop
(
SubStream
*
sub_stream
)
{
void
RtmpRetryingClientStream
::
OnSubStreamStop
(
RtmpStreamBase
*
sub_stream
)
{
// Make sure the sub_stream is destroyed after this function.
DestroyingPtr
<
SubStream
>
sub_stream_guard
(
sub_stream
);
DestroyingPtr
<
RtmpStreamBase
>
sub_stream_guard
(
sub_stream
);
base
::
intrusive_ptr
<
SubStream
>
removed_sub_stream
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
removed_sub_stream
;
{
BAIDU_SCOPED_LOCK
(
_stream_mutex
);
if
(
sub_stream
==
_using_sub_stream
)
{
...
...
@@ -2368,7 +2376,7 @@ void RtmpRetryingClientStream::OnSubStreamStop(SubStream* sub_stream) {
}
int
RtmpRetryingClientStream
::
AcquireStreamToSend
(
base
::
intrusive_ptr
<
SubStream
>*
ptr
)
{
base
::
intrusive_ptr
<
RtmpStreamBase
>*
ptr
)
{
BAIDU_SCOPED_LOCK
(
_stream_mutex
);
if
(
!
_using_sub_stream
)
{
errno
=
EPERM
;
...
...
@@ -2389,7 +2397,7 @@ int RtmpRetryingClientStream::AcquireStreamToSend(
}
int
RtmpRetryingClientStream
::
SendMetaData
(
const
AMFObject
&
obj
,
const
base
::
StringPiece
&
name
)
{
base
::
intrusive_ptr
<
SubStream
>
ptr
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
ptr
;
if
(
AcquireStreamToSend
(
&
ptr
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -2398,7 +2406,7 @@ int RtmpRetryingClientStream::SendMetaData(const AMFObject& obj, const base::Str
int
RtmpRetryingClientStream
::
SendSharedObjectMessage
(
const
RtmpSharedObjectMessage
&
msg
)
{
base
::
intrusive_ptr
<
SubStream
>
ptr
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
ptr
;
if
(
AcquireStreamToSend
(
&
ptr
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -2406,7 +2414,7 @@ int RtmpRetryingClientStream::SendSharedObjectMessage(
}
int
RtmpRetryingClientStream
::
SendAudioMessage
(
const
RtmpAudioMessage
&
msg
)
{
base
::
intrusive_ptr
<
SubStream
>
ptr
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
ptr
;
if
(
AcquireStreamToSend
(
&
ptr
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -2414,7 +2422,7 @@ int RtmpRetryingClientStream::SendAudioMessage(const RtmpAudioMessage& msg) {
}
int
RtmpRetryingClientStream
::
SendAACMessage
(
const
RtmpAACMessage
&
msg
)
{
base
::
intrusive_ptr
<
SubStream
>
ptr
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
ptr
;
if
(
AcquireStreamToSend
(
&
ptr
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -2422,7 +2430,7 @@ int RtmpRetryingClientStream::SendAACMessage(const RtmpAACMessage& msg) {
}
int
RtmpRetryingClientStream
::
SendVideoMessage
(
const
RtmpVideoMessage
&
msg
)
{
base
::
intrusive_ptr
<
SubStream
>
ptr
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
ptr
;
if
(
AcquireStreamToSend
(
&
ptr
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -2430,16 +2438,15 @@ int RtmpRetryingClientStream::SendVideoMessage(const RtmpVideoMessage& msg) {
}
int
RtmpRetryingClientStream
::
SendAVCMessage
(
const
RtmpAVCMessage
&
msg
)
{
base
::
intrusive_ptr
<
SubStream
>
ptr
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
ptr
;
if
(
AcquireStreamToSend
(
&
ptr
)
!=
0
)
{
return
-
1
;
}
return
ptr
->
SendAVCMessage
(
msg
);
}
void
RtmpRetryingClientStream
::
StopCurrentStream
()
{
base
::
intrusive_ptr
<
SubStream
>
sub_stream
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
sub_stream
;
{
BAIDU_SCOPED_LOCK
(
_stream_mutex
);
sub_stream
=
_using_sub_stream
;
...
...
@@ -2449,30 +2456,8 @@ void RtmpRetryingClientStream::StopCurrentStream() {
}
}
void
RtmpRetryingClientStream
::
SubStream
::
OnFirstMessage
()
{
_parent
->
OnPlayable
();
}
void
RtmpRetryingClientStream
::
OnPlayable
()
{}
void
RtmpRetryingClientStream
::
SubStream
::
OnMetaData
(
AMFObject
*
obj
,
const
base
::
StringPiece
&
name
)
{
_parent
->
CallOnMetaData
(
obj
,
name
);
}
void
RtmpRetryingClientStream
::
SubStream
::
OnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
)
{
_parent
->
CallOnSharedObjectMessage
(
msg
);
}
void
RtmpRetryingClientStream
::
SubStream
::
OnAudioMessage
(
RtmpAudioMessage
*
msg
)
{
_parent
->
CallOnAudioMessage
(
msg
);
}
void
RtmpRetryingClientStream
::
SubStream
::
OnVideoMessage
(
RtmpVideoMessage
*
msg
)
{
_parent
->
CallOnVideoMessage
(
msg
);
}
void
RtmpRetryingClientStream
::
SubStream
::
OnStop
()
{
_parent
->
OnSubStreamStop
(
this
);
}
base
::
EndPoint
RtmpRetryingClientStream
::
remote_side
()
const
{
{
BAIDU_SCOPED_LOCK
(
_stream_mutex
);
...
...
@@ -2510,6 +2495,10 @@ RtmpServerStream::~RtmpServerStream() {
get_rtmp_bvars
()
->
server_stream_count
<<
-
1
;
}
void
RtmpServerStream
::
Destroy
()
{
CHECK
(
false
)
<<
"You're not supposed to call Destroy() for server-side streams"
;
}
void
RtmpServerStream
::
OnPlay
(
const
RtmpPlayOptions
&
opt
,
base
::
Status
*
status
,
google
::
protobuf
::
Closure
*
done
)
{
...
...
src/brpc/rtmp.h
View file @
a403ced6
...
...
@@ -474,14 +474,26 @@ enum RtmpLimitType {
};
// The common part of RtmpClientStream and RtmpServerStream.
class
RtmpStreamBase
:
public
SharedObject
{
class
RtmpStreamBase
:
public
SharedObject
,
public
Destroyable
{
public
:
explicit
RtmpStreamBase
(
bool
is_client
);
// @Destroyable
// For ClientStream, this function must be called to end this stream no matter
// Init() is called or not. Use DestroyingPtr<> which is a specialized unique_ptr
// to call Destroy() automatically.
// If this stream is enclosed in intrusive_ptr<>, this method can be called
// before/during Init(), or multiple times, because the stream is not
// destructed yet after calling Destroy(), otherwise the behavior is
// undefined.
virtual
void
Destroy
();
// Process media messages from the peer.
// Following methods and OnStop() on the same stream are never called
// simultaneously.
// NOTE: Inputs can be modified and consumed.
virtual
void
OnUserData
(
void
*
msg
);
virtual
void
OnMetaData
(
AMFObject
*
,
const
base
::
StringPiece
&
);
virtual
void
OnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
);
virtual
void
OnAudioMessage
(
RtmpAudioMessage
*
msg
);
...
...
@@ -505,6 +517,8 @@ public:
virtual
int
SendAACMessage
(
const
RtmpAACMessage
&
msg
);
virtual
int
SendVideoMessage
(
const
RtmpVideoMessage
&
msg
);
virtual
int
SendAVCMessage
(
const
RtmpAVCMessage
&
msg
);
// msg is owned by the caller of this function
virtual
int
SendUserMessage
(
void
*
msg
);
// Send a message to the peer to make it stop. The concrete message depends
// on implementation of the stream.
...
...
@@ -548,6 +562,15 @@ public:
Socket
*
socket
()
{
return
_rtmpsock
.
get
();
}
const
Socket
*
socket
()
const
{
return
_rtmpsock
.
get
();
}
// Returns true when the server accepted play or publish command.
// The acquire fence makes sure the callsite seeing true must be after
// sending play or publish command (possibly in another thread).
bool
is_server_accepted
()
const
{
return
_is_server_accepted
.
load
(
base
::
memory_order_acquire
);
}
// Explicitly notify error to current stream
virtual
void
SignalError
();
protected
:
friend
class
policy
::
RtmpContext
;
friend
class
policy
::
RtmpChunkStream
;
...
...
@@ -563,6 +586,7 @@ friend class policy::OnServerStreamCreated;
// implement the exclusion.
bool
BeginProcessingMessage
(
const
char
*
fun_name
);
void
EndProcessingMessage
();
void
CallOnUserData
(
void
*
data
);
void
CallOnMetaData
(
AMFObject
*
,
const
base
::
StringPiece
&
);
void
CallOnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
);
void
CallOnAudioMessage
(
RtmpAudioMessage
*
msg
);
...
...
@@ -579,6 +603,7 @@ friend class policy::OnServerStreamCreated;
int64_t
_create_realtime_us
;
SocketUniquePtr
_rtmpsock
;
base
::
Mutex
_call_mutex
;
base
::
atomic
<
bool
>
_is_server_accepted
;
};
struct
RtmpClientOptions
{
...
...
@@ -730,13 +755,6 @@ class RtmpClientStream : public RtmpStreamBase
public
:
RtmpClientStream
();
// Must be called to end this stream no matter Init() is called or not.
// Use DestroyingPtr<> which is a specialized unique_ptr to call Destroy()
// automatically.
// If this stream is enclosed in intrusive_ptr<>, this method can be called
// before/during Init(), or multiple times, because the stream is not
// destructed yet after calling Destroy(), otherwise the behavior is
// undefined.
void
Destroy
();
// Create this stream on `client' according to `options'.
...
...
@@ -755,12 +773,6 @@ public:
int
Pause
(
bool
pause_or_unpause
,
double
offset_ms
);
// Returns true when the server accepted play or publish command.
// The acquire fence makes sure the callsite seeing true must be after
// sending play or publish command (possibly in another thread).
bool
is_server_accepted
()
const
{
return
_is_server_accepted
.
load
(
base
::
memory_order_acquire
);
}
// The options passed to Init()
const
RtmpClientStreamOptions
&
options
()
const
{
return
_options
;
}
...
...
@@ -803,7 +815,6 @@ friend class RtmpRetryingClientStream;
CallId
_create_stream_rpc_id
;
bool
_from_socketmap
;
bool
_created_stream_with_play_or_publish
;
base
::
atomic
<
bool
>
_is_server_accepted
;
enum
State
{
STATE_UNINITIALIZED
,
STATE_CREATING
,
...
...
@@ -816,16 +827,6 @@ friend class RtmpRetryingClientStream;
RtmpClientStreamOptions
_options
;
};
// stream_name may contain timestamps that should be updated before each retry.
// Inherit this class to do the update.
class
RtmpStreamNameManipulator
:
public
brpc
::
SharedObject
{
public
:
virtual
~
RtmpStreamNameManipulator
()
{}
// *name is play_name or publish_name in RtmpClientStreamOptions, set
// the modified name back to *name before returning.
virtual
void
ModifyStreamName
(
std
::
string
*
name
)
=
0
;
};
struct
RtmpRetryingClientStreamOptions
:
public
RtmpClientStreamOptions
{
// Wait for at least so many milliseconds before next retry.
// Default: 1000
...
...
@@ -849,10 +850,6 @@ struct RtmpRetryingClientStreamOptions : public RtmpClientStreamOptions {
// Default: true
bool
quit_when_no_data_ever
;
// Modify play_name or publish_name before each retry.
// Default: NULL
base
::
intrusive_ptr
<
RtmpStreamNameManipulator
>
stream_name_manipulator
;
RtmpRetryingClientStreamOptions
();
};
...
...
@@ -872,6 +869,54 @@ public:
virtual
void
StartGettingRtmpClient
(
OnGetRtmpClient
*
done
)
=
0
;
};
// Base class for handling the messages received by a SubStream
class
RtmpMessageHandler
{
public
:
virtual
void
OnPlayable
()
=
0
;
virtual
void
OnUserData
(
void
*
)
=
0
;
virtual
void
OnMetaData
(
brpc
::
AMFObject
*
metadata
,
const
base
::
StringPiece
&
name
)
=
0
;
virtual
void
OnAudioMessage
(
brpc
::
RtmpAudioMessage
*
msg
)
=
0
;
virtual
void
OnVideoMessage
(
brpc
::
RtmpVideoMessage
*
msg
)
=
0
;
virtual
void
OnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
)
=
0
;
virtual
void
OnSubStreamStop
(
RtmpStreamBase
*
sub_stream
)
=
0
;
virtual
~
RtmpMessageHandler
()
{}
};
class
RtmpRetryingClientStream
;
// RtmpMessageHandler for RtmpRetryingClientStream
class
RetryingClientMessageHandler
:
public
RtmpMessageHandler
{
public
:
RetryingClientMessageHandler
(
RtmpRetryingClientStream
*
parent
);
~
RetryingClientMessageHandler
()
{}
void
OnPlayable
();
void
OnUserData
(
void
*
);
void
OnMetaData
(
brpc
::
AMFObject
*
metadata
,
const
base
::
StringPiece
&
name
);
void
OnAudioMessage
(
brpc
::
RtmpAudioMessage
*
msg
);
void
OnVideoMessage
(
brpc
::
RtmpVideoMessage
*
msg
);
void
OnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
);
void
OnSubStreamStop
(
RtmpStreamBase
*
sub_stream
);
private
:
base
::
intrusive_ptr
<
RtmpRetryingClientStream
>
_parent
;
};
class
SubStreamCreator
{
public
:
// Create a new SubStream and use *message_handler to handle messages from
// the current SubStream. *sub_stream is set iff the creation is successful.
// Note: message_handler is OWNED by this creator and deleted by the creator.
virtual
void
NewSubStream
(
RtmpMessageHandler
*
message_handler
,
base
::
intrusive_ptr
<
RtmpStreamBase
>*
sub_stream
)
=
0
;
// Do the Initialization of sub_stream. If an error happens, sub_stream->Destroy()
// would be called.
// Note: sub_stream is not OWNED by the creator.
virtual
void
LaunchSubStream
(
RtmpStreamBase
*
sub_stream
,
RtmpRetryingClientStreamOptions
*
options
)
=
0
;
virtual
~
SubStreamCreator
()
{}
};
class
RtmpRetryingClientStream
:
public
RtmpStreamBase
{
public
:
RtmpRetryingClientStream
();
...
...
@@ -879,16 +924,10 @@ public:
// Must be called to end this stream no matter Init() is called or not.
void
Destroy
();
// [ Must be called ]
// Initialize this stream with the given client. The client is not used
// anymore after calling this method.
void
Init
(
const
RtmpClient
*
client
,
const
RtmpRetryingClientStreamOptions
&
options
);
// Initialize this stream with the given client selector which may return a
// different RtmpClient each time.
// NOTE: client_selector is OWNED by this stream and deleted by this stream.
void
Init
(
RtmpClientSelector
*
client_selector
,
// Initialize this stream with the given sub_stream_creator which may create a
// different sub stream each time.
// NOTE: sub_stream_creator is OWNED by this stream and deleted by this stream.
void
Init
(
SubStreamCreator
*
sub_stream_creator
,
const
RtmpRetryingClientStreamOptions
&
options
);
// @RtmpStreamBase
...
...
@@ -915,34 +954,20 @@ public:
virtual
void
OnPlayable
();
const
RtmpRetryingClientStreamOptions
&
options
()
const
{
return
_options
;
}
protected
:
~
RtmpRetryingClientStream
();
private
:
friend
class
OnSubStreamCreated
;
friend
class
InitSubStream
;
class
SubStream
:
public
RtmpClientStream
{
public
:
explicit
SubStream
(
RtmpRetryingClientStream
*
s
)
:
_parent
(
s
)
{}
// @RtmpStreamBase
void
OnMetaData
(
AMFObject
*
,
const
base
::
StringPiece
&
);
void
OnSharedObjectMessage
(
RtmpSharedObjectMessage
*
msg
);
void
OnAudioMessage
(
RtmpAudioMessage
*
msg
);
void
OnVideoMessage
(
RtmpVideoMessage
*
msg
);
void
OnFirstMessage
();
void
OnStop
();
private
:
base
::
intrusive_ptr
<
RtmpRetryingClientStream
>
_parent
;
};
int
AcquireStreamToSend
(
base
::
intrusive_ptr
<
SubStream
>*
);
void
OnSubStreamStop
(
SubStream
*
);
friend
class
RetryingClientMessageHandler
;
void
OnSubStreamStop
(
RtmpStreamBase
*
sub_stream
);
int
AcquireStreamToSend
(
base
::
intrusive_ptr
<
RtmpStreamBase
>*
);
static
void
OnRecreateTimer
(
void
*
arg
);
void
Recreate
();
void
CallOnStopIfNeeded
();
base
::
intrusive_ptr
<
SubStream
>
_using_sub_stream
;
base
::
intrusive_ptr
<
RtmpStreamBase
>
_using_sub_stream
;
base
::
intrusive_ptr
<
RtmpRetryingClientStream
>
_self_ref
;
mutable
base
::
Mutex
_stream_mutex
;
RtmpRetryingClientStreamOptions
_options
;
...
...
@@ -957,7 +982,7 @@ friend class InitSubStream;
bthread_timer_t
_create_timer_id
;
// Note: RtmpClient can be efficiently copied.
RtmpClient
_client_copy
;
RtmpClientSelector
*
_client_selec
tor
;
SubStreamCreator
*
_sub_stream_crea
tor
;
};
// Utility function to get components from rtmp_url which could be in forms of:
...
...
@@ -1012,7 +1037,7 @@ class RtmpServerStream : public RtmpStreamBase {
public
:
RtmpServerStream
();
~
RtmpServerStream
();
// Called when receiving a play request.
// Call status->set_error() when the play request is rejected.
// Call done->Run() when the play request is processed (either accepted
...
...
@@ -1048,6 +1073,7 @@ public:
// @RtmpStreamBase, sending StreamNotFound
int
SendStopMessage
(
const
base
::
StringPiece
&
error_description
);
void
Destroy
();
private
:
friend
class
policy
::
RtmpContext
;
...
...
@@ -1065,3 +1091,4 @@ friend class policy::RtmpChunkStream;
#endif // BRPC_RTMP_H
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment