How to pass parameters to a "Job as Task" from code?


I would like to use the new "Job as Task" feature (as mentioned in this SO answer), but I'm having trouble passing values into that job.

Scenario

  • I have a workflow job which contains 2 tasks.
  • The workflow is scheduled to run every x minutes.
  • Task_A (type "Notebook"): Reads data from a table and, based on the contents, decides whether the workflow in Task_B should be executed (or not).
  • Task_B (type "Run Job"): References another workflow that itself consists of multiple tasks, all of type "Notebook". That workflow takes several parameters; for brevity, let's assume a single parameter entity_ids. It runs fine on its own when started manually.

For this scenario I would like to add some logic that decides how much of the workflow will be executed:

If Task_A finds specific information in its table, it should start the workflow in Task_B and provide it with a couple of parameters based on that information (in this example: a list of entity_ids). If that information is not found, the workflow should end gracefully and wait for the next interval.
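A minimal sketch of what Task_A could look like, assuming the check is a simple table read (the table query is a placeholder, and dbutils is only available inside a Databricks runtime):

```python
import json

# Stand-in for the real table read, e.g. something like:
#   rows = spark.sql("SELECT entity_id FROM some_table WHERE ready = true").collect()
rows = [{"entity_id": 1}, {"entity_id": 2}]

entity_ids = [r["entity_id"] for r in rows]

try:
    # Publish the result as a task value so downstream tasks can react to it.
    # Serializing to JSON keeps the value a plain string.
    dbutils.jobs.taskValues.set(key="entity_ids", value=json.dumps(entity_ids))
except NameError:
    pass  # dbutils only exists inside a Databricks runtime
```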

My question: How do I pass (multiple) values into the job that is referenced in Task_B?

I tried setting this with dbutils.jobs.taskValues.set("entity_ids", "[1, 2]") in Task_A and reading it with dbutils.jobs.taskValues.get("Task_A", "entity_ids", debugValue="[]") in the first notebook of the workflow in Task_B, but this throws an error within the nested job: Task key does not exist in run: Task_A.

My guess is that the nested workflow in Task_B is unaware of the parent workflow and might be run in a different context, and therefore cannot find taskKey == "Task_A".

To verify my assumption, I set up a (test-only) notebook that only reads the entity_ids with the get() function.

  • If I add it directly as a task, into the same workflow as Task_A, it works.
  • If I put it into its own workflow, and then reference that as a "Run Job" task, it fails with the aforementioned error.

In both cases, it is always the exact same Notebook.

2 Answers

rainingdistros (Best Answer)

I tried your approach and can confirm that task values behave as you described when used in a "Run Job" task.

Please check whether the following alternative works for you:

  • In the job referenced by Task_B (type "Run Job"), create a widget named entity_ids.

  • In your actual workflow, after Task_A, add an "If/else condition" task that checks the task value. Refer to the docs for further information.

    • For testing I used the condition {{tasks.Tst_TaskVal_Set.values.entity_id}} != "", so the Run Job is triggered only when the value is not empty. You can click on "Browse dynamic values" for the format and other similar values.
  • The "true" branch is then followed by the Run Job task, where you pass the task value {{tasks.Task_A.values.entity_ids}} to the widget as a parameter. I can confirm that this works.
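In the child job's first notebook, the widget value arrives as a string, so it needs to be parsed back into a list. A sketch, assuming the widget name entity_ids from above (the except branch is only a stand-in for running outside Databricks):

```python
import json

try:
    # On Databricks, the "Run Job" task fills this widget via
    # {{tasks.Task_A.values.entity_ids}}; the value arrives as a string.
    raw = dbutils.widgets.get("entity_ids")
except NameError:
    raw = "[1, 2]"  # stand-in when running outside Databricks

entity_ids = json.loads(raw)  # back to a real Python list
```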

Alternate Suggestion:

Considering your number of parameters, you could instead set only a flag as a task value, write entity_ids to a file in DBFS or cloud storage, use the conditional on the flag, and then read the file in Task_B. Note that there may be a limit on the number of characters a task value/widget can accept.
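A sketch of that alternative; the handoff path and the flag name has_entities are illustrative placeholders, and dbutils is again Databricks-only:

```python
import json

entity_ids = [1, 2, 3]

# Hypothetical handoff location; on Databricks this could be e.g. a /dbfs/... or
# cloud-storage path that both jobs can reach.
handoff_path = "/tmp/entity_ids_handoff.json"

# Task_A: write the full payload to the file, expose only a small flag.
with open(handoff_path, "w") as f:
    json.dump(entity_ids, f)
try:
    dbutils.jobs.taskValues.set(key="has_entities", value=str(bool(entity_ids)))
except NameError:
    pass  # dbutils only exists inside Databricks

# Task_B (in the child job's notebook): read the payload back.
with open(handoff_path) as f:
    received = json.load(f)
```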

JayashankarGS

You cannot read task values set in one task from a subsequent task of type "Run Job". To get these values, you need to pass them as parameters using dynamic value references and access them with the code below in your notebook, which lives in the other workflow job.

dbutils.widgets.get("entity_id")


Click on Browse dynamic values for more information.

{{tasks.[task_name].values.[value_name]}} is the dynamic value reference for obtaining the task value of a specific task and a given key.

In your case, it is {{tasks.Task_A.values.entity_id}}
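Putting this together in the child notebook, a sketch that declares the widget with a default so the notebook also runs standalone (the default and the fallback branch are assumptions for local testing; on Databricks the "Run Job" task supplies the real value):

```python
import json

try:
    # Declare the widget with an empty-list default; the "Run Job" task
    # overrides it with {{tasks.Task_A.values.entity_id}}.
    dbutils.widgets.text("entity_id", "[]")
    raw = dbutils.widgets.get("entity_id")
except NameError:
    raw = "[]"  # stand-in when running outside Databricks

entity_ids = json.loads(raw)
```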

Output:


Here, B_tasks is another job.