#include "MesgDispatcher.h" #include NAMESPACE_MAS_BEGIN MesgThread::MesgThread(TZ_INT tid) : m_tid(tid) { this->Start(); } MesgThread::~MesgThread() { this->StopAndWait(); m_eventsLock.Lock(); m_pendingMesgs.clear(); m_eventsLock.Unlock(); m_insMapLock.Lock(); m_insMap.clear(); m_insMapLock.Unlock(); } void MesgThread::RegisterIListerner( IListerner * listener) { tzc::ScopedLock lock(m_insMapLock); auto iter = m_insMap.find(listener); if (m_insMap.end() == iter) { m_insMap.emplace(listener, listener->GetRegMesg()); } else { iter->second = listener->GetRegMesg(); } } TZ_INT MesgThread::PushMesg(SPtr & mesg) { m_eventsLock.Lock(); m_pendingMesgs.emplace_back(mesg); m_eventsLock.Unlock(); m_eventSema.Signal(); return MEC_OK; } SPtr MesgThread::getFrontEvent() { m_eventsLock.Lock(); auto info = m_pendingMesgs.front(); m_pendingMesgs.pop_front(); m_eventsLock.Unlock(); return info; } void MesgThread::dispatcherEvent(SPtr & info) { std::list eventList; m_insMapLock.Lock(); for (auto mem : m_insMap) { if ((mem.second & info->MesgType) != info->MesgType) { continue; } eventList.emplace_back(mem.first); } m_insMapLock.Unlock(); for (IListerner * listerner : eventList) { listerner->HandleMesg(info); } } void MesgThread::Entry() { SPtr info = nullptr; while (!this->IsStop()) { if (!m_eventSema.Wait(EVENT_WAIT_MSECOND)) continue; info = this->getFrontEvent(); if (!info) continue; this->dispatcherEvent(info); } } MesgDispatcher * MesgDispatcher::_ins = nullptr; tzc::Mutex MesgDispatcher::_insLock; MesgDispatcher::MesgDispatcher() { for (TZ_INT i; i < 4; ++i) { auto t = new MesgThread(i + 1); m_mesgThreads.emplace(t->GetTid(), t); } } MesgDispatcher::~MesgDispatcher() { for (auto mem : m_mesgThreads) { TZ_delete(mem.second); } m_mesgThreads.clear(); } MesgDispatcher * MesgDispatcher::Instance() { if (!_ins) { _insLock.Lock(); if (!_ins) { _ins = new MesgDispatcher(); } _insLock.Unlock(); } return _ins; } void MesgDispatcher::DestoryInstance() { _insLock.Lock(); TZ_delete(_ins); _insLock.Unlock(); } TZ_INT MesgDispatcher::RegisterIListerner( IListerner * listener) { for (auto mem : m_mesgThreads) { mem.second->RegisterIListerner(listener); } TZLogInfo("register listerner succeed~~~"); return MEC_OK; } TZ_INT MesgDispatcher::PushMesg( TZ_INT tid, SPtr & mesg) { MesgThread * t = nullptr; if (!tid) t = this->chooseThread(); else { auto iter = m_mesgThreads.find(tid); if (m_mesgThreads.end() == iter) { TZLogWarn("invalid tid %d!!!", tid); return 0; } t = iter->second; } t->PushMesg(mesg); return t->GetTid(); } MesgThread * MesgDispatcher::chooseThread() { TZ_INT mTid = 0; TZ_INT pendingCnt = std::numeric_limits::max(); for (auto mem : m_mesgThreads) { auto temp = mem.second->GetPendingCnt(); if (temp <= pendingCnt) { pendingCnt = temp; mTid = mem.first; } } return m_mesgThreads[mTid]; } NAMESPACE_MAS_END