Jason Motley
05/12/2023, 3:34 PM
Nate
05/12/2023, 3:45 PM
with_options to do something like
In [1]: from prefect import flow, task
In [2]: @task
   ...: def reusable_task():
   ...:     pass
   ...:
In [3]: @flow
   ...: def my_flow():
   ...:     reusable_task.with_options(task_run_name="task 1")()
   ...:     reusable_task.with_options(task_run_name="task 2")()
   ...:
In [4]: my_flow()
10:45:15.341 | INFO | prefect.engine - Created flow run 'resourceful-crow' for flow 'my-flow'
10:45:18.195 | INFO | Flow run 'resourceful-crow' - Created task run 'reusable_task-0' for task 'reusable_task'
10:45:18.197 | INFO | Flow run 'resourceful-crow' - Executing 'reusable_task-0' immediately...
10:45:19.166 | INFO | Task run 'task 1' - Finished in state Completed()
10:45:19.326 | INFO | Flow run 'resourceful-crow' - Created task run 'reusable_task-1' for task 'reusable_task'
10:45:19.327 | INFO | Flow run 'resourceful-crow' - Executing 'reusable_task-1' immediately...
10:45:20.339 | INFO | Task run 'task 2' - Finished in state Completed()
10:45:20.626 | INFO | Flow run 'resourceful-crow' - Finished in state Completed('All states completed.')
Jason Motley
05/12/2023, 3:49 PM
Nate
05/12/2023, 4:05 PM
wait_for, so
In [5]: @flow
   ...: def my_flow():
   ...:     x = reusable_task.with_options(task_run_name="task 1")()
   ...:     reusable_task.with_options(task_run_name="task 2")(wait_for=[x])
   ...:
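As a rough analogy using only the standard library (concurrent.futures, not Prefect's actual engine), wait_for behaves like blocking on an upstream future before a task starts, even when no data flows between the two. The function run_task and the events list are hypothetical names for this sketch. The log lines above already show the direct calls "Executing ... immediately", one after another, so the ordering guarantee matters most when tasks can run concurrently.

```python
from __future__ import annotations

import time
from concurrent.futures import Future, ThreadPoolExecutor, wait

events: list[str] = []  # records the order in which the two "tasks" finish


def run_task(name: str, wait_for: list[Future] | None = None) -> str:
    # Analogy for wait_for: block until every upstream future is done,
    # even though none of their results are used here.
    if wait_for:
        wait(wait_for)
    if name == "task 1":
        time.sleep(0.1)  # make task 1 slow so the ordering is visible
    events.append(name)
    return name


with ThreadPoolExecutor(max_workers=2) as pool:
    x = pool.submit(run_task, "task 1")
    y = pool.submit(run_task, "task 2", wait_for=[x])
    y.result()

print(events)  # ['task 1', 'task 2']
```

Without wait_for=[x], the slow "task 1" would likely finish second.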
Jason Motley
05/12/2023, 4:14 PM
with_options
.with_options at the front you will then get "unexpected keyword argument" for any required arguments in the reusable task.
Nate
05/12/2023, 4:28 PM
Jason Motley
05/12/2023, 4:31 PM
# Prefect 1.x call style: task_args and upstream_tasks are keyword arguments of a Prefect 1 task call
area_1_comp = area_1_comparison()
dataframe_result = my_task(secret=PrefectSecret("my_secret"),
                           area="area1", sub_area="sub_area_2",
                           source="source_origin", task_args={"name": "Get Area 1 Data"},
                           upstream_tasks=[area_1_comp])
Nate
05/12/2023, 4:41 PM
In [8]: from prefect import flow, task
In [9]: import pandas as pd
In [10]: @task
    ...: def area_1_comp() -> pd.DataFrame:
    ...:     return pd.DataFrame([{"foo": "bar"}])
    ...:
In [11]: @task
    ...: def reusable_task(df: pd.DataFrame):
    ...:     print(df)
    ...:
In [12]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df)
    ...:
In [13]: my_flow()
11:37:16.613 | INFO | prefect.engine - Created flow run 'attractive-beluga' for flow 'my-flow'
11:37:17.282 | INFO | Flow run 'attractive-beluga' - Created task run 'area_1_comp-0' for task 'area_1_comp'
11:37:17.284 | INFO | Flow run 'attractive-beluga' - Executing 'area_1_comp-0' immediately...
11:37:17.755 | INFO | Task run 'area_1_comp-0' - Finished in state Completed()
11:37:17.878 | INFO | Flow run 'attractive-beluga' - Created task run 'reusable_task-0' for task 'reusable_task'
11:37:17.880 | INFO | Flow run 'attractive-beluga' - Executing 'reusable_task-0' immediately...
   foo
0  bar
11:37:18.365 | INFO | Task run 'task 2' - Finished in state Completed()
11:37:18.508 | INFO | Flow run 'attractive-beluga' - Finished in state Completed('All states completed.')
Note that here you don't actually need to explicitly wait_for the dataframe: since you're passing the result of your upstream task area_1_comp into the downstream reusable_task, Prefect knows that reusable_task necessarily has to wait for that result anyway. You could technically still pass wait_for, but it would be redundant:
In [13]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df, wait_for=[df])
    ...:
In [14]: my_flow()
11:40:26.188 | INFO | prefect.engine - Created flow run 'whimsical-marmot' for flow 'my-flow'
11:40:26.900 | INFO | Flow run 'whimsical-marmot' - Created task run 'area_1_comp-0' for task 'area_1_comp'
11:40:26.902 | INFO | Flow run 'whimsical-marmot' - Executing 'area_1_comp-0' immediately...
11:40:27.525 | INFO | Task run 'area_1_comp-0' - Finished in state Completed()
11:40:27.737 | INFO | Flow run 'whimsical-marmot' - Created task run 'reusable_task-0' for task 'reusable_task'
11:40:27.738 | INFO | Flow run 'whimsical-marmot' - Executing 'reusable_task-0' immediately...
   foo
0  bar
11:40:28.316 | INFO | Task run 'task 2' - Finished in state Completed()
11:40:28.442 | INFO | Flow run 'whimsical-marmot' - Finished in state Completed('All states completed.')
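A toy model in plain Python of why the explicit wait_for above is redundant. It is not Prefect's engine and only inspects top-level arguments. If a call's upstream dependencies are the futures found in its arguments plus anything listed in wait_for, then passing df already records area_1_comp as upstream, and wait_for=[df] adds nothing new. ToyFuture and upstream_of are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToyFuture:
    """Hypothetical stand-in for the result of an upstream task run."""
    task_run_name: str


def upstream_of(*args: Any, wait_for: list[ToyFuture] | None = None, **kwargs: Any) -> set[str]:
    """Collect upstream task-run names from top-level arguments plus wait_for."""
    # Data dependencies: any ToyFuture passed as a positional or keyword argument.
    upstream = {a.task_run_name for a in (*args, *kwargs.values()) if isinstance(a, ToyFuture)}
    # Ordering-only dependencies: whatever was listed in wait_for.
    upstream |= {f.task_run_name for f in (wait_for or [])}
    return upstream


df = ToyFuture("area_1_comp-0")
implicit = upstream_of(df)                 # data dependency only
explicit = upstream_of(df, wait_for=[df])  # same dependency, stated twice
x = ToyFuture("task 1")
ordering_only = upstream_of(wait_for=[x])  # no data passed, ordering via wait_for

print(implicit == explicit)  # True
print(ordering_only)         # {'task 1'}
```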
Jason Motley
05/12/2023, 4:43 PM
Nate
05/12/2023, 4:44 PM
In [11]: @task
    ...: def reusable_task(df: pd.DataFrame):
    ...:     print(df)
    ...:
I just printed the df; I assumed reusable_task was analogous to your my_task from your example
with_options returns a callable Task object, which you should then call with the task's expected kwargs, like
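from prefect import flow, task

@task
def foo(message: str):
    print(message)

@flow
def bar():
    # with_options returns a new Task; call it with foo's own arguments
    new_foo = foo.with_options(task_run_name="my foo run")
    new_foo("hi!")

if __name__ == "__main__":
    bar()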
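The "unexpected keyword argument" error mentioned earlier is consistent with passing the task's own arguments to with_options itself. The toy model below (plain Python, not Prefect's source) mimics the two-step call: with_options takes only keyword-only options and returns a modified copy, and the wrapped function's arguments go in the second call. ToyTask is a hypothetical stand-in that models only task_run_name.

```python
from __future__ import annotations

import dataclasses
from typing import Any, Callable


@dataclasses.dataclass(frozen=True)
class ToyTask:
    """Hypothetical stand-in for a task: a wrapped function plus options."""
    fn: Callable[..., Any]
    task_run_name: str | None = None

    def with_options(self, *, task_run_name: str | None = None) -> ToyTask:
        # Keyword-only options; any other keyword (e.g. message=...) raises
        # TypeError: ... got an unexpected keyword argument ...
        # Returns a modified copy; the original task is left untouched.
        return dataclasses.replace(self, task_run_name=task_run_name)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # The task's own parameters belong in this second call.
        return self.fn(*args, **kwargs)


def foo(message: str) -> str:
    return message


foo_task = ToyTask(foo)
new_foo = foo_task.with_options(task_run_name="my foo run")
print(new_foo("hi!"))          # hi!
print(foo_task.task_run_name)  # None

try:
    # Wrong: the task's kwargs passed to with_options instead of to the returned task
    foo_task.with_options(task_run_name="x", message="hi!")
except TypeError as exc:
    error = str(exc)
    print(error)
```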
Jason Motley
05/12/2023, 4:47 PM
Nate
05/12/2023, 4:48 PM
In [12]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df)
    ...:
Jason Motley
05/12/2023, 4:49 PM
Nate
05/12/2023, 4:57 PM
import pandas as pd
from prefect import flow, task

@task
def my_task(
    area="area1",
    sub_area="sub_area_2",
    source="source_origin",
) -> pd.DataFrame:
    # do something to make your df
    return pd.DataFrame()

@task
def something_that_needs_a_df(df: pd.DataFrame):
    print(df)

@flow
def my_flow():
    some_df = my_task.with_options(task_run_name="specific task run name")(  # now pass kwargs to modified version of `my_task`
        area="3.1415",
        sub_area="some other value",
        source="some other value",
    )
    something_that_needs_a_df(df=some_df)
like this?
Jason Motley
05/12/2023, 5:04 PM