#include "MemPool.h" #include "MesgDispatcher.h" #include "StreamDef.h" #include "SysUtils.h" NAMESPACE_MAS_BEGIN #define PIPE_CHECK_DELAY_SECONDS (2) ExcepInfo::ExcepInfo( const std::string & name, TZ_INT threshold) : Name(name), Threshold(threshold), OutputState(EMPS_NO_DATA), Lastest((TIME_POINT)ZERO_POINT) { } ExcepInfo::ExcepInfo(const ExcepInfo & einfo) : Name(einfo.Name), Threshold(einfo.Threshold), OutputState(einfo.OutputState.load()), Lastest(einfo.Lastest.load()) { } ExcepInfo::~ExcepInfo() { } Media::Media(TZ_INT Length) : Length(Length), DataType(-1), Height(-1), Width(-1), Mem(nullptr) { Mem = MEMPOOL->AllocAvailMem(Length); } Media::Media(const Media & origin) : Length(origin.Length), DataType(origin.DataType), Height(origin.Height), Width(origin.Width), Mem(origin.Mem) { } Media::~Media() { if (Mem) { MEMPOOL->ReturnFreeMem(this->Mem, this->Length); Mem = nullptr; } } StreamInfo::StreamInfo() : m_hd((TZ_HANDLE)this), m_seq(0), m_deliverType(EMPT_MEDIA), m_mediaRsc(nullptr), m_straProducing(nullptr) { } StreamInfo::StreamInfo(TZ_HANDLE hd) : m_hd(hd), m_seq(0), m_deliverType(EMPT_MEDIA), m_mediaRsc(nullptr), m_straProducing(nullptr) { } StreamInfo::StreamInfo(const StreamInfo & origin) : m_hd(origin.m_hd), m_seq(origin.m_seq), m_deliverType(origin.m_deliverType.load()), m_mediaRsc(origin.m_mediaRsc), m_straProducing(nullptr) { } StreamInfo::StreamInfo(SPtr & origin) : m_hd(origin->m_hd), m_seq(origin->m_seq), m_deliverType(origin->m_deliverType.load()), m_mediaRsc(origin->m_mediaRsc), m_straProducing(nullptr) { } StreamInfo::~StreamInfo() { } TZ_INT StreamInfo::AddDetRst(const std::string & detKey, const std::string & detRst, SPtr & Media, const DrawInfo & draw) { auto prst = std::make_shared(detKey, detRst, Media, draw); std::unique_lock lock(m_detLock); m_detProducings.emplace(detKey, prst); return MEC_OK; } TZ_INT StreamInfo::SetStraRst(const std::string & straKey, const StraRst & straRst, SPtr & Media) { if (m_straProducing) { TZLogWarn("stra priducing isn't nullptr!!!"); return MEC_FAILED; } m_straProducing = std::make_shared(straKey, straRst, Media); return MEC_OK; } TZ_INT StreamInfo::GetDeliverType() { return m_deliverType.load(); } void StreamInfo::SetDeliverType(TZ_INT type) { m_deliverType.store(type); } TZ_INT StreamInfo::SetMediaRsc(SPtr & mediaRsc) { if (m_mediaRsc) { TZLogWarn("media rsc isn't nullptr!!!"); return MEC_FAILED; } m_mediaRsc = mediaRsc; return MEC_OK; } TZ_INT StreamInfo::SetSeq(TZ_Uint32 seq) { m_seq = seq; return MEC_OK; } TZ_HANDLE StreamInfo::GetHandle() { return m_hd; } TZ_Uint32 StreamInfo::GetSeq() { return m_seq; } SPtr & StreamInfo::GetMediaRsc() { return m_mediaRsc; } SPtr & StreamInfo::GetStraProducing() { return m_straProducing; } std::unordered_map> & StreamInfo::GetAllDetRst() { return m_detProducings; } IStreamPipe::IStreamPipe( TZ_INT srcId, int64_t selfId, TZ_INT type, const ExcepInfo & einfo) : m_inited(FALSE), m_type(type), m_srcId(srcId), m_selfId(selfId), m_switch(FALSE), m_einfo(einfo) { m_handle = (TZ_HANDLE)this; } IStreamPipe::~IStreamPipe() { this->Dispose(); } TZ_INT IStreamPipe::Initialize() { if (m_inited) { TZLogInfo("pipe %llu type %d has inited...", m_handle, m_type); return MEC_OK; } PIPESTATECHECKER->RegisterPipe(shared_from_this()); auto irst = this->pipeInitialize(); if (irst) return irst; m_switch.store(TRUE); m_inited = TRUE; TZLogInfo("pipe %llu type %d init succeed~~~", m_handle, m_type); return MEC_OK; } TZ_INT IStreamPipe::Dispose() { if (!m_inited) { TZLogInfo("pipe %llu type %d doesn't init...", m_handle, m_type); return MEC_OK; } m_switch.store(FALSE); PIPESTATECHECKER->RemovePipe(m_handle); this->pipeDispose(); m_inited = FALSE; TZLogInfo("pipe %llu type %d dispose succeed~~~", m_handle, m_type); return MEC_OK; } TZ_INT IStreamPipe::AddStreamPipe(const SPtr & pipe) { m_nextLock.lock(); m_next[pipe->m_handle] = pipe; m_nextLock.unlock(); TZLogInfo("add pipe %llu in %llu~~~", pipe->m_handle, this->m_handle); return MEC_OK; } TZ_INT IStreamPipe::RemoveStreamPipe(TZ_HANDLE hd) { std::unique_lock lock(m_nextLock); auto iter = m_next.find(hd); if (m_next.end() == iter) { TZLogWarn("pipe %llu does not in pipe %llu!!!", hd, m_handle); return m_next.size(); } m_next.erase(iter); return m_next.size(); } TZ_INT IStreamPipe::RemoveStreamPipeBySelfId(int64_t selfId) { std::unique_lock lock(m_nextLock); auto iter = std::find_if(m_next.begin(), m_next.end(), [selfId](std::pair> mem) { return mem.second->GetSelfId() == selfId; }); if (m_next.end() == iter) { TZLogWarn("pipe %llu can't find selfId %lld pipe!!!", m_handle, selfId); return MEC_OK; } m_next.erase(iter); TZLogInfo("pipe %llu remove selfId %lld pipe~~~", m_handle, selfId); return MEC_OK; } void IStreamPipe::Switch(TZ_BOOL flag) { m_switch.store(flag); } void IStreamPipe::DeliverStream( SPtr & streamInfo, TZ_BOOL cpFlag) { m_nextLock.lock(); std::map> next(m_next); m_nextLock.unlock(); streamInfo->SetDeliverType(m_type); for (auto & n : next) { if (cpFlag) { auto stream = std::make_shared(streamInfo); n.second->streamArrivedBySwitch(stream); continue; } n.second->streamArrivedBySwitch(streamInfo); } this->updateExcepTime(); } void IStreamPipe::updateExcepTime() { m_einfo.Lastest.store(TIME_NOW); } TZ_BOOL IStreamPipe::streamFilter(SPtr & streamInfo) { return TRUE; } TZ_INT IStreamPipe::pipeInitialize() { return MEC_OK; } TZ_INT IStreamPipe::pipeDispose() { return MEC_OK; } void IStreamPipe::streamArrivedBySwitch( SPtr & streamInfo) { if (!m_switch.load()) return; if (!this->streamFilter(streamInfo)) return; TZ_BOOL autoDeliver = FALSE, cpFlag = FALSE; std::tie(autoDeliver, cpFlag) = this->streamArrived(streamInfo); if (!autoDeliver) return; this->DeliverStream(streamInfo, cpFlag); } TZ_INT IStreamPipe::GetType() { return m_type; } int64_t IStreamPipe::GetSelfId() { return m_selfId; } TZ_INT IStreamPipe::GetSrcId() { return m_srcId; } TZ_HANDLE IStreamPipe::GetHandle() { return m_handle; } ExcepInfo IStreamPipe::GetExcepInfo() { return m_einfo; } PipeStateChecker * PipeStateChecker::_ins = nullptr; tzc::Mutex PipeStateChecker::_insLock; PipeStateChecker::PipeStateChecker() { this->Start(); } PipeStateChecker::~PipeStateChecker() { this->StopAndWait(); m_pipeMapLock.lock(); m_pipes.clear(); m_pipeMapLock.unlock(); } PipeStateChecker * PipeStateChecker::Instance() { if (!_ins) { _insLock.Lock(); if (!_ins) { _ins = new PipeStateChecker(); } _insLock.Unlock(); } return _ins; } void PipeStateChecker::DestoryInstance() { _insLock.Lock(); TZ_delete(_ins); _insLock.Unlock(); } TZ_INT PipeStateChecker::RegisterPipe(const SPtr & pipe) { std::unique_lock lock(m_pipeMapLock); if (m_pipes.end() != m_pipes.find(pipe->m_handle)) { TZLogWarn("pipe %llu registered!!!", pipe->m_handle); return MEC_DUPLICATED; } m_pipes[pipe->m_handle] = pipe; TZLogInfo("register pipe %llu~~~", pipe->m_handle); return MEC_OK; } TZ_INT PipeStateChecker::RemovePipe(TZ_HANDLE handle) { std::unique_lock lock(m_pipeMapLock); if (m_pipes.end() == m_pipes.find(handle)) { TZLogWarn("pipe %llu does not in checker!!!", handle); return MEC_NULL_OBJ; } m_pipes.erase(handle); TZLogInfo("rm pipe %llu~~~", handle); return MEC_OK; } void PipeStateChecker::Entry() { while (!this->IsStop()) { m_pipeMapLock.lock(); std::map> pipes(m_pipes); m_pipeMapLock.unlock(); for (auto iter = pipes.begin(); iter != pipes.end(); ++iter) { TZ_HANDLE handle = (*iter).first; auto pipe = (*iter).second.lock(); if (!pipe) { m_pipeMapLock.lock(); m_pipes.erase(handle); TZLogInfo("erase pipe %llu~~~", handle); m_pipeMapLock.unlock(); continue; } ExcepInfo & einfo = pipe->m_einfo; TIME_POINT now = TIME_NOW; if (std::chrono::duration_cast( now - einfo.Lastest.load()).count() < einfo.Threshold) { einfo.OutputState = EMPS_OK; continue; } if (einfo.Lastest.load() == (TIME_POINT)ZERO_POINT) { einfo.OutputState = EMPS_NO_DATA; } else { einfo.OutputState = EMPS_TIMEOUT; } } tzc::SysUtils::DelaySeconds(PIPE_CHECK_DELAY_SECONDS); } } NAMESPACE_MAS_END