I have two Luigi workflows, wfA and wfB. I set a priority on every task in both workflows, and every wfA task has a higher priority than every wfB task.
If I run both workflows simultaneously with --workers=1, each workflow has one active task at a time.
If I use --workers=3, each workflow has up to three active tasks, but the two workflows are still independent: the priority setting only takes effect within a single workflow.
What I would like is a global worker limit shared by all workflows, with the scheduler always picking the highest-priority tasks across all workflows.
So, in my example, if I set the global worker limit to 3 and then run both wfA and wfB, I want all wfA tasks to be scheduled first (at most 3 at a time) and only then the wfB tasks, because the wfA tasks have a higher priority than the wfB tasks.
Here is an example of wfA and wfB.
wfA:
```python
import time

import luigi


class TaskA1(luigi.Task):
    priority = 110

    def output(self):
        return luigi.LocalTarget("output1.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


class TaskA2(luigi.Task):
    priority = 111

    def output(self):
        return luigi.LocalTarget("output2.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


class TaskA3(luigi.Task):
    priority = 112

    def output(self):
        return luigi.LocalTarget("output3.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


class TaskA4(luigi.Task):
    priority = 113

    def requires(self):
        return [TaskA1(), TaskA2(), TaskA3()]

    def output(self):
        return luigi.LocalTarget("output4.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


if __name__ == "__main__":
    luigi.build([TaskA4()], workers=3)
```
wfB:
```python
import time

import luigi


# wfB uses its own output files: if it reused output1.txt etc., Luigi would
# treat these tasks as already complete once wfA had written those files.
class TaskB1(luigi.Task):
    priority = 10

    def output(self):
        return luigi.LocalTarget("outputB1.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


class TaskB2(luigi.Task):
    priority = 11

    def output(self):
        return luigi.LocalTarget("outputB2.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


class TaskB3(luigi.Task):
    priority = 12

    def output(self):
        return luigi.LocalTarget("outputB3.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


class TaskB4(luigi.Task):
    priority = 13

    def requires(self):
        # TaskB4 depends on the wfB tasks, mirroring TaskA4 in wfA.
        return [TaskB1(), TaskB2(), TaskB3()]

    def output(self):
        return luigi.LocalTarget("outputB4.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write("This is some dummy data")
            time.sleep(8)


if __name__ == "__main__":
    luigi.build([TaskB4()], workers=3)
```
I want to run both workflows simultaneously and have the scheduler run the wfA tasks first, then the wfB tasks.
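To pin down the scheduling I am after, here is a small pure-Python simulation. It is not Luigi code and not how Luigi's scheduler works internally. It assumes every task takes the same time (all the tasks above sleep for 8 seconds), so execution can be modelled as rounds, and that TaskB4 depends on TaskB1-TaskB3. `schedule_rounds` and `independent_rounds` are hypothetical helpers: the first starts, in each round, up to `slots` ready tasks with the highest priority across everything it is given; the second gives each workflow its own pool of `slots` workers, which is the behaviour I see today.

```python
from itertools import zip_longest

# Task graphs from the question: name -> (priority, dependencies).
# Assumption: TaskB4 depends on TaskB1-TaskB3, mirroring TaskA4.
WF_A = {
    "TaskA1": (110, []),
    "TaskA2": (111, []),
    "TaskA3": (112, []),
    "TaskA4": (113, ["TaskA1", "TaskA2", "TaskA3"]),
}
WF_B = {
    "TaskB1": (10, []),
    "TaskB2": (11, []),
    "TaskB3": (12, []),
    "TaskB4": (13, ["TaskB1", "TaskB2", "TaskB3"]),
}


def schedule_rounds(tasks, slots):
    """Greedy priority scheduling with a single pool of `slots` workers.

    Each round starts up to `slots` ready tasks (all dependencies done),
    highest priority first. Equal task durations are assumed.
    """
    done = set()
    rounds = []
    while len(done) < len(tasks):
        ready = [name for name, (_, deps) in tasks.items()
                 if name not in done and all(d in done for d in deps)]
        ready.sort(key=lambda name: tasks[name][0], reverse=True)
        batch = ready[:slots]
        if not batch:
            raise ValueError("cycle or missing dependency in task graph")
        rounds.append(batch)
        done.update(batch)
    return rounds


def independent_rounds(workflows, slots):
    """Current behaviour: every workflow has its own pool of `slots` workers."""
    per_workflow = [schedule_rounds(wf, slots) for wf in workflows]
    # Tasks from different workflows in the same round run side by side.
    return [sum(batch, []) for batch in zip_longest(*per_workflow, fillvalue=[])]


if __name__ == "__main__":
    print("independent pools of 3:", independent_rounds([WF_A, WF_B], 3))
    print("global limit of 3:     ", schedule_rounds({**WF_A, **WF_B}, 3))
```

With independent pools, the first round runs TaskA1-TaskA3 and TaskB1-TaskB3 together (six tasks at once). With a global limit of 3, the first round is TaskA3, TaskA2, TaskA1; in the second round TaskA4 can take only one slot, so a pure priority order fills the other two with TaskB3 and TaskB2. A strict "all of wfA before any of wfB" policy would instead leave those two slots idle while TaskA4 runs.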