Worker pool


I might add one thing to your Queue-based pattern.

Currently, you have the queue wait indefinitely for a new command to process. This works well if you enqueue all your commands at the same time. But it's possible you might want a pattern where you can keep enqueuing commands asynchronously. In the current design, you would never get to reuse any of your shared reentrant clones, because the last recursive call to the Worker Pool VI is still waiting on the queue, so the entire stack is blocked.

I would add a variable timeout to the queue function. The idea is that the root call has timeout -1 if you want it to run indefinitely, but all inner recursive calls have timeout 0, so that as the last clones finish, their reentrant instances can be reclaimed by the pool and reused. You can also set the root timeout to 0 if you want it to stop as soon as the queue is ever empty.
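In text form, the variable-timeout idea might look like the following Python sketch, where threads stand in for LabVIEW's reentrant clones and `queue.Queue` for the Command Queue (all names are illustrative, not from the attached code). Note that, as the discussion below points out, the root call still exits after finishing its own command:

```python
import queue
import threading

def worker_pool(cmd_queue, root=False):
    """One clone of the worker pool.  The root call waits on the queue
    (timeout -1); recursive calls use timeout 0, so idle clones exit
    immediately and their reentrant instances can be reclaimed."""
    try:
        # root: block until a command arrives; inner clones: return if empty
        cmd = cmd_queue.get() if root else cmd_queue.get_nowait()
    except queue.Empty:
        return
    # spawn the next listener in parallel, then execute our own command
    nxt = threading.Thread(target=worker_pool, args=(cmd_queue,))
    nxt.start()
    cmd()
    nxt.join()

results = []
pool_q = queue.Queue()
for i in range(4):
    pool_q.put(lambda i=i: results.append(i * i))
worker_pool(pool_q, root=True)
```

All four commands execute, possibly in parallel, and the recursion unwinds once the queue is empty.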

[Attached image: post-5171-125735568143_thumb.png]

I might add one thing to your Queue-based pattern.

This is exactly the kind of discussion I wanted to raise. I would like us to get help from the community to develop the best way to use this design idea. Ragglefrock, there is a problem in your example. The recursive call to the worker pool exits immediately if there is nothing left to process, and the outermost worker pool exits after it has finished its own task. So the worker pool doesn't wait indefinitely for new commands but exits after it has processed the first command in the queue. In effect it functions as a dataflow worker pool: it executes all the commands that are in the queue when the worker pool is started and then exits.

Ragglefrock, there is a problem in your example: the recursive call to the worker pool exits immediately if there is nothing left to process, so the pool executes whatever is in the queue at startup and then exits.

Ah, indeed. That doesn't make very much sense, does it? I think what I needed was a while loop inside the worker pool VI to accomplish what I was describing.

One question: what does this give us over just using a parallel for loop? I vaguely feel there's some differences, but I'm not 100% sure what they are.

[Attached image: post-5171-125739436964_thumb.png]

One thing that came to mind as an interesting use case for this was in the context of a Queued State Machine. I had started prototyping a QSM architecture built on LVOOP, where each state was defined as a class overriding a base State class that had an Execute State dynamic method. This would allow you to dynamically add states, even at run-time, as well as populate a queued state with specific data and properties to control how it executes.

It seemed like a nice idea, but quickly got a little washed out when I thought about how it might be awkward to actually use the darn thing.

But... it's starting to sound interesting again because of this. Imagine now that the engine that executes the queued states is some form of the Worker Pool. We could then have a QSM that can asynchronously handle certain cases if they don't require a strict ordering. Sometimes you would want to assume that state A finishes before state B starts, so there would have to be a way to either dispatch the state to a synchronous or asynchronous execution system, where the asynchronous system is much like your Worker Pool design pattern.

And one last refinement option. You can insert an extra queue (or some such resource) to artificially limit the number of worker threads that can be activated at any given time, regardless of the size of the actual Command Queue. For instance, in the example below I limit the number of workers to the number of CPUs in the system. Here I used a named queue as the shared resource, but there are many other ways to abstract this out.
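The CPU-limited refinement can be sketched the same way: a token queue plays the role of the shared named queue, capping how many clones may execute at once while listeners can still be spawned freely (names here are illustrative, not from the attached code):

```python
import os
import queue
import threading

NUM_SLOTS = os.cpu_count() or 2

# Token queue standing in for the shared named queue: a clone must take
# a token before executing its command, capping the active workers.
tokens = queue.Queue()
for _ in range(NUM_SLOTS):
    tokens.put(None)

def limited_worker(cmd_queue):
    try:
        cmd = cmd_queue.get_nowait()
    except queue.Empty:
        return
    nxt = threading.Thread(target=limited_worker, args=(cmd_queue,))
    nxt.start()
    token = tokens.get()        # block until one of the NUM_SLOTS slots frees
    try:
        cmd()
    finally:
        tokens.put(token)       # give the slot back
    nxt.join()

results = []
work_q = queue.Queue()
for i in range(5):
    work_q.put(lambda i=i: results.append(i))
limited_worker(work_q)
```

The Command Queue can hold any number of pending commands; only `NUM_SLOTS` of them ever run concurrently.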

[Attached image: post-5171-125739709809_thumb.png]

Ah, indeed. That doesn't make very much sense, does it? I think what I needed was a while loop inside the worker pool VI to accomplish what I was describing.

Now if there is initially a single task in the queue, the outermost worker pool instance starts executing it, and the next worker pool exits because of a timeout. Submitting the next task to the worker pool then has to wait for the first task to finish, so it doesn't get executed in parallel with the first task. If the first task is blocked waiting for the second task to do something before proceeding, the program ends up in a deadlock.

One question: what does this give us over just using a parallel for loop? I vaguely feel there's some differences, but I'm not 100% sure what they are.

The number of workers in a parallel for loop is limited by an upper bound set at development time. Furthermore, the number of workers is fixed at the moment the loop starts executing. Unlike the proposed architecture, you cannot dynamically increase the number of workers at runtime as needed. It is therefore suited to parallelizing a mathematical analysis whose tasks are independent, but not very well suited to situations like dynamically adding listeners for a network connection.

One thing that came to mind as an interesting use case for this was in the context of a Queued State Machine.

A state machine architecture is indeed something I had in mind when I came up with the worker pool design pattern. To be more precise, the ability to have substates executing in parallel to represent a single outer state. My state machine design isn't ready yet, but I will blog about it once I have solved the problems I am still having.

Now if there is initially a single task in the queue, the outermost worker pool instance starts executing it, and the next worker pool exits because of a timeout.

Yes, that is true. Is there any way to reclaim or reuse reentrant clones then in this design pattern? If you continue with the idea of having a loop in the Worker Pool clones, then I believe the idea comes down to the fact that you only ever need one clone waiting on the queue at a time. More than that is wasteful, because any clone is capable of starting a new listener as soon as something is enqueued. But on the other hand you have to always have one listener or you encounter the situation you described. I think protecting the Command Queue with a single-element lock could solve this. If you try to access the Command Queue while the resource is locked by another clone, then there's no reason for you to continue executing and you can shut down.

Yes, that is true. Is there any way to reclaim or reuse reentrant clones in this design pattern?

I believe I was correct. I'm attaching the prototype code I've been working on. If you put a single-element mutex around the Command Queue and have each clone exit if they can't acquire the mutex, then you end up with exactly one clone waiting at any given time. The only exception is the time between a clone receiving a command and spawning the new clone to start listening, but this time is constant and bounded. Adding this new mutex means you now must force the Command Queue to have an infinite timeout, but that seems very acceptable. The root caller can always simply kill the queue after it registers all commands have been processed if they wish.

The code I'm attaching tests this by having each Command Execute method pop up a front panel with a Stop button. This allows you to test the execution ordering. The most recent Worker Pool to look at is Worker Pool with Limit and Queue Lock.vi.
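The single-listener mutex can be sketched like this, with a thread `Lock` standing in for the single-element lock queue and a `None` sentinel for the root killing the queue (names are illustrative). Note that the follow-up post below finds a flaw in this scheme, which the sketch shares: in practice the failed-acquire path is never taken.

```python
import queue
import threading

def worker_pool(cmd_queue, listen_lock):
    # A clone that cannot take the lock knows another clone is already
    # listening, so it exits and its reentrant instance is reclaimed.
    if not listen_lock.acquire(blocking=False):
        return
    try:
        cmd = cmd_queue.get()        # infinite timeout, as described above
    finally:
        listen_lock.release()
    if cmd is None:                  # sentinel: the root "killed" the queue
        return
    nxt = threading.Thread(target=worker_pool, args=(cmd_queue, listen_lock))
    nxt.start()
    cmd()
    nxt.join()

results = []
q = queue.Queue()
for i in range(3):
    q.put(lambda i=i: results.append(i))
q.put(None)                          # shut the pool down when work is done
worker_pool(q, threading.Lock())
```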

I believe I was correct. If you put a single-element mutex around the Command Queue and have each clone exit if it can't acquire the mutex, then you end up with exactly one clone waiting at any given time.

Hmmm... Another flaw here. I never actually successfully seem to reclaim or reuse any clone instances in this case either. The problem seems to be that you can't wait for the queue again unless you're the last clone in the Worker Pool chain. This also implies that the last clone in the Worker Pool chain is always free to start listening to the Command Queue, because by definition no one else can. This leads to the fact that we always create new Worker Pool clones, because we never can reach the case where a clone tries to listen to the queue and fails, which is what shuts a clone down in my scenario.

Back to the drawing board...

[Attachment: Worker Pool.zip]

I was thinking of an alternative solution that may be a little more complex. Assume you have a sequential worker pool and a parallel worker pool. The sequential worker pool is a loop that dequeues commands and executes them in sequence. The parallel worker pool, on the other hand, consists of a single sequential worker pool and a recursive call to the parallel worker pool itself; within the parallel worker pool, the two run in parallel. Neither of these, however, listens to the main queue; instead a new queue is created for each of the parallel and sequential worker pool instances. In parallel with both worker pools there is a dispatcher loop, which listens to the main queue. When the main queue contains elements, the dispatcher submits each element to one of the two queues representing the two worker pools. If the sequential worker pool is busy executing a task, the dispatcher submits the incoming task to the parallel worker pool; if the sequential worker pool is free, it gets the task.
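A rough Python sketch of that dispatcher idea follows. The parallel pool is simplified to one thread per command rather than a true recursive pool, a `None` sentinel shuts everything down, and all names are illustrative:

```python
import queue
import threading

def sequential_pool(q, busy):
    """Dequeues commands and executes them strictly in order."""
    while True:
        cmd = q.get()
        if cmd is None:
            return
        busy.set()
        try:
            cmd()
        finally:
            busy.clear()

def parallel_pool(q):
    """Stand-in for the recursive parallel pool: one thread per command."""
    threads = []
    while True:
        cmd = q.get()
        if cmd is None:
            break
        t = threading.Thread(target=cmd)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()

def dispatcher(main_q):
    seq_q, par_q = queue.Queue(), queue.Queue()
    busy = threading.Event()
    seq = threading.Thread(target=sequential_pool, args=(seq_q, busy))
    par = threading.Thread(target=parallel_pool, args=(par_q,))
    seq.start()
    par.start()
    while True:
        cmd = main_q.get()
        if cmd is None:
            break
        # Busy sequential pool: route the task to the parallel pool instead.
        if busy.is_set() or not seq_q.empty():
            par_q.put(cmd)
        else:
            seq_q.put(cmd)
    seq_q.put(None)
    par_q.put(None)
    seq.join()
    par.join()

results = []
main_q = queue.Queue()
for i in range(4):
    main_q.put(lambda i=i: results.append(i))
main_q.put(None)
dispatcher(main_q)
```

The busy check is slightly racy (a task may land in the sequential queue just as the pool goes busy), but every command is still executed exactly once.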

Tomi, thank you for this awesome piece of code. I am trying to compare this approach with a "conventional" one where I wrap the "Run VI" method of the VI Server in a basic LVThread class, and assume all other parallel tasks are implementations of children of LVThread. I really like your approach:

1. It works in the Run-Time Engine. I do not have to specify the front panel of every child of LVThread should I want to pass some argument to the tasks.

2. It works even in the Mobile Module - no need to use "Get LV Class Path", which is unavailable for mobile targets.

However, you cannot asynchronously stop (Abort VI) a hung worker, can you? I understand that "Abort VI" should only be used in limited circumstances, but do you see any way to overcome that?
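One commonly used alternative to aborting from the outside is cooperative cancellation: the worker polls a stop flag at safe points and shuts itself down. A minimal sketch of the idea (the `Worker` class and its methods are hypothetical, not part of the attached code, and a genuinely hung task that never checks the flag still cannot be stopped this way):

```python
import threading
import time

class Worker:
    """Hypothetical wrapper: instead of aborting a worker from outside,
    the task periodically checks a stop event and exits on its own."""
    def __init__(self, task):
        self._stop = threading.Event()
        self._thread = threading.Thread(target=task, args=(self._stop,))

    def start(self):
        self._thread.start()

    def stop(self, timeout=None):
        self._stop.set()               # request shutdown
        self._thread.join(timeout)     # wait (bounded) for the worker to exit
        return not self._thread.is_alive()

progress = []
def long_task(stop_event):
    while not stop_event.is_set():     # check the flag on every iteration
        progress.append(1)
        time.sleep(0.01)

w = Worker(long_task)
w.start()
time.sleep(0.05)
stopped = w.stop(timeout=1.0)
```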

I posted a new article on the general concepts behind the design pattern; these may be clear to many LabVIEW developers but not to software engineers coming from other programming languages. I also updated the example LabVIEW project with a worker pool that supports reuse of recursive worker instances.

Please copy and paste the link below; a LAVA bug prevents direct linking.

expressionflow.com/2009/11/10/unlimited-parallelism-concurrency-with-recursive-dataflow

Unlimited parallelism & concurrency with recursive dataflow

Tomi

Felix, I am sorry, but we really don't have functional programming at all in LabVIEW; what we have is OOP. The blog post was intended for a more general audience of dataflow programming languages, not just LabVIEW developers. In functional programming you can pass any function as a parameter to other functions.
