Dennis Hinnenkamp

02/16/2022, 10:34 AM
I have read that it is possible to merge two strands of a flow, but apparently this only works for conditional flows. Is there a way to merge two or more parallel tasks on success? I am not so much interested in a task that actually runs at the end. For example, if the flow's reference task depends on one task from each of the parallel branches, it would be nice if the UI visualized this in the schema. Example: the reference tasks of the flow are the last three import tasks; it would be cool if this could be visualized like in picture two. Currently I have created a dummy task with an all_successful trigger whose upstream tasks are those last three import tasks. I hope it is somewhat understandable what exactly I want to achieve 😃

Anna Geller

02/16/2022, 10:42 AM
You can do that using task dependencies. Can you share your flow?
Is there a way to merge two or more parallel tasks if successful?
You can merge the branches using dependencies, e.g. the upstream_tasks keyword, and you can influence what should run based on success/failure using triggers.
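For illustration, a minimal Prefect 1.x sketch of those two keywords (the task and flow names are made up, not from Dennis's flow):

from prefect import Flow, task
from prefect.triggers import any_failed

@task
def extract():
    pass

@task
def transform():
    pass

# a trigger decides whether a task runs based on its upstream states;
# any_failed means this task only runs if at least one upstream task failed
@task(trigger=any_failed)
def alert_on_failure():
    pass

with Flow("dependencies-example") as flow:
    e = extract()
    t = transform(upstream_tasks=[e])  # explicit dependency via the keyword
    alert_on_failure(upstream_tasks=[e, t])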

Dennis Hinnenkamp

02/16/2022, 10:51 AM
That's what I did so far

Anna Geller

02/16/2022, 11:07 AM
There were a couple of problems here: you didn't initialize your Databricks tasks, and you were passing runtime-specific values such as data dependencies from Parameter tasks into init, which won't work. I fixed that by moving those arguments to runtime - the syntax for it is:
SomeTask(init_kwargs)(runtime_kwargs)
Also, to run those tasks in parallel, you need to use a (local) Dask executor. Lastly, as requested, I added a task that runs once all Databricks tasks have finished successfully. Also, if you are on Prefect Cloud, you can replace the Parameter task that contains the secret with a PrefectSecret task for better security. Here is the Gist: https://gist.github.com/2033184d2d64b6ad61a4f86dd121d638 LMK if you have any questions about it.
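As a rough, hedged sketch of that pattern (the task names, secret name, and parameter are made up for illustration - the actual fix is in the Gist above): static options go into the task constructor, data dependencies are passed when the task is called inside the Flow block, and the executor enables parallelism.

from prefect import Flow, Parameter
from prefect.executors import LocalDaskExecutor
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.secrets import PrefectSecret

# init kwargs: static configuration only
submit_run = DatabricksSubmitRun(name="databricks-job")

with Flow("databricks-example") as flow:
    # runtime kwargs: data dependencies resolved when the flow runs
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING")  # hypothetical secret name
    job_spec = Parameter("job_spec")  # Databricks run-submit payload
    run_1 = submit_run(databricks_conn_secret=conn, json=job_spec)
    run_2 = submit_run(databricks_conn_secret=conn, json=job_spec)

# a (local) Dask executor lets run_1 and run_2 execute in parallel
flow.executor = LocalDaskExecutor()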

Dennis Hinnenkamp

02/16/2022, 11:45 AM
You are right, I forgot the run parameters - I must have dropped them when I cleaned up the example script. I will have a closer look at your adjustments and give you feedback. But do I understand correctly that I can only achieve the visualization as I imagine it if I create another task at the very end - probably a dummy without any real work - and configure its upstream_tasks?

Anna Geller

02/16/2022, 11:51 AM
What would you need this task for? If you don't need to do anything once your Databricks jobs complete, then there's no need to add this task. It's only useful if you want to do something with it - or do you just want a nice diamond-shaped flow chart for visual preference? 🙂

Dennis Hinnenkamp

02/16/2022, 11:56 AM
or do you just want a nice diamond-shaped flow chart for visual preference?
That is exactly what I want to achieve. I also want to make it clear in the visualization that the result of the flow depends on the results of the parallel tasks. That makes it easier for our customers to understand the flow even, say, a year from now.

Anna Geller

02/16/2022, 12:01 PM
In that case you're right that you need a dummy task. You can just do:
from prefect import task
from prefect.triggers import all_successful

@task(trigger=all_successful, name="All Databricks Tasks Finished")
def reduce_step():
    pass
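Wired into the flow, that dummy task produces the diamond-shaped schema along these lines (import_a/b/c are placeholders for the last three import tasks; reduce_step is the task defined above):

from prefect import Flow, task

@task
def import_a():
    pass

@task
def import_b():
    pass

@task
def import_c():
    pass

with Flow("imports") as flow:
    # reduce_step becomes the single terminal node shown in the schematic
    reduce_step(upstream_tasks=[import_a(), import_b(), import_c()])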

Dennis Hinnenkamp

02/16/2022, 12:23 PM
Perfect, then my first thought was the right one 😄 Thanks Anna!
👍 1