Commit f6ece731 authored by zhujiashun's avatar zhujiashun

add CuePoint support in RTMP

parent c744500a
...@@ -2227,6 +2227,25 @@ bool RtmpChunkStream::OnDataMessageAMF0( ...@@ -2227,6 +2227,25 @@ bool RtmpChunkStream::OnDataMessageAMF0(
} }
stream->CallOnMetaData(&metadata, name); stream->CallOnMetaData(&metadata, name);
return true; return true;
} else if (name == RTMP_AMF0_ON_CUE_POINT) {
if (istream.check_emptiness()) {
return false;
}
RtmpCuePoint cuepoint;
cuepoint.timestamp = mh.timestamp;
if (!ReadAMFObject(&cuepoint.data, &istream)) {
RTMP_ERROR(socket, mh) << "Fail to read cuepoint";
return false;
}
// TODO: execq?
butil::intrusive_ptr<RtmpStreamBase> stream;
if (!connection_context()->FindMessageStream(mh.stream_id, &stream)) {
LOG_EVERY_SECOND(WARNING) << socket->remote_side()
<< ": Fail to find stream_id=" << mh.stream_id;
return false;
}
stream->CallOnCuePoint(&cuepoint);
return true;
} else if (name == RTMP_AMF0_DATA_SAMPLE_ACCESS) { } else if (name == RTMP_AMF0_DATA_SAMPLE_ACCESS) {
return true; return true;
} else if (name == RTMP_AMF0_COMMAND_ON_STATUS) { } else if (name == RTMP_AMF0_COMMAND_ON_STATUS) {
......
...@@ -112,6 +112,7 @@ const char* messagetype2str(uint8_t); ...@@ -112,6 +112,7 @@ const char* messagetype2str(uint8_t);
#define RTMP_AMF0_COMMAND_CALL "call" #define RTMP_AMF0_COMMAND_CALL "call"
#define RTMP_AMF0_SET_DATAFRAME "@setDataFrame" #define RTMP_AMF0_SET_DATAFRAME "@setDataFrame"
#define RTMP_AMF0_ON_META_DATA "onMetaData" #define RTMP_AMF0_ON_META_DATA "onMetaData"
#define RTMP_AMF0_ON_CUE_POINT "onCuePoint"
#define RTMP_AMF0_SAMPLE_ACCESS "|RtmpSampleAccess" #define RTMP_AMF0_SAMPLE_ACCESS "|RtmpSampleAccess"
#define RTMP_INFO_LEVEL_STATUS "status" #define RTMP_INFO_LEVEL_STATUS "status"
......
...@@ -119,17 +119,7 @@ butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) { ...@@ -119,17 +119,7 @@ butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) {
return butil::Status::OK(); return butil::Status::OK();
} }
butil::Status FlvWriter::Write(const RtmpMetaData& metadata) { butil::Status FlvWriter::WriteScriptData(const butil::IOBuf& req_buf, uint32_t timestamp) {
butil::IOBuf req_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream);
WriteAMFObject(metadata.data, &ostream);
if (!ostream.good()) {
return butil::Status(EINVAL, "Fail to serialize metadata");
}
}
char buf[32]; char buf[32];
char* p = buf; char* p = buf;
if (!_write_header) { if (!_write_header) {
...@@ -141,8 +131,8 @@ butil::Status FlvWriter::Write(const RtmpMetaData& metadata) { ...@@ -141,8 +131,8 @@ butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
// FLV tag // FLV tag
*p++ = FLV_TAG_SCRIPT_DATA; *p++ = FLV_TAG_SCRIPT_DATA;
policy::WriteBigEndian3Bytes(&p, req_buf.size()); policy::WriteBigEndian3Bytes(&p, req_buf.size());
policy::WriteBigEndian3Bytes(&p, (metadata.timestamp & 0xFFFFFF)); policy::WriteBigEndian3Bytes(&p, (timestamp & 0xFFFFFF));
*p++ = (metadata.timestamp >> 24) & 0xFF; *p++ = (timestamp >> 24) & 0xFF;
policy::WriteBigEndian3Bytes(&p, 0); // StreamID policy::WriteBigEndian3Bytes(&p, 0); // StreamID
_buf->append(buf, p - buf); _buf->append(buf, p - buf);
_buf->append(req_buf); _buf->append(req_buf);
...@@ -153,6 +143,35 @@ butil::Status FlvWriter::Write(const RtmpMetaData& metadata) { ...@@ -153,6 +143,35 @@ butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
return butil::Status::OK(); return butil::Status::OK();
} }
butil::Status FlvWriter::Write(const RtmpCuePoint& cuepoint) {
butil::IOBuf req_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
WriteAMFObject(cuepoint.data, &ostream);
if (!ostream.good()) {
return butil::Status(EINVAL, "Fail to serialize cuepoint");
}
}
return WriteScriptData(req_buf, cuepoint.timestamp);
}
butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
butil::IOBuf req_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream);
WriteAMFObject(metadata.data, &ostream);
if (!ostream.good()) {
return butil::Status(EINVAL, "Fail to serialize metadata");
}
}
return WriteScriptData(req_buf, metadata.timestamp);
}
FlvReader::FlvReader(butil::IOBuf* buf) FlvReader::FlvReader(butil::IOBuf* buf)
: _read_header(false), _buf(buf) { : _read_header(false), _buf(buf) {
} }
...@@ -1262,6 +1281,22 @@ int RtmpStreamBase::SendControlMessage( ...@@ -1262,6 +1281,22 @@ int RtmpStreamBase::SendControlMessage(
return _rtmpsock->Write(msg); return _rtmpsock->Write(msg);
} }
int RtmpStreamBase::SendCuePoint(const RtmpCuePoint& cuepoint) {
butil::IOBuf req_buf;
{
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_SET_DATAFRAME, &ostream);
WriteAMFString(RTMP_AMF0_ON_CUE_POINT, &ostream);
WriteAMFObject(cuepoint.data, &ostream);
if (!ostream.good()) {
LOG(ERROR) << "Fail to serialize cuepoint";
return -1;
}
}
return SendMessage(cuepoint.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
}
int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata, int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata,
const butil::StringPiece& name) { const butil::StringPiece& name) {
butil::IOBuf req_buf; butil::IOBuf req_buf;
...@@ -1441,6 +1476,11 @@ void RtmpStreamBase::OnUserData(void*) { ...@@ -1441,6 +1476,11 @@ void RtmpStreamBase::OnUserData(void*) {
<< "] ignored UserData{}"; << "] ignored UserData{}";
} }
void RtmpStreamBase::OnCuePoint(RtmpCuePoint* cuepoint) {
LOG(INFO) << remote_side() << '[' << stream_id()
<< "] ignored CuePoint{" << cuepoint->data << '}';
}
void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) { void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) {
LOG(INFO) << remote_side() << '[' << stream_id() LOG(INFO) << remote_side() << '[' << stream_id()
<< "] ignored MetaData{" << metadata->data << '}' << "] ignored MetaData{" << metadata->data << '}'
...@@ -1500,6 +1540,13 @@ void RtmpStreamBase::CallOnUserData(void* data) { ...@@ -1500,6 +1540,13 @@ void RtmpStreamBase::CallOnUserData(void* data) {
} }
} }
void RtmpStreamBase::CallOnCuePoint(RtmpCuePoint* obj) {
if (BeginProcessingMessage("OnCuePoint()")) {
OnCuePoint(obj);
EndProcessingMessage();
}
}
void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) { void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) {
if (BeginProcessingMessage("OnMetaData()")) { if (BeginProcessingMessage("OnMetaData()")) {
OnMetaData(obj, name); OnMetaData(obj, name);
...@@ -2241,6 +2288,10 @@ void RetryingClientMessageHandler::OnUserData(void* msg) { ...@@ -2241,6 +2288,10 @@ void RetryingClientMessageHandler::OnUserData(void* msg) {
_parent->CallOnUserData(msg); _parent->CallOnUserData(msg);
} }
void RetryingClientMessageHandler::OnCuePoint(brpc::RtmpCuePoint* cuepoint) {
_parent->CallOnCuePoint(cuepoint);
}
void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) { void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) {
_parent->CallOnMetaData(metadata, name); _parent->CallOnMetaData(metadata, name);
} }
...@@ -2403,6 +2454,14 @@ int RtmpRetryingClientStream::AcquireStreamToSend( ...@@ -2403,6 +2454,14 @@ int RtmpRetryingClientStream::AcquireStreamToSend(
return 0; return 0;
} }
int RtmpRetryingClientStream::SendCuePoint(const RtmpCuePoint& obj) {
butil::intrusive_ptr<RtmpStreamBase> ptr;
if (AcquireStreamToSend(&ptr) != 0) {
return -1;
}
return ptr->SendCuePoint(obj);
}
int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) { int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) {
butil::intrusive_ptr<RtmpStreamBase> ptr; butil::intrusive_ptr<RtmpStreamBase> ptr;
if (AcquireStreamToSend(&ptr) != 0) { if (AcquireStreamToSend(&ptr) != 0) {
......
...@@ -370,6 +370,11 @@ struct RtmpMetaData { ...@@ -370,6 +370,11 @@ struct RtmpMetaData {
AMFObject data; AMFObject data;
}; };
struct RtmpCuePoint {
uint32_t timestamp;
AMFObject data;
};
struct RtmpSharedObjectMessage { struct RtmpSharedObjectMessage {
// Not implemented yet. // Not implemented yet.
}; };
...@@ -385,10 +390,15 @@ public: ...@@ -385,10 +390,15 @@ public:
// Start appending FLV tags into the buffer // Start appending FLV tags into the buffer
explicit FlvWriter(butil::IOBuf* buf); explicit FlvWriter(butil::IOBuf* buf);
// Append a video/audio/metadata message into the output buffer. // Append a video/audio/metadata/cuepoint message into the output buffer.
butil::Status Write(const RtmpVideoMessage&); butil::Status Write(const RtmpVideoMessage&);
butil::Status Write(const RtmpAudioMessage&); butil::Status Write(const RtmpAudioMessage&);
butil::Status Write(const RtmpMetaData&); butil::Status Write(const RtmpMetaData&);
butil::Status Write(const RtmpCuePoint&);
private:
butil::Status WriteScriptData(const butil::IOBuf& req_buf, uint32_t timestamp);
private: private:
bool _write_header; bool _write_header;
butil::IOBuf* _buf; butil::IOBuf* _buf;
...@@ -510,12 +520,13 @@ public: ...@@ -510,12 +520,13 @@ public:
// simultaneously. // simultaneously.
// NOTE: Inputs can be modified and consumed. // NOTE: Inputs can be modified and consumed.
virtual void OnUserData(void* msg); virtual void OnUserData(void* msg);
virtual void OnCuePoint(RtmpCuePoint*);
virtual void OnMetaData(RtmpMetaData*, const butil::StringPiece&); virtual void OnMetaData(RtmpMetaData*, const butil::StringPiece&);
virtual void OnSharedObjectMessage(RtmpSharedObjectMessage* msg); virtual void OnSharedObjectMessage(RtmpSharedObjectMessage* msg);
virtual void OnAudioMessage(RtmpAudioMessage* msg); virtual void OnAudioMessage(RtmpAudioMessage* msg);
virtual void OnVideoMessage(RtmpVideoMessage* msg); virtual void OnVideoMessage(RtmpVideoMessage* msg);
// Will be called in the same thread before any OnMetaData/ // Will be called in the same thread before any OnMetaData/OnCuePoint
// OnSharedObjectMessage/OnAudioMessage/OnVideoMessage are called. // OnSharedObjectMessage/OnAudioMessage/OnVideoMessage are called.
virtual void OnFirstMessage(); virtual void OnFirstMessage();
...@@ -526,6 +537,7 @@ public: ...@@ -526,6 +537,7 @@ public:
// Send media messages to the peer. // Send media messages to the peer.
// Returns 0 on success, -1 otherwise. // Returns 0 on success, -1 otherwise.
virtual int SendCuePoint(const RtmpCuePoint&);
virtual int SendMetaData(const RtmpMetaData&, virtual int SendMetaData(const RtmpMetaData&,
const butil::StringPiece& name = "onMetaData"); const butil::StringPiece& name = "onMetaData");
virtual int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg); virtual int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg);
...@@ -571,7 +583,7 @@ public: ...@@ -571,7 +583,7 @@ public:
bool is_paused() const { return _paused; } bool is_paused() const { return _paused; }
// True if OnMetaData or OnXXXMessage() was ever called. // True if OnMetaData/OnCuePoint/OnXXXMessage() was ever called.
bool has_data_ever() const { return _has_data_ever; } bool has_data_ever() const { return _has_data_ever; }
// The underlying socket for reading/writing. // The underlying socket for reading/writing.
...@@ -603,6 +615,7 @@ friend class policy::OnServerStreamCreated; ...@@ -603,6 +615,7 @@ friend class policy::OnServerStreamCreated;
bool BeginProcessingMessage(const char* fun_name); bool BeginProcessingMessage(const char* fun_name);
void EndProcessingMessage(); void EndProcessingMessage();
void CallOnUserData(void* data); void CallOnUserData(void* data);
void CallOnCuePoint(RtmpCuePoint*);
void CallOnMetaData(RtmpMetaData*, const butil::StringPiece&); void CallOnMetaData(RtmpMetaData*, const butil::StringPiece&);
void CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg); void CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg);
void CallOnAudioMessage(RtmpAudioMessage* msg); void CallOnAudioMessage(RtmpAudioMessage* msg);
...@@ -612,7 +625,7 @@ friend class policy::OnServerStreamCreated; ...@@ -612,7 +625,7 @@ friend class policy::OnServerStreamCreated;
bool _is_client; bool _is_client;
bool _paused; // Only used by RtmpServerStream bool _paused; // Only used by RtmpServerStream
bool _stopped; // True when OnStop() was called. bool _stopped; // True when OnStop() was called.
bool _processing_msg; // True when OnXXXMessage/OnMetaData are called. bool _processing_msg; // True when OnXXXMessage/OnMetaData/OnCuePoint are called.
bool _has_data_ever; bool _has_data_ever;
uint32_t _message_stream_id; uint32_t _message_stream_id;
uint32_t _chunk_stream_id; uint32_t _chunk_stream_id;
...@@ -874,6 +887,7 @@ class RtmpMessageHandler { ...@@ -874,6 +887,7 @@ class RtmpMessageHandler {
public: public:
virtual void OnPlayable() = 0; virtual void OnPlayable() = 0;
virtual void OnUserData(void*) = 0; virtual void OnUserData(void*) = 0;
virtual void OnCuePoint(brpc::RtmpCuePoint* cuepoint) = 0;
virtual void OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) = 0; virtual void OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) = 0;
virtual void OnAudioMessage(brpc::RtmpAudioMessage* msg) = 0; virtual void OnAudioMessage(brpc::RtmpAudioMessage* msg) = 0;
virtual void OnVideoMessage(brpc::RtmpVideoMessage* msg) = 0; virtual void OnVideoMessage(brpc::RtmpVideoMessage* msg) = 0;
...@@ -891,6 +905,7 @@ public: ...@@ -891,6 +905,7 @@ public:
void OnPlayable(); void OnPlayable();
void OnUserData(void*); void OnUserData(void*);
void OnCuePoint(brpc::RtmpCuePoint* cuepoint);
void OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name); void OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name);
void OnAudioMessage(brpc::RtmpAudioMessage* msg); void OnAudioMessage(brpc::RtmpAudioMessage* msg);
void OnVideoMessage(brpc::RtmpVideoMessage* msg); void OnVideoMessage(brpc::RtmpVideoMessage* msg);
...@@ -934,6 +949,7 @@ public: ...@@ -934,6 +949,7 @@ public:
// If the stream is recreated, following methods may return -1 and set // If the stream is recreated, following methods may return -1 and set
// errno to ERTMPPUBLISHABLE for once. (so that users can be notified to // errno to ERTMPPUBLISHABLE for once. (so that users can be notified to
// resend metadata or header messages). // resend metadata or header messages).
int SendCuePoint(const RtmpCuePoint&);
int SendMetaData(const RtmpMetaData&, int SendMetaData(const RtmpMetaData&,
const butil::StringPiece& name = "onMetaData"); const butil::StringPiece& name = "onMetaData");
int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg); int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg);
...@@ -949,7 +965,7 @@ public: ...@@ -949,7 +965,7 @@ public:
void StopCurrentStream(); void StopCurrentStream();
// If a sub stream was created, this method will be called in the same // If a sub stream was created, this method will be called in the same
// thread before any OnMetaData/OnSharedObjectMessage/OnAudioMessage/ // thread before any OnMetaData/OnCuePoint/OnSharedObjectMessage/OnAudioMessage/
// OnVideoMessage are called. // OnVideoMessage are called.
virtual void OnPlayable(); virtual void OnPlayable();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment