Executing a solid when at least one of the required inputs is given

640 Views Asked by At

As an input, I would like to retrieve data based on user input, or "randomly" from a DB if no user input is given. All other downstream tasks of the pipeline would be the same.

Therefore, I would like to create a pipeline starting with solids A and B, and a downstream solid C executed based on input from solid A OR solid B.
However, when using conditional outputs on solids A and B, solid C is not executed, as one input is not generated by upstream solids.

Is there a simple way of doing this that I am missing out?

Thanks for your help.

2

There are 2 best solutions below

0
On BEST ANSWER

"fan-in" dependencies will not skip unless all the fanned in outputs were skipped, so that is one way to accomplish this.

@pipeline
def example():
  maybe_a = A()
  maybe_b = B()
  C(items=[maybe_a, maybe_b])

https://docs.dagster.io/examples/fan_in_pipeline#main

0
On

Working off of @Alex's suggestion, here's an executable pipeline:

from dagster import pipeline, solid


@solid()
def A(_):
    return


@solid()
def B(_):
    return

@solid()
def C(context, results):
    context.log.info("a returned {}".format(results[0]))
    context.log.info("b returned {}".format(results[1]))
    context.log.info("compute c")


@pipeline
def my_pipeline():
    a = A()
    b = B()
    C([a, b])