// Declarations for the MAS stream-pipe layer: media/result value types,
// the per-frame StreamInfo carrier, the abstract IStreamPipe node and the
// PipeStateChecker thread that pipes are registered with.
//
// NOTE(review): this copy of the header appears to have lost every
// angle-bracketed span during extraction -- the two system #include targets
// and all template argument lists (SPtr, std::atomic, std::map,
// std::unordered_map, std::tuple, std::enable_shared_from_this) are missing.
// Restore them from the original file before compiling; they are left exactly
// as found here rather than guessed.
#ifndef __MAS_PIPE_STATE_CHECKER_H
#define __MAS_PIPE_STATE_CHECKER_H

#include "MAS_AlgoDef.h"
#include "ComDef.h"
#include "Locks.h"
// NOTE(review): the targets of the next two includes are missing (see above).
#include
#include

NAMESPACE_MAS_BEGIN

// Shorthand for PipeStateChecker::Instance().
#define PIPESTATECHECKER PipeStateChecker::Instance()

// Media packaging kinds (presumably how a stream payload is packetized --
// confirm against the users of this enum).
typedef enum __MAS_MEDIA_ENPACK
{
    EMMP_UNKNOWN,
    EMMP_RTP,
    EMMP_FRAME,
} EN_MAS_MEDIA_ENPACK;

// Media encodings / pixel formats.
typedef enum __MAS_MEDIA_ENCODE
{
    EMME_UNKNOWN,
    EMME_H264,
    EMME_H265,
    EMME_JPEG,
    EMME_PNG,
    EMME_YUV,
    EMME_RGB,
} EN_MAS_MEDIA_ENCODE;

// Frame types (I / P / B / other).
typedef enum __MAS_MEDIA_FRAME_TYPE
{
    EMMFT_UNKNOWN,
    EMMFT_I,
    EMMFT_P,
    EMMFT_B,
    EMMFT_OTHER,
} EN_MEDIA_FRAME_TYPE;

// Exception (stall) reporting settings and state carried by a pipe.
// Constructors and destructor are defined out of line.
struct ExcepInfo
{
    ExcepInfo(const std::string & name, TZ_INT threshold);
    ExcepInfo(const ExcepInfo & einfo);
    ~ExcepInfo();

    TZ_INT Threshold; // Threshold: how many milliseconds without receiving frame data before an error is reported
    std::string Name; // Name displayed in the UI
    // NOTE(review): the value types of the two atomics below were stripped.
    // Presumably OutputState tracks whether the error has been reported and
    // Lastest holds the time of the most recent frame -- confirm in the .cpp.
    std::atomic OutputState;
    std::atomic Lastest;
};

// A media buffer description: byte length, data type, dimensions and a raw
// pointer to the data.
// NOTE(review): a copy constructor and destructor are declared but no copy
// assignment operator is visible here; if Mem is owned by this struct, the
// implicit assignment would only copy the pointer -- confirm ownership in the
// implementation.
// NOTE(review): Media(TZ_INT) is not explicit, so a TZ_INT converts to Media
// implicitly; confirm that is intended.
struct Media
{
    Media(TZ_INT Length);
    Media(const Media & origin);
    ~Media();

    TZ_INT Length;
    TZ_INT DataType;
    TZ_INT Height;
    TZ_INT Width;
    void * Mem;
};

// Result of a strategy: the key it was stored under, the StraRst value and
// the associated media handle. The constructor copies all three arguments
// into the members.
struct StraProducing
{
    StraProducing(const std::string & key, const StraRst & rst, SPtr & media)
        : Key(key), Result(rst), StraMedia(media)
    {}
    ~StraProducing() {}

    std::string Key;
    StraRst Result;
    SPtr StraMedia;
};

// Result of a detector: key, result string, associated media handle and the
// drawing information. The constructor copies all four arguments into the
// members.
struct DetProducing
{
    DetProducing(const std::string & key, const std::string & rst, SPtr & media, const DrawInfo & draw)
        : Key(key), Result(rst), DetMedia(media), Draw(draw)
    {}
    ~DetProducing() {}

    std::string Key;
    std::string Result;
    SPtr DetMedia;
    DrawInfo Draw;
};

// Carrier object passed between pipes: holds a handle, a sequence number, a
// deliver type, the media resource, one strategy result and a keyed
// collection of detector results. All member functions are defined out of
// line.
class StreamInfo
{
public:
    StreamInfo();
    // NOTE(review): not explicit -- a TZ_HANDLE converts to StreamInfo
    // implicitly; confirm that is intended.
    StreamInfo(TZ_HANDLE hd);
    StreamInfo(const StreamInfo & origin);
    StreamInfo(SPtr & origin);
    virtual ~StreamInfo();

