The Wayback Machine - https://web.archive.org/web/20201110124019/https://github.com/taskflow/taskflow/pull/228

Add semaphores to limit concurrency for certain tasks #228

Open · wants to merge 4 commits into base: master

Conversation

@musteresel (Contributor) commented Sep 17, 2020

This is an early-stage implementation of a "Semaphore" to limit concurrency in certain sections/tasks of the graph.

A task can be required to acquire one or more semaphores before executing its work, and can be given the job of releasing one or more semaphores after finishing its work. A task can acquire and release a semaphore, or just acquire it, or just release it. Semaphores start with an initial count. As long as that count is above 0, tasks can acquire the semaphore and do their work. If the count is 0 or less, a task trying to acquire the semaphore is not run and is instead put on that semaphore's waiting list. When the semaphore is released by another task, the tasks on that waiting list are scheduled to run again (and retry acquiring their semaphore(s)).

I've added a simple example with 5 tasks and no links/dependencies between them, which under normal circumstances would execute concurrently. The example, however, uses a semaphore with an initial count of 1, and all tasks must acquire that semaphore before running and release it after they're done. This limits the number of concurrently running tasks to one. See examples/onlyone.cpp

Todo:

  • Generally: test! Right now I just have that single example, which runs as expected at first glance.
  • Add the ability for acquire/release to change the semaphore counter by a value other than 1.
  • Measure the performance impact when the feature is not used.
  • Make sure semaphores don't needlessly block worker threads.

I'd be happy about general feedback on the implementation so far! Did I follow your coding standards? Any oversights? Suggestions for improvement?

musteresel added 3 commits Sep 16, 2020
 - Before a node is _invoked, it tries to acquire all constraints
   which it needs.

 - If it fails to acquire even one, it releases all already acquired
   constraints and is put on a wait list of the constraint which
   couldn't be acquired.  The invoke function then returns, doing
   nothing.

 - After successfully acquiring all constraints, the work of the node
   is done.  After that, it releases all constraints.

 - Releasing constraints causes nodes to be scheduled which previously
   failed to acquire these constraints.

 - The sets of constraints to acquire and to release are independent; a
   constraint can be acquired and released by a node, or only acquired,
   or only released.
 - FlowBuilder::semaphore(int) creates a new semaphore with initial
   count.

 - Task::release(Semaphore) makes a task release the given semaphore
   when it is done with its work.

 - Task::acquire(Semaphore) forces a task to acquire the given
   semaphore before doing its work.
 - onlyone: In this example, 5 tasks (with no links between them) are
   given. Each task waits a second, and then prints a letter.  The
   tasks all acquire (and release) a semaphore with initial count of
   1.
@tsung-wei-huang (Member) commented Sep 17, 2020

@musteresel thanks for your effort! I will look into it this weekend (a bit busy with the Taskflow presentation at CppCon tomorrow).

@tsung-wei-huang (Member) commented Sep 21, 2020

@musteresel I read through this PR. Here are my questions:

  1. It seems to me t.acquire and t.release must always come as a pair. We may use a single method to represent them in one shot. Of course, we may allow different threads to acquire and release, but I envisage this will confuse users and cause many undefined behaviors on the user's side. Any thoughts? Or, more safely, we can do:
  tf::Executor executor(4);
  tf::Taskflow tf;
  tf::ConcurrencyGroup g = tf.concurrency_group(1);  // I use concurrency group to be specific to this example; 

  // PS: semaphore is ok, but its naming is indeed broader in meaning

  std::vector<tf::Task> tasks;
  tasks.push_back(tf.emplace([](){ sl(); std::cout << "A" << std::endl; }));
  tasks.push_back(tf.emplace([](){ sl(); std::cout << "B" << std::endl; }));
  tasks.push_back(tf.emplace([](){ sl(); std::cout << "C" << std::endl; }));
  tasks.push_back(tf.emplace([](){ sl(); std::cout << "D" << std::endl; }));
  tasks.push_back(tf.emplace([](){ sl(); std::cout << "E" << std::endl; }));
  for(auto & t : tasks) {
    t.concurrency_group(g);
  }
  2. Do we really need to allow a task to hold multiple constraints? That is, each task can belong to only one concurrency group in my intention. Of course, your current solution makes it general, but I wonder what the use cases are.

  3. I am studying how to get std::vector<Constraint*> _to_acquire and std::vector<Constraint*> _to_release out of tf::Node. Right now, tf::Node is quite large, and it contributes the major overhead of task graph parallelism. With these two extra vectors, it will add non-negligible overhead to the entire Taskflow program. Either we can create a special node that holds these two data structures in the handle variant of tf::Node, or put them somewhere at the graph level (tf::Taskflow).

  4. I believe there must be limitations between the CPU domain and the GPU domain, because the executor keeps a separate group of workers per domain.

