Background
I am writing a Python module to collect and schedule tasks for a nightly automatic update, with the option to interactively execute individual tasks. (The nightly updates run all tasks unconditionally.)
Some of these individual tasks depend on the execution of another task to function properly: e.g. if task A is scheduled (automatically or manually) but task B is not, then task B must be added to the schedule before A. I'm referring to these as strong dependencies, because they "pull in" a new task if necessary.
Other tasks depend on the output of another task, but can also operate on old output if that task is not scheduled; e.g. if task C is scheduled but task D is not, then D is not added, and C runs on old data. I'm referring to these as weak dependencies, because they're "too weak" to make changes to the schedule.
I've successfully implemented strong dependencies using nx.DiGraph to build the dependency relationships and nx.topological_sort to generate the final task schedule (with the requested tasks as the starting nodes). I'm unsure how to implement weak dependencies, however, because topological_sort seems to always add nodes connected by any edge. I've even tried setting the edges' weight to 0, but no luck.
Implementation
EDIT: Here's a rough example of my current implementation.
# tasklib.py
import glob
import os

import networkx as nx

task_graph = nx.DiGraph()

def run(start_tasks):
    global task_graph
    # Import every task module so its decorators register nodes and edges.
    for modname in glob.glob('modules/*.py'):
        __import__(os.path.splitext(modname)[0].replace('/', '.'))
    task_queue = nx.topological_sort(task_graph, start_tasks)
    # Remove weakly-linked tasks here
    for t in task_queue:
        task_graph.node[t]['func']()

def task(func):
    task_graph.add_node(func.__name__, func=func)
    return func

def depends(*args):  # strong dependency
    def add_deps(func):
        for dep in args:
            task_graph.add_edge(func.__name__, dep)
        return func
    return add_deps

def after(*args):  # weak dependency
    def add_deps(func):
        for dep in args:
            task_graph.add_edge(func.__name__, dep, weak=True)
        return func
    return add_deps
# modules/mytasks.py
from tasklib import task, depends, after

@task
def process_data():
    pass  # do some lengthy processing

@task
@depends('process_data')
def process_more_data():
    pass  # do even more processing on the above output

@task
@after('process_data')
def generate_report():
    pass  # generate a report on the processed data
Here, process_more_data needs up-to-date output from process_data before it can continue the job, so it uses a strong dependency. On the other hand, generate_report can work from old data if run by itself, but if process_data is also scheduled, it doesn't make sense to generate a report on old data when new results are about to be produced; it shouldn't run until afterward.
Based on @BrenBarn's suggestion, I will probably try adding an additional step to run that removes weakly-linked tasks from task_queue unless they also exist in start_tasks.
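A minimal sketch of what that extra step might look like, assuming plain dicts in place of task_graph and the standard library's graphlib (Python 3.9+) in place of networkx. STRONG, WEAK and build_queue are hypothetical names used only for illustration:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical stand-ins for task_graph: task -> tasks it depends on.
STRONG = {"process_more_data": {"process_data"}}
WEAK = {"generate_report": {"process_data"}}


def build_queue(start_tasks, strong=STRONG, weak=WEAK):
    """Order the requested tasks plus whatever they strongly depend on."""
    # Tasks that must run: the requested ones and, transitively,
    # their strong dependencies.
    keep, stack = set(), list(start_tasks)
    while stack:
        current = stack.pop()
        if current not in keep:
            keep.add(current)
            stack.extend(strong.get(current, ()))

    # Sort over both kinds of edge so weak edges still constrain the order...
    sorter = TopologicalSorter()
    for name in set(strong) | set(weak) | keep:
        sorter.add(name, *strong.get(name, ()), *weak.get(name, ()))

    # ...then drop every task that was only reachable through a weak edge.
    return [name for name in sorter.static_order() if name in keep]


print(build_queue(["generate_report"]))
print(build_queue(["generate_report", "process_data"]))
```

With these tables, requesting generate_report alone leaves process_data out of the queue, while requesting both puts process_data first.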
Question
What my question boils down to is: Can I make edges that participate in the sort only if both nodes are in the starting node set? If so, how? If not, is there an alternative method or library that will achieve this effect?
I've considered writing a stripped-down network library with this feature for my own purposes, but I'm hoping there's a better way.
Extra Credit
Besides the above, I'd also like to support "post-dependencies," where one task implies that a dependency task runs after it, but I fear this is beyond the scope of a mere topo sort.
One approach is to build a dependency graph containing a directed edge between A and B if and only if A strongly depends upon B, or A weakly depends upon B and B is strongly depended upon by some other task.
Here's some code that does just that. In this example, we'll consider six tasks, a through f.
We can read this as a is a strong requirement of b, etc., and f is a weak requirement of c. The user will select tasks a and c to run. Since task a runs, we'll need c to run before it, and therefore also need e to run before c. Task d has no strong dependencies, but it is a weak dependency of c. Hence we'll want to run the tasks in the order: d, e, c, a.
First we'll extract the subgraph induced by throwing out all weak edges. We'll look at the ancestors of each scheduled task in this graph: this is the set of dependencies that we need to run. Then we'll just take the subgraph of the full dependency graph induced by these nodes. A topo sort on this graph yields our desired solution.
Here's the code:
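A minimal sketch of those steps, assuming networkx 2.x or later, a boolean weak attribute on weak edges, and edges that point from a requirement to the task that needs it. The name construct_task_graph is the one this answer uses; the body and the example edge list are a reconstruction, with the edges chosen as one graph that fits the description above rather than taken from the original listing:

```python
import networkx as nx


def construct_task_graph(full_deps, scheduled):
    """Return the part of full_deps that has to run for the scheduled tasks."""
    # Strong-only graph: same nodes, weak edges thrown out.
    strong_deps = nx.DiGraph()
    strong_deps.add_nodes_from(full_deps.nodes)
    strong_deps.add_edges_from(
        (u, v)
        for u, v, weak in full_deps.edges(data="weak", default=False)
        if not weak
    )

    # Each scheduled task runs, together with all of its strong ancestors.
    required = set(scheduled)
    for name in scheduled:
        required |= nx.ancestors(strong_deps, name)

    # Induce on the full graph, so a weak edge between two required
    # tasks still constrains their order.
    return full_deps.subgraph(required)


# Assumed example graph: an edge u -> v means "u must run before v".
full_deps = nx.DiGraph()
full_deps.add_edges_from([("a", "b"), ("c", "a"), ("e", "c"), ("d", "e")])
full_deps.add_edges_from([("f", "c"), ("d", "c")], weak=True)

task_graph = construct_task_graph(full_deps, ["a", "c"])
schedule = list(nx.topological_sort(task_graph))
print(schedule)  # ['d', 'e', 'c', 'a']
```

In this assumed graph f is never pulled in, because nothing scheduled strongly requires it, and b is skipped because it was not requested.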
Note that construct_task_graph is a bit inefficient: tasks may be ancestors of tasks which were processed in earlier iterations, yielding duplicate requirements. These are thrown out by the set, but a more efficient way would be to implement something like a DFS.
In any case, we get:
and most importantly: