https://prefect.io logo
Title
i

Ievgenii Martynenko

03/31/2022, 7:30 PM
Hi, I'm experimenting with best practices according to https://docs.prefect.io/core/concepts/best-practices.html#writing-prefect-code, and don't get why the following code doesn't build proper execution flow. It should be single top to bottom DAG, but instead I got some mess. Task are initialized outside on Flow imperatively; inside Flow tasks are executed, most of them have no output and dependency is set via .set_upstream.
with Flow("Test ETL Flow") as flow:
    start_task_result = start_task()
    truncate_task_result = truncate_task()
    fetch_task_result = fetch_task()
    update_template_result = update_template_task(variables=fetch_task_result)
    merge_task_result = merge_task()
    complete_task_result = complete_task()

    truncate_task.set_upstream(start_task_result)
    fetch_task.set_upstream(truncate_task_result)
    merge_task.set_upstream(update_template_result)
    complete_task.set_upstream(merge_task_result)

flow.visualize()
k

Kevin Kho

03/31/2022, 7:34 PM
I think the mess here is because of this:
truncate_task.set_upstream(start_task_result)
Instead you want:
truncate_task_result.set_upstream(start_task_result)
so that the upstream is set on the instance.
truncate_task.set_upstream(start_task_result)
refers to the task while
truncate_task_result
is the instance of the task
i

Ievgenii Martynenko

03/31/2022, 7:37 PM
<here should be "not bad" meme face". Thanks.
k

Kevin Kho

03/31/2022, 7:38 PM
Nice!