@BenBE commented Sep 22, 2020

As an example use case for the second point, imagine a tool that wants to collect statistics on a list of items from different sources, where each data source is rate-limited and you also need to limit the number of different connections to those sources (e.g. a DNS recursor used to collect domain statistics):

  1. Number of parallel items to process (domains to look up)
  2. Number of parallel (outstanding) requests per upstream resolver
  3. Number of parallel (open) upstream resolver connections/sockets

And that's not even all concurrency limits you might want to care for in this example.

@tsung-wei-huang (Member) commented Sep 22, 2020

@BenBE, thanks for the comment - I guess then we will need to allow a task to be in multiple concurrency groups. I favor concurrency_group over semaphore because I think at this experimental stage it's safer to have this one-shot API for controlling concurrency. Any thoughts?

@BenBE commented Sep 22, 2020

I'd prefer an interface where I can provide the restrictions (semaphores/concurrency groups) as part of task construction:

std::vector<tf::Task> tasks;
tasks.emplace_back([]() { /* code */ }, {sem1, sem2});
tasks.emplace_back([]() { /* code */ }, {sem1, sem3});
tasks.emplace_back([]() { /* code */ }, sem3);
…

The "missing braces" in the last line are intentional.

Having the "concurrency" counter represent not a task count but e.g. some "task size" could also be useful, e.g. for task ordering.

@musteresel (Contributor, Author) commented Sep 22, 2020

It seems to me t.acquire and t.release must always come as a pair.

No, of course not! That's exactly the "power" of a semaphore over a mere "limit concurrency for this (group of) nodes" mechanism.

We may use a single method to represent them in one shot.

I want to add that as a "convenience" member function, which under the hood just calls both acquire and release.

Of course, we may allow different threads to acquire and release, but I envisage this will confuse users and cause many undefined behaviors on the user's side. Any thoughts?

Of course it might confuse users; though anyone who knows about concurrency should have stumbled over the concept of a semaphore, IMO. Semaphores are a very "basic" building block for more complex and advanced synchronization tools, too. I think it is absolutely necessary to retain the division between acquire and release. It allows for other forms of synchronization, too, instead of just "limit concurrency":

  • Some nodes produce some data, and release a semaphore accordingly. Other nodes use that data, and acquire that semaphore. (Producer -> Consumer model)
  • Multiple nodes use different approaches to calculate the same piece of data; another node needs that data. When the first of the "producing" nodes completes, it releases a semaphore and lets the "consuming" node continue, which can set a flag telling the other "producing" nodes to abandon their calculation. (Example: different approaches to path calculation / prime factorization / genetic optimization / ...)

[..] I think at this experimental stage it's safer to have this one-shot API for controlling concurrency. Any thoughts?

We can add the "concurrency group" on top of the semaphores? With both documented, this lets users use the "safer"/simpler API of concurrency groups while still giving them the full power of semaphores.

I am studying how to get std::vector<Constraint*> _to_acquire and std::vector<Constraint*> _to_release out of tf::Node. Right now, tf::Node is quite large, and it contributes the major overhead of task graph parallelism. With these two extra vectors, it will add non-negligible overhead to the entire Taskflow program. Either we can create a special node that holds these two data structures in the handle variant of tf::Node, or put them somewhere at the graph level (tf::Taskflow).

