123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- #include "MesgDispatcher.h"
- #include <limits>
- NAMESPACE_MAS_BEGIN
- MesgThread::MesgThread(TZ_INT tid) : m_tid(tid)
- {
- this->Start();
- }
- MesgThread::~MesgThread()
- {
- this->StopAndWait();
-
- m_eventsLock.Lock();
- m_pendingMesgs.clear();
- m_eventsLock.Unlock();
- m_insMapLock.Lock();
- m_insMap.clear();
- m_insMapLock.Unlock();
- }
- void MesgThread::RegisterIListerner(
- IListerner * listener)
- {
- tzc::ScopedLock lock(m_insMapLock);
- auto iter = m_insMap.find(listener);
- if (m_insMap.end() == iter)
- {
- m_insMap.emplace(listener, listener->GetRegMesg());
- }
- else
- {
- iter->second = listener->GetRegMesg();
- }
- }
- TZ_INT MesgThread::PushMesg(SPtr<MasMesg> & mesg)
- {
- m_eventsLock.Lock();
- m_pendingMesgs.emplace_back(mesg);
- m_eventsLock.Unlock();
- m_eventSema.Signal();
- return MEC_OK;
- }
- SPtr<MasMesg> MesgThread::getFrontEvent()
- {
- m_eventsLock.Lock();
- auto info = m_pendingMesgs.front();
- m_pendingMesgs.pop_front();
- m_eventsLock.Unlock();
- return info;
- }
- void MesgThread::dispatcherEvent(SPtr<MasMesg> & info)
- {
- std::list<IListerner *> eventList;
- m_insMapLock.Lock();
- for (auto mem : m_insMap)
- {
- if ((mem.second & info->MesgType) != info->MesgType)
- {
- continue;
- }
-
- eventList.emplace_back(mem.first);
- }
- m_insMapLock.Unlock();
- for (IListerner * listerner : eventList)
- {
- listerner->HandleMesg(info);
- }
- }
- void MesgThread::Entry()
- {
- SPtr<MasMesg> info = nullptr;
- while (!this->IsStop())
- {
- if (!m_eventSema.Wait(EVENT_WAIT_MSECOND)) continue;
-
- info = this->getFrontEvent();
- if (!info) continue;
-
- this->dispatcherEvent(info);
- }
- }
- MesgDispatcher * MesgDispatcher::_ins = nullptr;
- tzc::Mutex MesgDispatcher::_insLock;
- MesgDispatcher::MesgDispatcher()
- {
- for (TZ_INT i; i < 4; ++i)
- {
- auto t = new MesgThread(i + 1);
- m_mesgThreads.emplace(t->GetTid(), t);
- }
- }
- MesgDispatcher::~MesgDispatcher()
- {
- for (auto mem : m_mesgThreads)
- {
- TZ_delete(mem.second);
- }
- m_mesgThreads.clear();
- }
- MesgDispatcher * MesgDispatcher::Instance()
- {
- if (!_ins)
- {
- _insLock.Lock();
- if (!_ins)
- {
- _ins = new MesgDispatcher();
- }
- _insLock.Unlock();
- }
-
- return _ins;
- }
- void MesgDispatcher::DestoryInstance()
- {
- _insLock.Lock();
- TZ_delete(_ins);
- _insLock.Unlock();
- }
- TZ_INT MesgDispatcher::RegisterIListerner(
- IListerner * listener)
- {
- for (auto mem : m_mesgThreads)
- {
- mem.second->RegisterIListerner(listener);
- }
- TZLogInfo("register listerner succeed~~~");
- return MEC_OK;
- }
- TZ_INT MesgDispatcher::PushMesg(
- TZ_INT tid, SPtr<MasMesg> & mesg)
- {
- MesgThread * t = nullptr;
- if (!tid) t = this->chooseThread();
- else
- {
- auto iter = m_mesgThreads.find(tid);
- if (m_mesgThreads.end() == iter)
- {
- TZLogWarn("invalid tid %d!!!", tid);
- return 0;
- }
-
- t = iter->second;
- }
- t->PushMesg(mesg);
- return t->GetTid();
- }
- MesgThread * MesgDispatcher::chooseThread()
- {
- TZ_INT mTid = 0;
- TZ_INT pendingCnt = std::numeric_limits<TZ_INT>::max();
- for (auto mem : m_mesgThreads)
- {
- auto temp = mem.second->GetPendingCnt();
- if (temp <= pendingCnt)
- {
- pendingCnt = temp;
- mTid = mem.first;
- }
- }
- return m_mesgThreads[mTid];
- }
- NAMESPACE_MAS_END
|