What is the Prefect 2 equivalent of this? task_arg...
# prefect-community
j
What is the Prefect 2 equivalent of this? task_args={"name": "Get Data Part One"}. Context: I have 1 task that I use multiple times in the final flow run / final task, and I need to differentiate each one for the logs.
n
hi @Jason Motley - you can use `with_options` to do something like
In [1]: from prefect import flow, task

In [2]: @task
   ...: def reusable_task():
   ...:     pass
   ...:

In [3]: @flow
   ...: def my_flow():
   ...:     reusable_task.with_options(task_run_name="task 1")()
   ...:     reusable_task.with_options(task_run_name="task 2")()
   ...:

In [4]: my_flow()
10:45:15.341 | INFO    | prefect.engine - Created flow run 'resourceful-crow' for flow 'my-flow'
10:45:18.195 | INFO    | Flow run 'resourceful-crow' - Created task run 'reusable_task-0' for task 'reusable_task'
10:45:18.197 | INFO    | Flow run 'resourceful-crow' - Executing 'reusable_task-0' immediately...
10:45:19.166 | INFO    | Task run 'task 1' - Finished in state Completed()
10:45:19.326 | INFO    | Flow run 'resourceful-crow' - Created task run 'reusable_task-1' for task 'reusable_task'
10:45:19.327 | INFO    | Flow run 'resourceful-crow' - Executing 'reusable_task-1' immediately...
10:45:20.339 | INFO    | Task run 'task 2' - Finished in state Completed()
10:45:20.626 | INFO    | Flow run 'resourceful-crow' - Finished in state Completed('All states completed.')
j
Thank you!
What is the equivalent of specifying "upstream tasks"?
E.g. upstream_tasks=[first_task]
n
`wait_for`, so
In [5]: @flow
   ...: def my_flow():
   ...:     x = reusable_task.with_options(task_run_name="task 1")()
   ...:     reusable_task.with_options(task_run_name="task 2")(wait_for=[x])
   ...:
j
This doesn't seem to work as desired. If my reusable task returns a dataframe, I get an error that the dataframe object has no attribute `with_options`.
If you place `with_options` at the front, you then get "unexpected keyword argument" for any required arguments in the reusable task.
n
hmm, can you share your code? it seems like you may be treating your task result as the task object itself
j
Here is the Prefect 1.0 item I am trying to replicate:
area_1_comp = area_1_comparison()
dataframe_result = my_task(secret = PrefectSecret("my_secret"), 
                               area = "area1", sub_area="sub_area_2", 
                               source="source_origin", task_args={"name": "Get Area 1 Data"}, upstream_tasks=[area_1_comp])
The 2 issues are: 1. Upstream task designation 2. Custom task name for the re-usable task.
n
In [8]: from prefect import flow, task

In [9]: import pandas as pd

In [10]: @task
    ...: def area_1_comp() -> pd.DataFrame:
    ...:     return pd.DataFrame([{"foo": "bar"}])
    ...:

In [11]: @task
    ...: def reusable_task(df: pd.DataFrame):
    ...:     print(df)
    ...:

In [12]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df)
    ...:

In [13]: my_flow()

11:37:16.613 | INFO    | prefect.engine - Created flow run 'attractive-beluga' for flow 'my-flow'
11:37:17.282 | INFO    | Flow run 'attractive-beluga' - Created task run 'area_1_comp-0' for task 'area_1_comp'
11:37:17.284 | INFO    | Flow run 'attractive-beluga' - Executing 'area_1_comp-0' immediately...
11:37:17.755 | INFO    | Task run 'area_1_comp-0' - Finished in state Completed()
11:37:17.878 | INFO    | Flow run 'attractive-beluga' - Created task run 'reusable_task-0' for task 'reusable_task'
11:37:17.880 | INFO    | Flow run 'attractive-beluga' - Executing 'reusable_task-0' immediately...
   foo
0  bar
11:37:18.365 | INFO    | Task run 'task 2' - Finished in state Completed()
11:37:18.508 | INFO    | Flow run 'attractive-beluga' - Finished in state Completed('All states completed.')
note that here you don't actually need to explicitly `wait_for` the dataframe: since you're passing the result from your upstream task `area_1_comp` into the downstream `reusable_task`, prefect knows it necessarily needs to wait for that result anyway.
you could technically still say `wait_for`, but it would be redundant
In [13]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df, wait_for=[df])
    ...:


In [14]: my_flow()

11:40:26.188 | INFO    | prefect.engine - Created flow run 'whimsical-marmot' for flow 'my-flow'
11:40:26.900 | INFO    | Flow run 'whimsical-marmot' - Created task run 'area_1_comp-0' for task 'area_1_comp'
11:40:26.902 | INFO    | Flow run 'whimsical-marmot' - Executing 'area_1_comp-0' immediately...
11:40:27.525 | INFO    | Task run 'area_1_comp-0' - Finished in state Completed()
11:40:27.737 | INFO    | Flow run 'whimsical-marmot' - Created task run 'reusable_task-0' for task 'reusable_task'
11:40:27.738 | INFO    | Flow run 'whimsical-marmot' - Executing 'reusable_task-0' immediately...
   foo
0  bar
11:40:28.316 | INFO    | Task run 'task 2' - Finished in state Completed()
11:40:28.442 | INFO    | Flow run 'whimsical-marmot' - Finished in state Completed('All states completed.')
j
Could you explain how you are using the reusable task here? In my example that is the task that returns a dataframe.
Adding `with_options` causes keyword argument errors with the reusable task's required arguments.
n
In [11]: @task
    ...: def reusable_task(df: pd.DataFrame):
    ...:     print(df)
    ...:
i just printed the df, I assumed `reusable_task` was analogous to your `my_task` from your example
it's worth clarifying that `with_options` returns a callable `Task` object, which you should then call with the task's expected kwargs, like
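from prefect import flow, task

@task
def foo(message: str):
    print(message)

@flow
def bar():
    # with_options returns a modified copy of the task...
    new_foo = foo.with_options(task_run_name="my foo run")
    # ...which is then called with the task's own arguments
    new_foo("hi!")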
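To make the ordering concrete without needing Prefect installed, here is a rough sketch of the same mechanics using a toy stand-in class. `ToyTask` and `get_data` are hypothetical names made up for illustration, and this is not how Prefect implements `Task`; it only mirrors the shape of the idea that `with_options` hands back another task object, while calling a task hands back the function's result.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class ToyTask:
    """Toy stand-in for a task object; NOT Prefect's real Task class."""

    fn: Callable[..., Any]
    task_run_name: Optional[str] = None

    def with_options(self, **options: Any) -> "ToyTask":
        # Return a modified copy; the original task is left untouched.
        return replace(self, **options)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Calling the task runs the wrapped function and returns its result.
        return self.fn(*args, **kwargs)


def get_data(area: str, source: str) -> dict:
    # Hypothetical task body; a dict stands in for the dataframe.
    return {"area": area, "source": source}


my_task = ToyTask(get_data)

# Right order: options first, then call with the function's own arguments.
named = my_task.with_options(task_run_name="Get Area 1 Data")
result = named(area="area1", source="source_origin")

# Wrong order 1: calling first yields the result, which has no with_options.
try:
    my_task(area="area1", source="source_origin").with_options(task_run_name="x")
except AttributeError as exc:
    wrong_order_error = str(exc)

# Wrong order 2: the function's arguments do not belong in with_options.
try:
    my_task.with_options(area="area1")
except TypeError as exc:
    wrong_kwargs_error = str(exc)
```

The two failing cases line up with the two errors described earlier in the thread: an attribute error when `with_options` is chained onto the returned data, and an unexpected keyword argument when the task's own arguments are passed to `with_options` instead of to the call that follows it.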
j
so does this require 2 lines per task when I'm using `with_options`?
line 1: foo with options, line 2: new foo (that has the options) called with the required arguments of the reusable foo?
n
no. like my example above, you can call the Task immediately after altering it if you want
In [12]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df)
    ...:
it might be more confusing to look at this way though
j
This might be an annoying ask, but is it possible to write out the code I'm trying to replicate, including the keyword arguments?
n
import pandas as pd
from prefect import flow, task

@task
def my_task(
    area="area1",
    sub_area="sub_area_2",
    source="source_origin",
) -> pd.DataFrame:
    # do something to make your df
    return pd.DataFrame()

@task
def something_that_needs_a_df(df: pd.DataFrame):
    print(df)

@flow
def my_flow():
    # now pass kwargs to the modified version of `my_task`
    some_df = my_task.with_options(task_run_name="specific task run name")(
        area="3.1415",
        sub_area="some other value",
        source="some other value",
    )

    something_that_needs_a_df(df=some_df)
like this?
j
yup this is great, thank you!