What is the Prefect 2 equivalent of this? task_arg...
# prefect-community
What is the Prefect 2 equivalent of this? task_args={"name": "Get Data Part One"}. Context: I have 1 task that I use multiple times in the final flow run / final task, and I need to differentiate each one for the logs.
hi @Jason Motley - you can use `with_options` to do something like
In [1]: from prefect import flow, task

In [2]: @task
   ...: def reusable_task():
   ...:     pass

In [3]: @flow
   ...: def my_flow():
   ...:     reusable_task.with_options(task_run_name="task 1")()
   ...:     reusable_task.with_options(task_run_name="task 2")()

In [4]: my_flow()
10:45:15.341 | INFO    | prefect.engine - Created flow run 'resourceful-crow' for flow 'my-flow'
10:45:18.195 | INFO    | Flow run 'resourceful-crow' - Created task run 'reusable_task-0' for task 'reusable_task'
10:45:18.197 | INFO    | Flow run 'resourceful-crow' - Executing 'reusable_task-0' immediately...
10:45:19.166 | INFO    | Task run 'task 1' - Finished in state Completed()
10:45:19.326 | INFO    | Flow run 'resourceful-crow' - Created task run 'reusable_task-1' for task 'reusable_task'
10:45:19.327 | INFO    | Flow run 'resourceful-crow' - Executing 'reusable_task-1' immediately...
10:45:20.339 | INFO    | Task run 'task 2' - Finished in state Completed()
10:45:20.626 | INFO    | Flow run 'resourceful-crow' - Finished in state Completed('All states completed.')
Thank you!
What is the equivalent of specifying "upstream tasks"?
E.g. upstream_tasks=[first_task]
In [5]: @flow
   ...: def my_flow():
   ...:     x = reusable_task.with_options(task_run_name="task 1")()
   ...:     reusable_task.with_options(task_run_name="task 2")(wait_for=[x])
This doesn't seem to work as desired. If my reusable task returns a dataframe, I get an error that the dataframe object has no attribute
If you place `with_options` at the front you will then get "unexpected keyword argument" for any required arguments in the re-usable task.
hmm, can you share your code? it seems like you may be treating your task result as the task object itself
Here is the Prefect 1.0 item I am trying to replicate:
area_1_comp = area_1_comparison()
dataframe_result = my_task(secret = PrefectSecret("my_secret"), 
                               area = "area1", sub_area="sub_area_2", 
                               source="source_origin", task_args={"name": "Get Area 1 Data"}, upstream_tasks=[area_1_comp])
The 2 issues are: 1. Upstream task designation 2. Custom task name for the re-usable task.
In [8]: from prefect import flow, task

In [9]: import pandas as pd

In [10]: @task
    ...: def area_1_comp() -> pd.DataFrame:
    ...:     return pd.DataFrame([{"foo": "bar"}])

In [11]: @task
    ...: def reusable_task(df: pd.DataFrame):
    ...:     print(df)

In [12]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df)

In [13]: my_flow()

11:37:16.613 | INFO    | prefect.engine - Created flow run 'attractive-beluga' for flow 'my-flow'
11:37:17.282 | INFO    | Flow run 'attractive-beluga' - Created task run 'area_1_comp-0' for task 'area_1_comp'
11:37:17.284 | INFO    | Flow run 'attractive-beluga' - Executing 'area_1_comp-0' immediately...
11:37:17.755 | INFO    | Task run 'area_1_comp-0' - Finished in state Completed()
11:37:17.878 | INFO    | Flow run 'attractive-beluga' - Created task run 'reusable_task-0' for task 'reusable_task'
11:37:17.880 | INFO    | Flow run 'attractive-beluga' - Executing 'reusable_task-0' immediately...
0  bar
11:37:18.365 | INFO    | Task run 'task 2' - Finished in state Completed()
11:37:18.508 | INFO    | Flow run 'attractive-beluga' - Finished in state Completed('All states completed.')
note that here you don't actually need to explicitly say to `wait_for` the dataframe, since prefect knows that if you're passing the result from your upstream task `area_1_comp` into the downstream `reusable_task`, it necessarily needs to wait for that result anyway. you could technically still say `wait_for=[df]`, but it would be redundant
In [13]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df, wait_for=[df])

In [14]: my_flow()

11:40:26.188 | INFO    | prefect.engine - Created flow run 'whimsical-marmot' for flow 'my-flow'
11:40:26.900 | INFO    | Flow run 'whimsical-marmot' - Created task run 'area_1_comp-0' for task 'area_1_comp'
11:40:26.902 | INFO    | Flow run 'whimsical-marmot' - Executing 'area_1_comp-0' immediately...
11:40:27.525 | INFO    | Task run 'area_1_comp-0' - Finished in state Completed()
11:40:27.737 | INFO    | Flow run 'whimsical-marmot' - Created task run 'reusable_task-0' for task 'reusable_task'
11:40:27.738 | INFO    | Flow run 'whimsical-marmot' - Executing 'reusable_task-0' immediately...
0  bar
11:40:28.316 | INFO    | Task run 'task 2' - Finished in state Completed()
11:40:28.442 | INFO    | Flow run 'whimsical-marmot' - Finished in state Completed('All states completed.')
Could you explain how you are using re-usable task here? In my example that is the task that returns a dataframe.
Adding "with options" causes keyword argument errors with the re-usable task's required arguments.
In [11]: @task
    ...: def reusable_task(df: pd.DataFrame):
    ...:     print(df)
i just printed the df, I assumed
was analogous to your
from your example
it's worth clarifying that `with_options` returns a callable `Task` object, which you should then call with the task's expected kwargs like
@task
def foo(message: str):
   print(message)

@flow
def bar():
   new_foo = foo.with_options(task_run_name="my foo run")
   new_foo(message="hello")  # call the new Task with foo's expected kwargs
so does this require that I have 2 lines per task when I'm using the with options?
line 1: foo with options line 2: new foo (that has the options) with the required arguments of re-usable foo?
no. like my example above, you can call the Task immediately after altering it if you want
In [12]: @flow
    ...: def my_flow():
    ...:     df = area_1_comp()
    ...:     reusable_task.with_options(task_run_name="task 2")(df)
it might be more confusing to look at this way though
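To make the point about `with_options` concrete without depending on Prefect itself, here is a minimal pure-Python sketch. `MiniTask` and `greet` are hypothetical names, and this is a toy model of the behaviour described in this thread, not Prefect's actual implementation. It shows why the options go in one call and the task's own arguments go in the next, and why the one-line and two-line forms are equivalent.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class MiniTask:
    """Toy stand-in for a task object (hypothetical, not Prefect's class)."""

    fn: Callable[..., Any]
    task_run_name: Optional[str] = None

    def with_options(self, **options: Any) -> "MiniTask":
        # Return a modified copy; the original object is left untouched.
        return replace(self, **options)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # The task's own arguments are passed here, not to with_options().
        return self.fn(*args, **kwargs)


def greet(message: str) -> str:
    return message.upper()


foo = MiniTask(greet)

# Two-line form: build the modified task, then call it.
new_foo = foo.with_options(task_run_name="my foo run")
two_step = new_foo(message="hello")

# One-line form: call the modified task immediately.
one_step = foo.with_options(task_run_name="my foo run")(message="hello")
```

In this model, passing `message=` to `with_options` instead of to the second call fails, because `message` is an argument of the wrapped function rather than an option of the task object. That mirrors the "unexpected keyword argument" symptom discussed above.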
This might be an annoying ask but is it possible to write my code that I'm trying to replicate including the keyword arguments?
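@task
def my_task(
   area: str = "area1",
   sub_area: str = "sub_area_2",
   source: str = "source_origin",
) -> pd.DataFrame:
   # do something to make your df
   return pd.DataFrame()

@task
def something_that_needs_a_df(df: pd.DataFrame):
   print(df)

@flow
def my_flow():
   # now pass kwargs to modified version of `my_task`
   some_df = my_task.with_options(task_run_name="specific task run name")(
      sub_area="some other value",
      source="some other value",
   )
   # passing some_df makes this task wait for my_task
   something_that_needs_a_df(some_df)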
like this?
yup this is great, thank you!