Added the concept of message queue "broadcasts" and the ability to

subscribe and unsubscribe to them.
This commit is contained in:
casey langen 2017-03-21 00:37:28 -07:00
parent 6188fce615
commit a37702f5da
4 changed files with 51 additions and 1 deletions

View File

@ -45,8 +45,11 @@ namespace musik {
virtual ~IMessageQueue() { }
virtual void Post(IMessagePtr message, int64 delayMs = 0) = 0;
virtual int Remove(IMessageTarget *target, int type = -1) = 0;
virtual void Broadcast(IMessagePtr message, int64 delayMs) = 0;
virtual bool Contains(IMessageTarget *target, int type = -1) = 0;
virtual void Debounce(IMessagePtr message, int64 delayMs = 0) = 0;
virtual void RegisterForBroadcasts(IMessageTargetPtr target) = 0;
virtual void UnregisterForBroadcasts(IMessageTargetPtr target) = 0;
virtual void WaitAndDispatch() = 0;
virtual void Dispatch() = 0;
};

View File

@ -37,9 +37,13 @@
#include "IMessage.h"
namespace musik { namespace core { namespace runtime {
class IMessageTarget {
public:
virtual ~IMessageTarget() { }
virtual void ProcessMessage(IMessage &message) = 0;
};
typedef std::shared_ptr<IMessageTarget> IMessageTargetPtr;
} } }

View File

@ -124,6 +124,19 @@ void MessageQueue::Dispatch() {
}
}
void MessageQueue::RegisterForBroadcasts(IMessageTargetPtr target) {
LockT lock(this->queueMutex);
this->receivers.insert(target);
}
void MessageQueue::UnregisterForBroadcasts(IMessageTargetPtr target) {
LockT lock(this->queueMutex);
auto it = this->receivers.find(target);
if (it != this->receivers.end()) {
this->receivers.erase(it);
}
}
int MessageQueue::Remove(IMessageTarget *target, int type) {
LockT lock(this->queueMutex);
@ -169,6 +182,14 @@ bool MessageQueue::Contains(IMessageTarget *target, int type) {
return false;
}
void MessageQueue::Broadcast(IMessagePtr message, int64 delayMs) {
if (message->Target()) {
throw new std::runtime_error("broadcasts cannot have a target!");
}
this->Post(message, delayMs);
}
void MessageQueue::Post(IMessagePtr message, int64 delayMs) {
LockT lock(this->queueMutex);
@ -215,5 +236,23 @@ void MessageQueue::Debounce(IMessagePtr message, int64 delayMs) {
}
void MessageQueue::Dispatch(IMessagePtr message) {
message->Target()->ProcessMessage(*message);
if (message->Target()) {
message->Target()->ProcessMessage(*message);
}
else {
std::set<IMessageTargetPtr> copy;
{
LockT lock(this->queueMutex);
std::copy(
receivers.begin(),
receivers.end(),
std::inserter(copy, copy.begin()));
}
for (auto receiver : copy) {
receiver->ProcessMessage(*message);
}
}
}

View File

@ -49,9 +49,12 @@ namespace musik { namespace core { namespace runtime {
virtual ~MessageQueue();
virtual void Post(IMessagePtr message, int64 delayMs = 0);
virtual void Broadcast(IMessagePtr message, int64 messageMs);
virtual int Remove(IMessageTarget *target, int type = -1);
virtual bool Contains(IMessageTarget *target, int type = -1);
virtual void Debounce(IMessagePtr message, int64 delayMs = 0);
virtual void RegisterForBroadcasts(IMessageTargetPtr target);
virtual void UnregisterForBroadcasts(IMessageTargetPtr target);
virtual void WaitAndDispatch();
virtual void Dispatch();
@ -69,6 +72,7 @@ namespace musik { namespace core { namespace runtime {
std::mutex queueMutex;
std::list<EnqueuedMessage*> queue;
std::list<EnqueuedMessage*> dispatch;
std::set<IMessageTargetPtr> receivers;
std::condition_variable_any waitForDispatch;
std::atomic<int64> nextMessageTime;