How can I implement "weak" edges with NetworkX?

Background

I am writing a Python module to collect and schedule tasks for a nightly automatic update, with the option to interactively execute individual tasks. (The nightly updates run all tasks unconditionally.)

  • Some of these individual tasks depend on the execution of another task to function properly: e.g. if task A is scheduled (automatically or manually) but task B is not, then task B must be added to the schedule before A. I'm referring to these as strong dependencies, because they "pull in" a new task if necessary.

  • Other tasks depend on the output of another task, but can also operate on old output if that task is not scheduled; e.g. if task C is scheduled but task D is not, then D is not added, and C runs on old data. I'm referring to these as weak dependencies, because they're "too weak" to make changes to the schedule.
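The two bullet points differ only in which tasks end up scheduled. Strong dependencies are followed transitively, while weak ones never add anything. Here is a minimal sketch of that rule in plain Python. It assumes a hypothetical `strong` dict that maps each task to the tasks it cannot run without. Weak dependencies are left out on purpose, because by definition they never change the set of scheduled tasks, only the order.

```python
# Hypothetical table: strong[x] lists the tasks x cannot run without.
# Weak dependencies are deliberately absent: they never add a task.
strong = {"A": {"B"}}

def scheduled_tasks(requested, strong):
    """Return the requested tasks plus everything strong deps pull in."""
    result = set()
    stack = list(requested)
    while stack:
        t = stack.pop()
        if t in result:
            continue
        result.add(t)
        # Follow strong dependencies transitively.
        stack.extend(strong.get(t, ()))
    return result

print(sorted(scheduled_tasks({"A"}, strong)))  # ['A', 'B']
print(sorted(scheduled_tasks({"C"}, strong)))  # ['C']
```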

I've successfully implemented strong dependencies using nx.DiGraph to build the dependency relationships and nx.topological_sort to generate the final task schedule (with the requested tasks as the starting nodes). I'm unsure how to implement weak dependencies, however, because topological_sort seems to always add nodes connected by any edge. I've even tried setting the edges' weight to 0, but no luck.
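The behaviour described above is easy to reproduce. In NetworkX 2.x and later, `topological_sort` takes only the graph (the starting-node argument was dropped), so the "pull in" step is usually done with a reachability call such as `nx.descendants`. The sketch below assumes edges point from a task to its dependency, as in the question's decorators. It shows that neither call looks at edge attributes such as `weight`.

```python
import networkx as nx

# Edges point from a task to what it depends on, as in the question's code.
g = nx.DiGraph()
g.add_edge("A", "B")                       # strong: A needs B
g.add_edge("C", "D", weak=True, weight=0)  # weak: C should merely follow D

# The "pull in" step is a reachability query, and it follows every edge
# regardless of the attributes (weight, weak, ...) that the edge carries.
pulled_in = nx.descendants(g, "C")
print(pulled_in)  # {'D'}

# topological_sort likewise treats every edge as an ordering constraint.
order = list(nx.topological_sort(g))
print(order.index("C") < order.index("D"))  # True
```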

Implementation

EDIT: Here's a rough example of my current implementation.

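# tasklib.py
import glob
import importlib
import os

import networkx as nx

task_graph = nx.DiGraph()

def run(start_tasks):
    # Import every task module so its decorators register tasks in task_graph
    for modname in glob.glob('modules/*.py'):
        importlib.import_module(os.path.splitext(modname)[0].replace(os.sep, '.'))

    # Edges point from a task to its dependencies, so the requested tasks plus
    # everything reachable from them is what gets scheduled
    needed = set(start_tasks)
    for t in start_tasks:
        needed |= nx.descendants(task_graph, t)

    # Reverse the topological order so dependencies run before the tasks needing them
    task_queue = list(reversed(list(nx.topological_sort(task_graph.subgraph(needed)))))

    # Remove weakly-linked tasks here

    for t in task_queue:
        task_graph.nodes[t]['func']()

def task(func):
    task_graph.add_node(func.__name__, func=func)
    return func

def depends(*args): # strong dependency
    def add_deps(func):
        for dep in args:
            task_graph.add_edge(func.__name__, dep)
        return func
    return add_deps

def after(*args): # weak dependency
    def add_deps(func):
        for dep in args:
            task_graph.add_edge(func.__name__, dep, weak=True)
        return func
    return add_deps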


# modules/mytasks.py
from tasklib import task, depends, after

@task
def process_data():
    # do some lengthy processing
    pass

@task
@depends('process_data')
def process_more_data():
    # do even more processing on the above output
    pass

@task
@after('process_data')
def generate_report():
    # generate a report on the processed data
    pass

Here, process_more_data needs up-to-date output from process_data before it can do its job, so it uses a strong dependency. On the other hand, generate_report can work from old data if run by itself. However, if process_data is also scheduled, it doesn't make sense to generate a report on old data when new results are about to be produced, so generate_report shouldn't run until afterward.

Based on @BrenBarn's suggestion, I will probably try adding an extra step to run() that removes weakly-linked tasks from task_queue unless they also exist in start_tasks.
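Here is a sketch of that extra step, applied to the three example tasks above. It assumes a stand-in graph built the way the decorators build it: edges run from a task to its dependency, and weak edges are marked `weak=True`. `schedule` is a hypothetical helper, not part of NetworkX. One subtlety: a task should be kept only if it is a requested task or is reachable from one over strong edges alone. Checking only the tasks sitting directly at the end of a weak edge would miss any strong dependencies those tasks drag in.

```python
import networkx as nx

# Hypothetical stand-in for the question's task_graph: edges point from a
# task to the task it depends on, and weak edges carry weak=True (as in `after`).
task_graph = nx.DiGraph()
task_graph.add_nodes_from(["process_data", "process_more_data", "generate_report"])
task_graph.add_edge("process_more_data", "process_data")           # @depends
task_graph.add_edge("generate_report", "process_data", weak=True)  # @after

def schedule(graph, start_tasks):
    # 1. Everything reachable over *any* edge, ordered so dependencies come first.
    reachable = set(start_tasks)
    for t in start_tasks:
        reachable |= nx.descendants(graph, t)
    queue = list(reversed(list(nx.topological_sort(graph.subgraph(reachable)))))

    # 2. Tasks that must run: the start tasks plus their *strong* descendants.
    strong = nx.subgraph_view(
        graph, filter_edge=lambda u, v: not graph[u][v].get("weak", False))
    keep = set(start_tasks)
    for t in start_tasks:
        keep |= nx.descendants(strong, t)

    # 3. Drop weakly-linked tasks. Restricting a topological order to a
    #    subset keeps it a valid order for the edges inside that subset.
    return [t for t in queue if t in keep]

print(schedule(task_graph, ["generate_report"]))                  # ['generate_report']
print(schedule(task_graph, ["process_data", "generate_report"]))  # ['process_data', 'generate_report']
print(schedule(task_graph, ["process_more_data"]))                # ['process_data', 'process_more_data']
```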

Question

What my question boils down to is: Can I make edges that participate in the sort only if both nodes are in the starting node set? If so, how? If not, is there an alternative method or library that will achieve this effect?

I've considered writing a stripped-down network library with this feature for my own purposes, but I'm hoping there's a better way.

Extra Credit

Besides the above, I'd also like to support "post-dependencies," where one task implies that a dependency task runs after it — but I fear this is beyond the scope of a mere topo sort.
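At least in a simplified form, post-dependencies can be folded into the same "pull in, then sort" idea. Strong dependencies pull in a task's predecessors. A post-dependency pulls in a successor, which the topological sort then places after the task that pulled it in. The sketch below assumes the answer's edge direction, where an edge (u, v) means "u runs before v", and a hypothetical `kind` edge attribute with the values "strong", "weak" or "post". `schedule_with_post` is an illustrative name. If the constraints form a cycle, `topological_sort` raises `nx.NetworkXUnfeasible`.

```python
import networkx as nx

def schedule_with_post(deps, tasks):
    """Hypothetical helper: pull in strong prerequisites and post-dependencies,
    then order every scheduled task with a topological sort."""
    required = set()
    stack = list(tasks)
    while stack:
        t = stack.pop()
        if t in required:
            continue
        required.add(t)
        # Strong prerequisites of t: incoming edges of kind "strong".
        stack.extend(u for u, _, kind in deps.in_edges(t, data="kind") if kind == "strong")
        # Post-dependencies of t: outgoing edges of kind "post".
        stack.extend(v for _, v, kind in deps.out_edges(t, data="kind") if kind == "post")
    # Every edge between scheduled tasks (strong, weak or post) constrains the
    # order; raises nx.NetworkXUnfeasible if those constraints form a cycle.
    return list(nx.topological_sort(deps.subgraph(required)))

deps = nx.DiGraph()
deps.add_edge("build", "test", kind="strong")  # test needs build first
deps.add_edge("test", "cleanup", kind="post")  # running test implies cleanup afterwards
deps.add_edge("lint", "test", kind="weak")     # if lint is scheduled, it runs before test

print(schedule_with_post(deps, ["test"]))  # ['build', 'test', 'cleanup']
print(schedule_with_post(deps, ["lint"]))  # ['lint']
```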

Answer

One approach is to build a dependency graph that keeps every strong-dependency edge, but keeps a weak-dependency edge only when both of its tasks are scheduled anyway. In other words, each of those tasks was either requested directly or is (transitively) strongly required by a requested task.

Here's some code that does just that. In this example, we'll consider six tasks, a through f:

import networkx as nx

deps = nx.DiGraph()
deps.add_nodes_from("abcdef")
deps.add_edges_from([("a","b"), ("c","a"), ("e","c")], is_strong=True)
deps.add_edges_from([("f","c"), ("d","c")], is_strong=False)

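We can read this as follows: a is a strong requirement of b, c is a strong requirement of a, and e is a strong requirement of c, while f and d are weak requirements of c. Suppose the user selects tasks a and d to run. Since task a runs, c must run before it, and therefore e must also run before c. Task d has no strong dependencies of its own, but it is a weak requirement of c. Because d was selected, it should run before c. Task f is not scheduled, since it is only a weak requirement and nobody selected it. Hence we'll want to run the tasks in the order: d, e, c, a.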

First we'll extract the subgraph obtained by throwing out all weak edges. In that graph, the ancestors of each selected task are exactly the dependencies we need to run. Then we'll take the subgraph of the full dependency graph induced by the selected tasks together with those ancestors. A topological sort of this graph yields the desired schedule.

Here's the code:

def construct_task_graph(deps, tasks):
    required = set(tasks)
    strong_subgraph = extract_strong_subgraph(deps)
    for task in tasks:
        ancestors = nx.ancestors(strong_subgraph, task)
        required.update(ancestors)

    return nx.subgraph(deps, required)

def extract_strong_subgraph(deps):
    strong_deps = nx.DiGraph()
    strong_deps.add_nodes_from(deps)

    strong_edges = (e for e in deps.edges(data=True) if e[2]["is_strong"])
    strong_deps.add_edges_from(strong_edges)

    return strong_deps

Note that construct_task_graph is a bit inefficient: a task may be an ancestor of tasks that were processed in earlier iterations, so its ancestors get computed more than once. The set throws out the duplicates, but a single traversal (e.g. a DFS) started from all selected tasks at once would visit each node only once.
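Here is one way to do that, sketched with the same `is_strong` edge attribute as above. It walks backwards over strong edges from all requested tasks in a single loop, tracking visited nodes. It also uses `nx.subgraph_view` to hide weak edges instead of copying the strong ones into a new graph. `construct_task_graph_single_pass` is an illustrative name, not a NetworkX function.

```python
import networkx as nx

def construct_task_graph_single_pass(deps, tasks):
    # Read-only view of deps that hides weak edges (no copy is made).
    strong = nx.subgraph_view(deps, filter_edge=lambda u, v: deps[u][v]["is_strong"])
    # One DFS over strong edges, walked backwards from all requested tasks
    # at once, so each required task is pushed onto the stack only once.
    required = set(tasks)
    stack = list(tasks)
    while stack:
        task = stack.pop()
        for pred in strong.predecessors(task):
            if pred not in required:
                required.add(pred)
                stack.append(pred)
    return deps.subgraph(required)

# Same example graph as above.
deps = nx.DiGraph()
deps.add_nodes_from("abcdef")
deps.add_edges_from([("a", "b"), ("c", "a"), ("e", "c")], is_strong=True)
deps.add_edges_from([("f", "c"), ("d", "c")], is_strong=False)

task_graph = construct_task_graph_single_pass(deps, ["a", "d"])
print(sorted(task_graph.nodes()))  # ['a', 'c', 'd', 'e']
```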

In any case, we get:

>>> task_graph = construct_task_graph(deps, ["a", "d"])
>>> list(task_graph.nodes())
['a', 'c', 'd', 'e']
>>> list(task_graph.edges())
[('c', 'a'), ('d', 'c'), ('e', 'c')]

and most importantly:

>>> list(nx.topological_sort(task_graph))
['d', 'e', 'c', 'a']

Any order that puts d and e before c, and c before a, is an equally valid schedule.