Luigi task based on a ternary operator


I have a requirement where the DAG looks like the following:


                   -> Task C -> Task D
                  | 
Task A -> Task B -
                  |
                   -> Task E -> Task F

Task B has a Python ternary operator that decides which leg to execute at runtime, i.e. the runtime flow will be either (Task C, Task D) or (Task E, Task F).

How can we define this in a Luigi task's requires() function? Any idea or code snippet would be appreciated.

Best answer

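You can define a dynamic dependency: a Luigi task can yield other tasks from its run() method, so the choice of what to run can be made at runtime rather than in requires().

I think in your example you'd have a task G that requires B and, based on the output of B, yields either C and D or E and F.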

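import luigi

# BTask, CTask, DTask, ETask and FTask are your own tasks, defined elsewhere.
# check_b_output is a placeholder for whatever test you apply to B's output.

class GTask(luigi.Task):
    def requires(self):
        # B always runs first; its output decides which leg to take.
        return BTask()

    def run(self):
        if check_b_output(self.input()):
            # Tasks yielded from run() are scheduled as dynamic dependencies.
            yield CTask()
            yield DTask()
        else:
            yield ETask()
            yield FTask()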