
Attached is a sketch of a publish-subscribe solution inspired by the design of the native queue functions. Open the project and launch the Simple Demo.vi to get an idea of the concept (or create a few instances of the example templates and run those).
 
As you know there are *lots* of messaging solutions out there that also support many to many communication, many of which are much more advanced. They are often based on user events, and LVOOP. I have still been missing a solution that is as easy and intuitive to use as queues though. 
 
I also wanted to overcome a few other issues with user events:
 
- I do not like to always have to have an event case for each subscriber.
- I want each subscriber to be able to preview or flush its incoming data stream (not fully implemented yet, but logically simple to do).
- I want to be able to keep a channel open for as long as anyone is interested in using it, rather than have it die when the process that happened to create it goes idle (which is what happens with queues and user events...).
 
I am publishing this to get some feedback on the concept (as it is now, it has lots of missing logic and the design has not been thought through that much). 
 
Do you find it useful or just silly?  Are there other solutions out there that are just as simple? 
 
PS - The original attachment was LV2014, I have now added a 2012 version as well.

MultiQ - a sketch of a many to many (pub-sub) messaging solution.zip

MultiQ LV2012.zip

Edited by Mads
Link to comment

I had a quick look.

 

I thought it was very over-engineered and offered no significant advantage over simply using the Queue primitives themselves, but then I realised that you force ALL messages to ALL subscribers, something which Queues normally don't do.

 

If you want to present different Queues for Publishing and Subscribing, you can utilise Event Callbacks for the broadcasting / subscription / unsubscription instead of having to code it yourself.

 

I have half a working version running, maybe I can post it tomorrow.

Link to comment

I am not sure you have understood the whole concept/goal; namely to have lossless broadcast channels, with a simple interface (made to resemble the regular "consume only once" queues). Perhaps the demo makes it unclear as it focuses mostly on how simple the API is, and not on real use cases? 

Here is an old thread that discusses the issue.

User events are now the closest existing and, yes, simpler alternative: many can generate the events, and all subscribers that have registered for the user event get their own copy of it. MultiQ tries to remove some particular drawbacks of a user event based solution though, as mentioned in the original post. 

Other similar solutions are, I would say, much more engineered and have a more complex interface (because they typically aim to solve additional issues), but maybe I am wrong. Implementation-wise this is a sloppy sketch. For example, I use a class just as a fancy cluster/way to bundle functionality, and the notifier used to send feedback to the create/destroy functions could certainly be replaced by something easier and more robust.

I can get around the non-persistence/link to the original creator of user events by having them all generated by a separate process (as MultiQ does with its internal queues), and then only the flush/preview/status functions remain for such a solution. The access to the internal event queue is more limited though, although it opened up a bit in 2014 (I have made a sketch of something like that already, but have not published it yet).

Edited by Mads
Link to comment
- and I want to be able to keep a channel open for as long as anyone is interested in using it, 
not die when the process that happened to create it goes idle (which is what happens with queues and user events...).

 

 

This is desirable in my apps too. I created a queue wrapper (umpteen years ago) that ensures there is always exactly one of any queue, which is created when any queue function is called, thereby removing the Init, Do Something, Destroy overhead and the memory runaway from creating refs. Destroying the queue merely has the effect of clearing the queue, which is reconstituted on the next call.

 

Queue.vi

Edited by ShaunR
Link to comment

User events are now the closest existing and, yes, simpler alternative: many can generate the events, and all subscribers that have registered for the user event get their own copy of it. MultiQ tries to remove some particular drawbacks of a user event based solution though, as mentioned in the original post. 

Oh this is why I would make use of Callbacks.  You can delegate handling of the event to a callback VI which can send the new data via Queue, Notifier, Occurrence and global variable, file, Smoke signals, whatever.

 

I would still recommend using a queue interface for the user (programmer) but leveraging the subscribe/unsubscribe feature of events purely for the act of distribution to N listeners.

Link to comment

Back in 2011 I posted a framework 

https://lavag.org/topic/14566-message-routing-architecture/

 

It was inspired by a CLA presentation called JAMA by Rob Humfeld (the guy who taught me LabVIEW).

 

The WOW factor for me was when he opened two separate VIs, one generating data and one displaying data.

He created a third to detect spikes and ran it without stopping the other two. The module ran, started receiving the waveform data and detected spikes. 

After that he stopped the VI, made changes and started it back up again without stopping the two running VIs.

 

This was the type of architecture I was looking for, so I took his ideas and wrapped them in an OOP framework.

Hopefully the graphics and example code explain the way it works.

 

The power of this framework is that modules are totally decoupled from each other and do not depend on any other module.

They create, manage and destroy their own courier (transport mechanism).

They only receive messages that they register for.

Any module can send any message at any time, which makes creating simulators quick and easy.

 

I find this framework especially useful when building test systems.

  • Using the framework I will create an asynchronous module for each major device in the system. These modules will connect to the hardware, monitor health and maintain the connection.
  • Next I will create a module that will be called synchronously by the test sequencer. Its purpose is to
  1. connect to the async module through the framework 
  2. request and receive measurements via messages
  3. Pass results back to the sequencer
  4. Close down

This works because the framework allows any module to connect, communicate and die without adversely affecting other modules. 

 

Mark

 

 

Link to comment

Hi Mark,

 

Thanks for the feedback. While searching for an existing solution a long time ago I found your message routing code. At the time I thought the API was a bit daunting. I could not figure out quickly how to set up exactly the mechanism I was looking for, so we made something much simpler for the project at hand, and moved on (I still have it installed though so perhaps I should revisit it).

 

The core functionality you describe is also covered by MultiQ I think, except for the added functionality that comes with OOP. As MultiQ only operates on strings (like the old queues, TCP/IP functions etc.) the interface is (intended to be) very simple, but needs an overlaying protocol to handle messaging, data types etc...

 

There are several issues with MultiQ that I hoped to get feedback on. The main question is what people think about the concept itself (I am using LAVA as an idea exchange here, at the risk of getting most eyes on the bad parts of code :-)), could it be useful (perhaps not if you are already using JAMA/your message routing solution for example). -But there are other things; like how full queues should be handled. 

 

Regular queues can go wild if no one is consuming the data and the length has not been limited, or you can block new production by setting a maximum number of elements. However, with a pub-sub design such behaviour becomes an even bigger problem; do you really want to halt publication because one subscriber is not consuming? Probably not, but sometimes you might...So should the solution support both, or simplify by choosing one of the two? How does the message routing solution deal with message flooding (I'll have another look at it later)?
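To make the two options concrete, here is a minimal sketch using Python's standard `queue.Queue` as a stand-in for a size-limited LabVIEW queue. The helper names are hypothetical; the first waits for room (so a slow consumer stalls the producer), the second never waits and discards the oldest element instead.

```python
import queue

def enqueue_blocking(q, item, timeout=None):
    """Wait for room; raises queue.Full if the timeout expires first."""
    q.put(item, timeout=timeout)

def enqueue_lossy(q, item):
    """Never wait: when the queue is full, discard the oldest element."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            try:
                q.get_nowait()  # make room by dropping the oldest element
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime; retry
```

In a pub-sub setting the choice would have to be made per subscriber queue: the blocking variant lets one stalled subscriber hold up publication for everyone, while the lossy variant keeps publication going at the cost of that subscriber silently missing data.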

Edited by Mads
Link to comment

Wikipage about this: https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

 

Further I was wondering, your current implementation does not support subscription to multiple different publishers (who have a different name) right? That would be a nice feature, another extension would be that you can also publish across a network.

 

You can have multiple producers on the same "MultiQ" (there is really no access control built in (at least yet) so anyone who knows the name of a MultiQ and subscribes to it can also publish), but a subscriber cannot choose to bundle multiple subscriptions (to separate MultiQs) into one subscription reference. If that is what you are thinking of?

 

You can build a lot on top of MultiQ though, so you could extend it with a "single subscription to multiple separate MultiQs" set of VIs, to make it look like one subscriber line at a higher level. You could also make a "Topic" solution. I am not sure how much such functionality should be implemented directly at the lower level, and how much should be added at a higher level, utilizing the basic MultiQ (as it is now) at the core. A networked solution could be made on top of the current implementation too.

Edited by Mads
Link to comment

OK.

 

So let me clarify some points here.

 

There is a "distributor" which is launched dynamically and acts as the dealer to deal out the messages to a number of "registered" subscribers to that producer.

 

So when you "create" you launch a distributor. The distributor is a permanent consumer for the producer (i.e. it prevents queue fill-up when there are no subscribers). It also is the "dealer" that multiplies messages by copying each message to every subscriber's incoming queue, if and when they appear. So you have a distributor per publisher that deals messages on a per-publisher basis to subscribers.
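The arrangement described above can be sketched in a few lines of Python. This is only an illustration of the concept, not MultiQ's actual implementation: the `Channel` class and its method names are hypothetical, and a daemon thread stands in for the dynamically launched distributor VI.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the distributor loop to exit

class Channel:
    """One inbox, one distributor thread, one private queue per subscriber."""

    def __init__(self):
        self.inbox = queue.Queue()
        self.subscribers = []
        self.lock = threading.Lock()
        self.thread = threading.Thread(target=self._distribute, daemon=True)
        self.thread.start()

    def subscribe(self):
        """Register a new subscriber and return its incoming queue."""
        q = queue.Queue()
        with self.lock:
            self.subscribers.append(q)
        return q

    def publish(self, message):
        """Anyone holding the channel can publish."""
        self.inbox.put(message)

    def close(self):
        self.inbox.put(_STOP)
        self.thread.join()

    def _distribute(self):
        # Always consumes the inbox, so it cannot fill up even when
        # nobody has subscribed yet.
        while True:
            message = self.inbox.get()
            if message is _STOP:
                return
            with self.lock:
                targets = list(self.subscribers)
            for q in targets:
                q.put(message)  # every subscriber receives every message
```

Each subscriber then reads its own queue with an ordinary dequeue, which is what gives the "looks like a regular queue" interface while still broadcasting.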

 

If I'm close in understanding, it might be worth looking at Dispatcher, as it sounds like this is an almost identical philosophy where I call the "distributor" a "handler".

Edited by ShaunR
Link to comment

Yes Shaun, you have understood everything correctly, although all subscribers in this case can also produce data on the channel. They can only opt out of subscribing to the data (preventing an unused subscription queue from flooding).

 

I was actually just about to write that your Dispatcher is very similar to a networked (cool) version of MultiQ, when I saw your post.

I like the simple API of the Dispatcher, very nice :-) I was sure I had looked at it before, perhaps I was too focused on a non-networked solution back then.

Edited by Mads
Link to comment

There are several issues with MultiQ that I hoped to get feedback on. The main question is what people think about the concept itself (I am using LAVA as an idea exchange here, at the risk of getting most eyes on the bad parts of code :-)), could it be useful (perhaps not if you are already using JAMA/your message routing solution for example). -But there are other things; like how full queues should be handled. 

 

Regular queues can go wild if no one is consuming the data and the length has not been limited, or you can block new production by setting a maximum number of elements. However, with a pub-sub design such behaviour becomes an even bigger problem; do you really want to halt publication because one subscriber is not consuming? Probably not, but sometimes you might...So should the solution support both, or simplify by choosing one of the two? How does the message routing solution deal with message flooding (I'll have another look at it later)?

 

One of the main differences that I see between yours and mine is who is responsible for sending the messages (whose while loop sends the message).

In yours you appear to spawn a "distributor" and it receives and distributes the messages to the registered modules.

In my architecture the sender does all the work (Through the API)

  • API finds the specific message registry using the name of the message
  • In the registry  is an array of all couriers that want to receive the message.
  • The message is placed in each courier and the "Send" method is called

So mine requires the sender to create the copies and send the message to each recipient, whereas yours delegates that responsibility to the distributor loop. In MultiQ, because every module depends on the distributor, all communication is lost if it crashes or dies. This is something that I struggled with in previous implementations. It will work, and I know of at least 2 other CLAs that use this method, but for me it creates an unnecessary dependency in the types of application I build.
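For contrast with a distributor loop, the sender-side steps listed above might look roughly like this in Python. The `Courier` class, the `registry` dictionary and the function names are all hypothetical stand-ins, not the framework's actual API; the point is only that delivery happens inside the sender's own call, with no intermediate process.

```python
from collections import defaultdict, deque

class Courier:
    """Stand-in for a module's own transport; here just an in-memory deque."""

    def __init__(self):
        self.received = deque()

    def send(self, message):
        self.received.append(message)

# Hypothetical message registry: message name -> couriers that want it.
registry = defaultdict(list)

def register(name, courier):
    """A module registers its courier for one named message."""
    registry[name].append(courier)

def send(name, payload):
    """Runs in the sender's own loop: look up the name, hand the message to each courier."""
    for courier in list(registry.get(name, [])):
        courier.send((name, payload))
```

Because there is no shared loop in the middle, a module that dies only takes its own courier with it; the cost is that every sender pays for the fan-out itself.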

Link to comment

Yes Shaun, you have understood everything correctly, although all subscribers in this case can also produce data on the channel. They can only opt out of subscribing to the data (preventing an unused subscription queue from flooding).

 

OK. So what if a subscriber doesn't unregister but fails to service its queue? Both our systems are reliant on the registration being valid and up to date, and on the subscriber consuming; otherwise messages start piling up. In mine, I have the TCP/IP refnum that acts as a notifier for remote subscribers disappearing, but locally I have the same problem, as I bypass TCP/IP and place directly on subscribers' queues like you do.

 

What is your strategy here?

Edited by ShaunR
Link to comment

OK. So what if a subscriber doesn't unregister but fails to service its queue? Both our systems are reliant on the registration being valid , up to date and the subscriber consuming otherwise messages start piling up. In mine, I have the TCPIP refnum that acts as a notifier for remote subscribers disappearing but locally I have the same problem as I bypass the TCPIP and place directly on subscribers queues like you do.

 

What is your strategy here?

 

I have not chosen yet...(although one strategy is already there in the draft of course...). ;)

 

Here are some of the alternatives as far as I see it:

 

  1. We keep it simple/lazy and leave it up to the programmer to avoid problems; basically this is how the native queues (user events too in many ways) do it (you can set a maximum size, you can use a timeout on the enqueue/dequeues, or use a lossy insert, but you have to choose a compromise suitable for the task at hand).

     

  2. Enforce a "fool-proof" solution with whatever drawbacks that will have (basically pick one of the strategies a programmer could go for in alternative 1 and close the door).

     

  3. Try to be smart about it by adding health monitoring to the core of the system; if a publisher/subscriber or group of such seems to be misbehaving, shut it down before it affects the system. So the distributor in this case would say that if a subscriber queue is full on n occasions, for example, it is assumed to be dead and the subscription is terminated, or the queue is automatically flushed by the distributor (kind of like the friendly neighbor deciding that you must be on vacation: better empty that mailbox before the burglars arrive...).

     

  4. Implement all 3 above, and allow the user to decide on a "profile" (which makes the API messier).

I'm leaning towards alternatives 1 or 2. Any other strategies?
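Alternative 3 is the least obvious of these, so here is a minimal Python sketch of what "full on n occasions means dead" could look like. The class and parameter names are hypothetical, and one assumption is made that the text leaves open: the counter tracks consecutive full occasions and resets after any successful delivery.

```python
import queue

class MonitoredFanOut:
    """Fan-out that terminates a subscription after repeated full queues."""

    def __init__(self, max_full=3):
        self.max_full = max_full
        self.subscribers = {}  # name -> [queue, consecutive full count]

    def subscribe(self, name, maxsize):
        q = queue.Queue(maxsize=maxsize)
        self.subscribers[name] = [q, 0]
        return q

    def deliver(self, message):
        """Deliver to every subscriber; return names dropped in this round."""
        dropped = []
        for name, entry in list(self.subscribers.items()):
            q = entry[0]
            try:
                q.put_nowait(message)
                entry[1] = 0  # assumption: a success resets the count
            except queue.Full:
                entry[1] += 1
                if entry[1] >= self.max_full:
                    # Assumed dead: terminate the subscription.
                    del self.subscribers[name]
                    dropped.append(name)
        return dropped
```

The flush variant mentioned in alternative 3 would replace the `del` with a loop that empties the queue. Either way the publisher never blocks, which is the property the health check is meant to protect.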

Edited by Mads
Link to comment

I chose 1 & 3. I thought that covered the most basic requirements with an offload of decision in strategy to the person who knows more. There is no need to flush a queue if you destroy it and my little queue wrapper means you can just crowbar queues and they get reconstituted if it really was being used. (effectively just clearing it)

 

You will find the lossy/normal and limit choice in the Dispatcher, and the queue size is transmitted to the Dispatcher so the status could be monitored for each producer to assist the strategy decision (#1). I never got around to a heartbeat or pinging and fail-over/retrying, which was next on my list (#3) before I got sidetracked by WebSockets.

 

I wouldn't be too concerned about complexity of the API as such, as long as it is a simple interface. Things like heartbeats, pings, bandwidth management and fail-over etc. can all be switchable features and completely transparent. Many of those don't make sense if you're not traversing the network boundary, but your design isn't limited to local processes.

 

Can't wait to see where you go with this.

Link to comment