    TZ_INT SetMediaRsc(SPtr & mediaRsc);
    TZ_INT SetStraRst(const std::string & straKey, const StraRst & straRst, SPtr & Media);
    TZ_INT GetDeliverType();
    void SetDeliverType(TZ_INT type);
    TZ_INT AddDetRst(const std::string & detKey, const std::string & detRst, SPtr & Media, const DrawInfo & draw);
    TZ_HANDLE GetHandle();
    TZ_INT SetSeq(TZ_Uint32 seq);
    TZ_Uint32 GetSeq();
    // The three accessors below return references to members rather than
    // copies.
    SPtr & GetMediaRsc();
    SPtr & GetStraProducing();
    std::unordered_map> & GetAllDetRst();

private:
    TZ_HANDLE m_hd;
    TZ_Uint32 m_seq;
    std::atomic_int m_deliverType;
    SPtr m_mediaRsc;
    SPtr m_straProducing;
    // Presumably guards m_detProducings -- confirm in the implementation.
    std::mutex m_detLock;
    std::unordered_map> m_detProducings;
};

// Abstract node of the stream pipeline. Subclasses must implement
// streamArrived(); downstream pipes are attached with AddStreamPipe() and
// removed by handle or by self id. PipeStateChecker is a friend and can
// therefore access the private members.
class IStreamPipe : public std::enable_shared_from_this
{
    friend class PipeStateChecker;

public:
    // Meaning of srcId / selfId per pipe kind:
    // PlayPipe      : srcId = media resource id, selfId = device id
    // DetectorPipe  : srcId = media resource id, selfId = detection unit id
    // StrategyPipe  : srcId = scene id,          selfId = decision strategy id
    // WSSStreamPipe : srcId = media resource id, selfId = WSS session id
    // ScenePipe     : srcId = scene id,          selfId = -
    IStreamPipe(TZ_INT srcId, int64_t selfId, TZ_INT type, const ExcepInfo & einfo);
    virtual ~IStreamPipe();

    TZ_INT Initialize();
    TZ_INT Dispose();
    TZ_INT AddStreamPipe(const SPtr & pipe);
    TZ_INT RemoveStreamPipe(TZ_HANDLE hd);
    TZ_INT RemoveStreamPipeBySelfId(int64_t selfId);
    void Switch(TZ_BOOL flag);
    // cyFlag defaults to FALSE; presumably it requests a copy of the stream
    // before delivery -- confirm in the implementation.
    void DeliverStream(SPtr & streamInfo, TZ_BOOL cyFlag = FALSE);
    TZ_INT GetType();
    TZ_INT GetSrcId();
    int64_t GetSelfId();
    TZ_HANDLE GetHandle();
    // Returns the exception info by value.
    ExcepInfo GetExcepInfo();

protected:
    void updateExcepTime();
    // Overridable hooks with default implementations defined out of line.
    virtual TZ_BOOL streamFilter(SPtr & streamInfo);
    virtual TZ_INT pipeInitialize();
    virtual TZ_INT pipeDispose();
    // Pure virtual handler for an arriving stream. Returned tuple:
    // pos0: auto deliver or not
    // pos1: copy or not
    virtual std::tuple streamArrived(
        SPtr & stream) = 0;

private:
    void streamArrivedBySwitch(SPtr & streamInfo);

protected:
    TZ_BOOL m_inited;

private:
    TZ_HANDLE m_handle;
    TZ_INT m_type; // EN_MAS_PIPE_TYPE
    TZ_INT m_srcId;
    int64_t m_selfId;
    std::atomic_bool m_switch;
    // Presumably guards m_next -- confirm in the implementation.
    std::mutex m_nextLock;
    std::map> m_next;
    ExcepInfo m_einfo;
};

// Thread (derived from tzc::OSThread) that keeps a map of registered pipes.
// The constructor and destructor are private; the instance is obtained
// through the static Instance() and released through DestoryInstance()
// (spelling kept as declared).
class PipeStateChecker : public tzc::OSThread
{
public:
    static PipeStateChecker * Instance();
    static void DestoryInstance();

public:
    TZ_INT RegisterPipe(const SPtr & pipe);
    TZ_INT RemovePipe(TZ_HANDLE handle);

private:
    PipeStateChecker();
    virtual ~PipeStateChecker();
    // Presumably the thread body invoked by tzc::OSThread -- confirm against
    // that base class.
    virtual void Entry();

private:
    // Presumably guards m_pipes -- confirm in the implementation.
    std::mutex m_pipeMapLock;
    std::map> m_pipes;

    static PipeStateChecker * _ins;
    // Presumably guards creation/destruction of _ins -- confirm in the
    // implementation.
    static tzc::Mutex _insLock;
};

NAMESPACE_MAS_END

#endif