Bob Colner
03/15/2020, 11:06 PM
with Flow("core-etl", schedule=schedule) as flow:
    events_itc()
    dims_subscribers(
        upstream_tasks=[events_subscriptions()]
    )
    dims_devices_made(
        upstream_tasks=[dims_devices_made_installed, dims_adjust]
    )
    dims_devices_instasize(
        upstream_tasks=[dims_devices_instasize_installed, dims_adjust]
    )
    dims_merged_subscribers(
        upstream_tasks=[dims_devices_instasize, dims_subscribers]
    )
flow.visualize()
dims_devices_instasize and dims_subscribers both run twice in the flow.

Chris White
03/15/2020, 11:23 PM
In this flow, a and b are two completely independent tasks:
with Flow("example") as flow:
    a = my_task()
    b = my_task()

So if you write:
dims_inst = dims_devices_instasize(
    upstream_tasks=[dims_devices_instasize_installed, dims_adjust]
)
and use dims_inst as the upstream dependency of dims_merged_subscribers, you should be good to go (and see a different viz).

Bob Colner
03/15/2020, 11:30 PM

Chris White
03/15/2020, 11:31 PM

Bob Colner
03/15/2020, 11:31 PM

Chris White
03/15/2020, 11:34 PM
f() and f() are two totally different function calls with distinct return values; this tutorial actually covers the distinction: https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows
See the section header “Calling Tasks creates copies”.

Bob Colner
03/15/2020, 11:36 PM

Chris White
03/15/2020, 11:41 PM
from prefect import task, Flow

@task
def my_task():
    return 42

with Flow("example") as flow:
    # calling my_task produces a copy
    # that is distinct from my_task
    # but has the same runtime logic
    a = my_task()
    b = my_task(upstream_tasks=[a])
which is a flow with two tasks, as can be seen in the following DAG viz:

Bob Colner
03/15/2020, 11:45 PM

Chris White
03/15/2020, 11:48 PM

Bob Colner
03/15/2020, 11:51 PM

Chris White
03/15/2020, 11:58 PM
with Flow("core-etl", schedule=schedule) as flow:
    sub_task = dims_subscribers(
        upstream_tasks=[events_subscriptions]
    )
    made_task = dims_devices_made(
        upstream_tasks=[dims_devices_made_installed, dims_adjust]
    )
    inst_task = dims_devices_instasize(
        upstream_tasks=[dims_devices_instasize_installed, dims_adjust]
    )
    dims_merged_subscribers(
        upstream_tasks=[inst_task, sub_task]
    )
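
Why the first version shows dims_devices_instasize and dims_subscribers twice can be sketched with a small toy model. ToyFlow and ToyTask below are hypothetical stand-ins, not Prefect classes and not how Prefect is implemented; they only mimic the two behaviors described in this thread: calling a task inside a flow adds a copy of it, and passing an uncalled task object through upstream_tasks adds that original object as a node of its own. The task names are reused from the thread purely for illustration.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: records task nodes and edges only."""

    active = None  # the flow currently open in a `with` block, if any

    def __init__(self, name):
        self.name = name
        self.tasks = []  # task objects in the flow, compared by identity
        self.edges = []  # (upstream, downstream) pairs

    def __enter__(self):
        ToyFlow.active = self
        return self

    def __exit__(self, exc_type, exc, tb):
        ToyFlow.active = None
        return False

    def add_task(self, t):
        # Identity check: two copies of the same task are two separate nodes.
        if not any(t is existing for existing in self.tasks):
            self.tasks.append(t)


class ToyTask:
    """Hypothetical stand-in for a task; calling it mimics copy-on-call."""

    def __init__(self, name):
        self.name = name

    def __call__(self, upstream_tasks=()):
        flow = ToyFlow.active
        if flow is None:
            raise RuntimeError("call tasks inside a `with ToyFlow(...)` block")
        new = copy.copy(self)  # each call yields a distinct copy
        flow.add_task(new)
        for up in upstream_tasks:
            flow.add_task(up)  # an uncalled task object joins the flow as-is
            flow.edges.append((up, new))
        return new


inst = ToyTask("dims_devices_instasize")
subs = ToyTask("dims_subscribers")
merged = ToyTask("dims_merged_subscribers")

# Original pattern: call the tasks, then pass the *original* objects downstream.
with ToyFlow("before") as before:
    inst()
    subs()
    merged(upstream_tasks=[inst, subs])

# Fixed pattern: keep the copies returned by each call and pass those downstream.
with ToyFlow("after") as after:
    inst_task = inst()
    sub_task = subs()
    merged(upstream_tasks=[inst_task, sub_task])

print(len(before.tasks), len(after.tasks))  # 5 3
```

In the "before" flow each of the two upstream tasks appears twice (the called copy, which has no downstream edge, plus the original object wired into dims_merged_subscribers), while the "after" flow keeps exactly one node per task.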
Bob Colner
03/16/2020, 12:01 AM

Chris White
03/16/2020, 12:04 AM

Bob Colner
03/16/2020, 12:05 AM

Chris White
03/16/2020, 12:06 AM