I think there are 3 types of "overhead" (in the general sense) to discuss:

  • Performance: I'll run the benchmarks in a minute, but I doubt the impact will be large on a taskflow that does not use the new semaphore feature ("Don't pay for what you don't use" - that core C++ idiom).

  • Memory: The two vectors add a bit of memory to every node ... this could be a problem; perhaps extracting the vectors into their own class SemaphoreControl and holding a std::unique_ptr<SemaphoreControl> in the node can reduce this far enough? Another approach is to add a map Node -> SemaphoreControl to the graph, but that would take a significant performance hit: a (synchronized!) lookup in a map on every node execution. It would reduce memory to the "minimum possible", though.

  • Code complexity: The node class is already quite large, as are its member functions. Perhaps it might be possible to extract this (if not as above) into a base ("mixin") class.

@musteresel (Contributor, Author) commented Sep 22, 2020

I believe there must be limitations between CPU domain and GPU domain because the executor keeps a separate group of workers per domain.

Can you elaborate on this? As far as I see, the semaphores affect only the scheduling of nodes; and as such there shouldn't be any issues. Scheduling for both CPU and GPU workers works the same, after all; they're just using different queues, right?

@musteresel (Contributor, Author) commented Sep 22, 2020

Note: this (also) addresses #160.

@tsung-wei-huang (Member) commented Sep 23, 2020

@musteresel Thanks for your comments. Yes, I agree ConcurrencyGroup can be built on top of semaphore. We should keep the semaphore there and build ConcurrencyGroup on top. I am not worried about the performance if you don't use it, but I am quite concerned about adding two vectors (or one, with some tricks) to the node. Of course, having a graph-level map is not good either, because it causes a large performance hit, as you said.

Not sure if you ever read our research paper; tf::Node is quite large compared to those in many other libraries (see Figure 9). In our applications (i.e., CAD), we often create millions to billions of tasks, and this extra size can cause significant space overhead even when we don't use the feature. But, from the current design, tf::Node seems to be the only place that can store the constraint.

I am still thinking about a space-efficient way to handle this. Ideas are welcome.

@musteresel (Contributor, Author) commented Sep 24, 2020

Yes, I've read (most parts of) your research paper, the size issue just didn't catch my eye that much :)

Ok, going forward in this direction I have an idea of how to reduce the size of all nodes without affecting performance; I'll make a pull request ASAP.

For the two vectors: I think the best idea then is to move them into a separate class and put a pointer to an object of that class into tf::Node. This will then "cost" 8 bytes (or 4 bytes on 32-bit systems) per node, which is probably the minimum while keeping a "general solution". We could further reduce the memory consumption by using e.g. a 1-byte index into an array of semaphore pointers kept in the graph object, but that limits the number of available semaphores and makes it harder to have dynamically allocated semaphores (e.g. in a subflow).

@tsung-wei-huang (Member) commented Sep 24, 2020

@musteresel I completely agree with you along the line of reducing memory. In addition, I think the best place to store constraints (or semaphores) is tf::flow_builder instead of tf::Graph, because tf::Node::DynamicWork keeps a graph and that also affects the size of a node. Another advantage is that we can keep the constraint at a subgraph level, which makes more sense. For example, when creating a subflow, it inherits from tf::flow_builder, and that forces users to create constraints from a subflow instead of a taskflow. In this way, we prevent users from accidentally creating a constraint group that mixes semaphores between different subgraph scopes, which is generally not allowed.

 - Updates build system
 - Changes to TBB
 - Changes to documentation
@musteresel (Contributor, Author) commented Oct 3, 2020

[..] that forces users to create constraints from a subflow instead of a taskflow. In this way, we prevent users from accidentally creating a constraint group that mixes semaphores between different subgraph scopes, which is generally not allowed.

Hm, I've been thinking about this a bit. If one uses semaphores to e.g. constrain the usage of certain resources (say specific hardware devices, network connections, ...), then such a constraint might well need to regulate tasks in different subflows, right?

If we go with a vector of semaphores in the flow_builder, then that wouldn't be possible, I guess?

@tsung-wei-huang (Member) commented Oct 5, 2020

@musteresel yes, a vector in the flow_builder is preferred. Also, an async task is a special task not associated with any graph, so we don't need to handle it:

 case Node::ASYNC_WORK: {
   _invoke_async_work(worker, node);
   _decrement_topology_and_notify();
   return;
 }
@mfbalin commented Oct 30, 2020

Is this going to be merged in the near future? It seems like a very useful feature.
