Processing DAGs with async Python and graphlib

王林
Release: 2024-08-27 06:37:09
Original
492 people have browsed it

Processing DAGs with async Python and graphlib

I recently came across an interesting module in Python's bottomless standard library: graphlib. If you haven't worked with it before, it's a small utility that was added in Python 3.9 and only implements one class: TopologicalSorter.

The name is self-explanatory -- this is a class for topological sorting of a graph. But I don't think it was originally written with just sorting in mind, since it has rather cryptic, but incredibly useful API, such as prepare() method or is_active(). This example in the documentation hints on the motivation behind it:

topological_sorter = TopologicalSorter() # Add nodes to 'topological_sorter'... topological_sorter.prepare() while topological_sorter.is_active(): for node in topological_sorter.get_ready(): # Worker threads or processes take nodes to work on off the # 'task_queue' queue. task_queue.put(node) # When the work for a node is done, workers put the node in # 'finalized_tasks_queue' so we can get more nodes to work on. # The definition of 'is_active()' guarantees that, at this point, at # least one node has been placed on 'task_queue' that hasn't yet # been passed to 'done()', so this blocking 'get()' must (eventually) # succeed. After calling 'done()', we loop back to call 'get_ready()' # again, so put newly freed nodes on 'task_queue' as soon as # logically possible. node = finalized_tasks_queue.get() topological_sorter.done(node)
Copy after login

So graphlib is not a module just for sorting graphs, it's also a utility for running graphs of tasks in topological order, which is useful if your workloads have tasks depending on results of other tasks. Graphs are a great way to model this problem, and topological order is how you make sure tasks are processed in the right order.

One thing that is missing from the docs is asyncio example, which turns out to be quite easy to write. Since with asyncio you don't have to deal with thread-safety, you can get by without using queue for synchronizing threads or any other additional complexity of the sort.

We'll define a simplistic async node visitor function:

async def visit(node: str, sorter: TopologicalSorter): print(f"processing node {node}") sorter.done(node)
Copy after login

In the real world this can be as complex as you'd like, as long as you're doing I/O bound work so reap the benefits of asyncio. The important bit is to call the sorter.done(node) in the end of the function to let the instance of TopologicalSorter know we're done with this node and we can progress onto the next.

Then we plug the visit function into our topologically ordered run:

sorter = TopologicalSorter(graph) sorter.prepare() while sorter.is_active(): node_group = sorter.get_ready() if not node_group: # no nodes are ready yet, so we sleep for a bit await asyncio.sleep(0.25) else: tasks = set() for node in node_group: task = asyncio.create_task(visit(node, sorter)) tasks.add(task) task.add_done_callback(tasks.discard)
Copy after login

Full source code of a working script can be found in this gist.

One peculiar aspect of graphlib is the format of the graph the TopologicalSorter accepts as an argument -- it is in reverse order from your typical representation of a graph. E.g. if you have a graph like this A -> B -> C, normally you'd to represent it like this:

graph = { "A": ["B"], "B": ["C"], "C": [], }
Copy after login

but the TopologicalSorter wants this graph in the with edge direction reversed:

If the optional graph argument is provided it must be a dictionary representing a directed acyclic graph where the keys are nodes and the values are iterables of all predecessors of that node in the graph

So the right way to represent A -> B -> C for the TopologicalSorter is this:

graph = { "C": ["B"], "B": ["A"], "A": [], }
Copy after login

More info and a rather heated debate on this can be found here: https://bugs.python.org/issue46071.

Happy coding!

The above is the detailed content of Processing DAGs with async Python and graphlib. For more information, please follow other related articles on the PHP Chinese website!

source:dev.to
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!