MessageQueue implementation now uses checked weak_ptr receivers for

broadcasts. This solves an issue where the message queue may prevent
components (like views) from getting cleaned up if they don't unregister
before they are discarded.
This commit is contained in:
casey langen 2018-01-29 09:08:54 -08:00
parent 5df2881a46
commit 4c758af5ac
3 changed files with 38 additions and 5 deletions

View File

@ -132,7 +132,8 @@ void MessageQueue::RegisterForBroadcasts(IMessageTargetPtr target) {
void MessageQueue::UnregisterForBroadcasts(IMessageTarget *target) { void MessageQueue::UnregisterForBroadcasts(IMessageTarget *target) {
LockT lock(this->queueMutex); LockT lock(this->queueMutex);
for (auto it : this->receivers) { for (auto it : this->receivers) {
if (it.get() == target) { auto shared = it.lock();
if (shared && shared.get() == target) {
this->receivers.erase(it); this->receivers.erase(it);
return; return;
} }
@ -242,19 +243,40 @@ void MessageQueue::Dispatch(IMessagePtr message) {
message->Target()->ProcessMessage(*message); message->Target()->ProcessMessage(*message);
} }
else { else {
std::set<IMessageTargetPtr> copy; std::set<IWeakMessageTarget, WeakPtrLess> copy;
/* copy to dispatch outside of a lock */
{ {
LockT lock(this->queueMutex); LockT lock(this->queueMutex);
std::copy( std::copy(
receivers.begin(), receivers.begin(),
receivers.end(), receivers.end(),
std::inserter(copy, copy.begin())); std::inserter(copy, copy.begin()));
} }
/* dispatch */
bool prune = false;
for (auto receiver : copy) { for (auto receiver : copy) {
receiver->ProcessMessage(*message); auto shared = receiver.lock();
if (shared) {
shared->ProcessMessage(*message);
}
else {
prune = true;
}
}
if (prune) { /* at least one of our weak_ptrs is dead. */
LockT lock(this->queueMutex);
auto it = this->receivers.begin();
while (it != this->receivers.end()) {
if (it->expired()) {
it = this->receivers.erase(it);
}
else {
++it;
}
}
} }
} }
} }

View File

@ -65,15 +65,25 @@ namespace musik { namespace core { namespace runtime {
} }
private: private:
typedef std::weak_ptr<IMessageTarget> IWeakMessageTarget;
struct EnqueuedMessage { struct EnqueuedMessage {
IMessagePtr message; IMessagePtr message;
std::chrono::milliseconds time; std::chrono::milliseconds time;
}; };
struct WeakPtrLess { /* https://stackoverflow.com/a/12875729 */
template <typename T>
bool operator() (const std::weak_ptr<T>& l, const std::weak_ptr<T>& r) const {
return l.lock().get() < r.lock().get();
}
};
std::mutex queueMutex; std::mutex queueMutex;
std::list<EnqueuedMessage*> queue; std::list<EnqueuedMessage*> queue;
std::list<EnqueuedMessage*> dispatch; std::list<EnqueuedMessage*> dispatch;
std::set<IMessageTargetPtr> receivers; std::set<IWeakMessageTarget, WeakPtrLess> receivers;
std::condition_variable_any waitForDispatch; std::condition_variable_any waitForDispatch;
std::atomic<int64_t> nextMessageTime; std::atomic<int64_t> nextMessageTime;

View File

@ -544,6 +544,7 @@ void SettingsLayout::OnAddedToParent(IWindow* parent) {
auto receiver = this; auto receiver = this;
#endif #endif
MessageQueue().RegisterForBroadcasts(receiver->shared_from_this()); MessageQueue().RegisterForBroadcasts(receiver->shared_from_this());
MessageQueue().RegisterForBroadcasts(receiver->shared_from_this());
} }
void SettingsLayout::OnRemovedFromParent(IWindow* parent) { void SettingsLayout::OnRemovedFromParent(IWindow* parent) {