Add semaphores to limit concurrency for certain tasks #228
Conversation
- Before a node is _invoked, it tries to acquire all constraints it needs.
- If it fails to acquire even one, it releases all already-acquired constraints and is put on a wait list of the constraint that couldn't be acquired. The invoke function then returns, doing nothing.
- After successfully acquiring all constraints, the work of the node is done. After that, it releases all constraints.
- Releasing constraints causes nodes to be scheduled which previously failed to acquire these constraints.
- The sets of constraints to acquire and to release are distinct; a constraint can be acquired and released by a node, or only acquired, or only released.
- `FlowBuilder::semaphore(int)` creates a new semaphore with an initial count.
- `Task::release(Semaphore)` makes a task release the given semaphore when it is done with its work.
- `Task::acquire(Semaphore)` forces a task to acquire the given semaphore before doing its work.
- `onlyone`: In this example, 5 tasks (with no links between them) are given. Each task waits a second and then prints a letter. The tasks all acquire (and release) a semaphore with an initial count of 1.
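The all-or-nothing acquisition rule above can be sketched independently of the PR's code. This is only an illustration of the rule under the stated semantics; `Semaphore` and `try_acquire_all` are made-up names, assuming a non-blocking try-acquire:

```cpp
#include <cstddef>
#include <vector>

// Minimal counting semaphore whose acquire fails instead of blocking.
struct Semaphore {
  int count;
  bool try_acquire() { if (count > 0) { --count; return true; } return false; }
  void release() { ++count; }
};

// Try to acquire every constraint a node needs. On the first failure,
// roll back everything acquired so far and report failure, so the node
// can be parked on the failing constraint's wait list.
bool try_acquire_all(const std::vector<Semaphore*>& sems) {
  for (std::size_t i = 0; i < sems.size(); ++i) {
    if (!sems[i]->try_acquire()) {
      for (std::size_t j = 0; j < i; ++j) sems[j]->release();  // roll back
      return false;
    }
  }
  return true;  // the node may run now; it releases its constraints afterwards
}
```

The point of the rollback is that a node either holds every constraint it needs or none of them, so partially-held constraints cannot starve each other.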
@musteresel thanks for your effort! I will look into this this weekend (a bit busy with the Taskflow presentation at CppCon tomorrow).
@musteresel I read through this PR. Here are my questions:
```cpp
tf::Executor executor(4);
tf::Taskflow tf;
tf::ConcurrencyGroup g = tf.concurrency_group(1); // I use concurrency group to be specific to this example;
// PS: semaphore is ok, but its naming is indeed broader in meaning
std::vector<tf::Task> tasks;
tasks.push_back(tf.emplace([](){ sl(); std::cout << "A" << std::endl; }));
tasks.push_back(tf.emplace([](){ sl(); std::cout << "B" << std::endl; }));
tasks.push_back(tf.emplace([](){ sl(); std::cout << "C" << std::endl; }));
tasks.push_back(tf.emplace([](){ sl(); std::cout << "D" << std::endl; }));
tasks.push_back(tf.emplace([](){ sl(); std::cout << "E" << std::endl; }));
for(auto& t : tasks) {
  t.concurrency_group(g);
}
```
As an example use case for the second point, imagine a tool that collects statistics for a list of items from different sources, where each data source is rate limited and you also need to limit the number of concurrent connections to those sources (e.g. a DNS recursor used to collect domain statistics):
And that's not even all the concurrency limits you might want to care about in this example.
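A sketch of that use case against the interface proposed in this PR (`tf.semaphore`, `acquire`, and `release` are taken from the description above; `domains` and `query_dns` are made-up placeholders, and the chaining style is an assumption):

```cpp
tf::Taskflow tf;
auto dns_rate = tf.semaphore(2);   // per-source rate limit: at most 2 in-flight DNS queries
auto max_conn = tf.semaphore(10);  // global limit: at most 10 open connections of any kind
for (auto& domain : domains) {
  tf.emplace([&]{ query_dns(domain); })
    .acquire(max_conn).acquire(dns_rate)    // both limits must be available to run
    .release(dns_rate).release(max_conn);
}
```

Because a task can acquire several semaphores, the two limits compose without the task graph needing any extra dependency edges.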
@BenBE, thanks for the comment - I guess then we will need to allow a task to be able to go into multiple …
I'd somehow prefer an interface where I can provide the restrictions (semaphores/concurrency groups) as part of the task construction:

```cpp
std::vector<tf::Task> tasks;
tasks.emplace_back([]() { /* code */ }, {sem1, sem2});
tasks.emplace_back([]() { /* code */ }, {sem1, sem3});
tasks.emplace_back([]() { /* code */ }, sem3);
```

The "missing braces" in the last line are intentional. Having the "concurrency" counter not as a task counter, but e.g. as some "task size", could be used for e.g. task ordering.
No, of course not! That's exactly the "power" of a semaphore over just a "limit concurrency for this (group of) nodes".
I want to add this as a "convenience" member function, which under the hood just calls both acquire and release.
Of course it might confuse users, though anyone who knows about concurrency should have stumbled upon the concept of a semaphore, IMO. Semaphores are a very "basic" building block for more complex and advanced synchronization tools, too. I think it is absolutely necessary to retain the division between acquire and release. It allows for other forms of synchronization, too, instead of just limiting concurrency.
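For instance, keeping acquire and release separate lets a semaphore with initial count 0 order two otherwise unrelated tasks: the producer only releases it, the consumer only acquires it. A hypothetical sketch against the interface described in this PR (`produce` and `consume` are placeholders):

```cpp
tf::Taskflow tf;
auto ready = tf.semaphore(0);                  // count 0: nothing may pass yet
tf.emplace([]{ produce(); }).release(ready);   // signals when done, never waits
tf.emplace([]{ consume(); }).acquire(ready);   // waits for the signal, never signals
// consume() cannot start before produce() has finished, even though there
// is no explicit dependency edge between the two tasks.
```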
Could we add the "concurrency group" on top of the semaphores? With both documented, this lets users pick the "safer"/simpler API of the concurrency groups, but also gives them the full power of semaphores.
I think there are 3 types of "overhead" (in the general sense) to discuss:
Can you elaborate on this? As far as I can see, the semaphores affect only the scheduling of nodes, and as such there shouldn't be any issues. Scheduling for both CPU and GPU workers works the same, after all; they're just using different queues, right?
Note: this (also) addresses #160.
@musteresel Thanks for your comments. Yes, I agree. Not sure if you ever read our research paper; I am still thinking about a space-efficient way to handle this. Ideas are welcome.
Yes, I've read (most parts of) your research paper; the size issue just didn't catch my eye that much :) OK, going forward in this direction, I have an idea of how to reduce the size of all nodes without affecting performance; I'll make a pull request ASAP. For the two vectors: I think the best idea is to move them into a separate class and put a pointer to an object of that class into …
@musteresel I completely agree with you along the line of reducing memory. In addition, I think the best place to store the constraints (or semaphores) is …
- Updates build system
- Changes to TBB
- Changes to documentation
Hm, I've been thinking about this a bit. If one uses semaphores to e.g. constrain the usage of certain resources (say, specific hardware devices, network connections, ...), then this constraint might well need to regulate tasks in different subflows, right? If we went with a vector of semaphores in the flow_builder, that wouldn't be possible, I guess?
@musteresel yes, a vector in the flow_builder is preferred. Also, an async task is a special task not associated with any graph, so we don't need to handle it:

```cpp
case Node::ASYNC_WORK: {
  _invoke_async_work(worker, node);
  _decrement_topology_and_notify();
  return;
}
```
Is this going to be merged in the near future? It seems like a very useful feature.


These are the early stages of implementing a "Semaphore" to limit concurrency in certain sections/tasks of the graph.
A task can be given the requirement to acquire one or multiple semaphores before executing its work, and a task can be given the job to release one or multiple semaphores after finishing its work. A task can acquire and release a semaphore, or just acquire it, or just release it. Semaphores start with an initial count. As long as that count is above 0, tasks can acquire the semaphore and do their work. If the count is 0, a task trying to acquire the semaphore is not run and is instead put into a waiting list of that semaphore. When the semaphore is released by another task, the tasks on that waiting list are scheduled to run again (trying to acquire their semaphore(s) again).
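A minimal, single-threaded sketch of that waiting-list mechanism (illustrative only, not the PR's implementation; the `int` node ids and the `schedule` callback are made-up stand-ins for the real scheduler):

```cpp
#include <utility>
#include <vector>

// A semaphore that never blocks: a failed acquire parks the node on the
// semaphore's waiting list; a release reschedules all parked nodes so
// they can retry their acquisition.
struct Semaphore {
  int count;
  std::vector<int> waiters;  // nodes that failed to acquire

  bool try_acquire(int node) {
    if (count > 0) { --count; return true; }
    waiters.push_back(node);  // park the node instead of blocking a worker
    return false;
  }

  template <class Schedule>
  void release(Schedule schedule) {
    ++count;
    std::vector<int> parked = std::move(waiters);
    waiters.clear();
    for (int n : parked) schedule(n);  // each parked node retries its acquire
  }
};
```

Because a failed acquire parks the node rather than blocking a worker thread, no worker is ever tied up waiting on a semaphore.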
I've added a simple example with 5 tasks with no links/dependencies between them, which under normal circumstances would execute concurrently. The example, however, has a semaphore with initial count 1, and all tasks must acquire that semaphore before running and release it after they're done. This limits the number of concurrently running tasks to one. See examples/onlyone.cpp
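Based on the description, the example presumably looks roughly like this (a hedged sketch using the API described above, not the actual examples/onlyone.cpp):

```cpp
tf::Taskflow tf;
auto sem = tf.semaphore(1);  // initial count 1: only one task may run at a time
for (char letter : {'A', 'B', 'C', 'D', 'E'}) {
  tf.emplace([letter]{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << letter << std::endl;
  }).acquire(sem).release(sem);
}
tf::Executor executor;
executor.run(tf).wait();
// With the semaphore, the letters appear one per second, in some order;
// without it, all five tasks would run (and finish) concurrently.
```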
Todo:
I'd be happy about general feedback about the implementation so far! Did I follow your coding standards? Any oversights? Suggestions for improvement?