Netanel Malka
05/02/2024, 2:33 PM
Marvin
05/02/2024, 2:34 PM
result of the State object associated with the task. Here's how you can do it asynchronously:
python
import asyncio

from prefect import flow, task


@task
async def my_task():
    return 1


@flow
async def my_flow():
    state = await my_task(return_state=True)
    # Here we fetch the result of the task
    result = await state.result(fetch=True)
    return result + 1


async def main():
    # We run the flow and fetch its result
    state = await my_flow(return_state=True)
    flow_result = await state.result(fetch=True)
    assert flow_result == 2


asyncio.run(main())
In the example above, my_task returns a result, which is then used within my_flow. The task's state is returned and the result is fetched with state.result(fetch=True). With Prefect version 2.6.0 and onwards, result retrieval was automatic for persisted results. If you are using a Prefect version before 2.6.0, you would not need the await when using State.result().
More information on resolving task results and setting up async fetch behavior can be found in the Prefect documentation:
- Results
- Task Runners
Nate
05/02/2024, 2:36 PM
fetch=True
• you'll need persist_result=True on the flow or whichever specific tasks you want to have persisted results
Netanel Malka
05/02/2024, 2:38 PM
def get_result(f: Flow, run_state: State, task_name: str, task_invocation_number: int = 0):
    task = f.get_tasks(name=task_name)[task_invocation_number]
    return run_state.result[task]._result.value
And:
def get_predict_conf_task_result(self, status) -> dict:
    return json.loads(get_result(self.flow, status, "prediction_params")['job_envs']['CONFIGS'])['predict_conf']
Not sure how to do that with Prefect 2
Nate
05/02/2024, 2:47 PM
In [1]: from prefect import flow, task

In [2]: @task
   ...: def foo(x):
   ...:     return x
   ...:

In [3]: @flow
   ...: def f():
   ...:     immediate_result = foo(42)
   ...:     assert immediate_result == 42
   ...:     state = foo(37, return_state=True)
   ...:     return state
   ...:

In [4]: state = f()
09:45:56.906 | INFO | prefect.engine - Created flow run 'quixotic-moose' for flow 'f'
09:45:57.267 | INFO | Flow run 'quixotic-moose' - Created task run 'foo-0' for task 'foo'
09:45:57.270 | INFO | Flow run 'quixotic-moose' - Executing 'foo-0' immediately...
09:45:57.629 | INFO | Task run 'foo-0' - Finished in state Completed()
09:45:57.775 | INFO | Flow run 'quixotic-moose' - Created task run 'foo-1' for task 'foo'
09:45:57.777 | INFO | Flow run 'quixotic-moose' - Executing 'foo-1' immediately...
09:45:58.186 | INFO | Task run 'foo-1' - Finished in state Completed()
09:45:58.333 | INFO | Flow run 'quixotic-moose' - Finished in state Completed('All states completed.')
In [5]: state.result()
Out[5]: 37
note that if I wanted to be able to call state.result() in a completely different place, I would need @task(persist_result=True) on my task
Netanel Malka
05/02/2024, 2:53 PM
create_awaited_flow_run(
    flow=categorical_model_predict,
    name=categorical_model_predict_flow_name,
    tags=["ds", "categorical_model"],
    parameters=prediction_params(predict_start_conf)
)
Here prediction_params is a task, and I want to get its result inside a test that runs this flow.
Do I need to mark it with (persist_result=True)?
Nate
05/02/2024, 2:57 PM
create_awaited_flow_run is doing
but in general, in prefect 2:
• it's not like prefect 1 where you can only access upstream task results in the context of another task. if you call a task foo(), the return value will be the result of the task and you can immediately access that result. if you need to persist that return value across different processes, you'd need persist_result=True to serialize/save that return value to disk so you can load it later from somewhere else via state.result()
• both flows and tasks behave almost entirely just like regular python functions
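for the prediction_params case specifically: since a task call in prefect 2 hands back a plain return value, the old get_result lookup may reduce to just parsing that value. a minimal sketch, assuming the task still returns a dict shaped the way the prefect 1 helper implies (job_envs, then CONFIGS as a JSON string that contains predict_conf). extract_predict_conf and the sample payload are hypothetical, not part of prefect:

```python
import json


def extract_predict_conf(prediction_params_result: dict) -> dict:
    """Pull predict_conf out of the value returned by the prediction_params task.

    Assumes CONFIGS is a JSON-encoded string, as in the Prefect 1 helper.
    """
    configs = json.loads(prediction_params_result["job_envs"]["CONFIGS"])
    return configs["predict_conf"]


# Made-up payload standing in for whatever prediction_params(...) returns.
sample_result = {
    "job_envs": {
        "CONFIGS": json.dumps(
            {"predict_conf": {"model": "categorical", "batch_size": 32}}
        )
    }
}

predict_conf = extract_predict_conf(sample_result)
print(predict_conf)
```

inside a flow or a test, the argument would be whatever prediction_params(...) returned directly, or state.result() if the task was called with return_state=True.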
does that help?
Netanel Malka
05/02/2024, 5:04 PM