<@ULVA73B9P> How to get a task result from flow, a...
# ask-marvin
n
@Marvin How to get a task result from flow, after the flow ran
m
To retrieve a task result after a flow has run, you can access the
result
of the
State
object associated with the task. Here's how you can do it asynchronously:
Copy code
python
import asyncio
from prefect import flow, task

@task
async def my_task():
    return 1

@flow
async def my_flow():
    state = await my_task(return_state=True)
    # Here we fetch the result of the task
    result = await state.result(fetch=True)
    return result + 1

async def main():
    # We run the flow and fetch its result
    state = await my_flow(return_state=True)
    flow_result = await state.result(fetch=True)
    assert flow_result == 2

asyncio.run(main())
In the example above,
my_task
returns a result, which is then used within
my_flow
. The task's state is returned and the result is fetched with
state.result(fetch=True)
. With Prefect version 2.6.0 and onwards, result retrieval was automatic for persisted results. If you are using a Prefect version before 2.6.0, you would not need the
await
when using
State.result()
. More information on resolving task results and setting up async fetch behavior can be found in the Prefect documentation: - Results - Task Runners
n
^ this is mostly right, except • you shouldnt need
fetch=True
• you'll need
persist_result=True
on the flow or whchever specific tasks you want to have persisted results
n
@Nate Thanks for answering. Actually, i am trying to get a task result for testing. With prefect 1 I did:
Copy code
def get_result(f: Flow, run_state: State, task_name: str, task_invocation_number: int = 0):
    task = f.get_tasks(name=task_name)[task_invocation_number]
    return run_state.result[task]._result.value
And:
Copy code
def get_predict_conf_task_result(self, status) -> dict:
        return json.loads(get_result(self.flow, status, "prediction_params")['job_envs']['CONFIGS'])['predict_conf']
Not sure how to do that with Prefect 2
n
hmm not sure I totally understand the above when exactly are you trying to get your task result?
Copy code
In [1]: from prefect import flow, task

In [2]: @task
   ...: def foo(x):
   ...:     return x
   ...:

In [3]: @flow
   ...: def f():
   ...:     immediate_result = foo(42)
   ...:     assert immediate_result == 42
   ...:     state = foo(37, return_state=True)
   ...:     return state
   ...:

In [4]: state = f()
09:45:56.906 | INFO    | prefect.engine - Created flow run 'quixotic-moose' for flow 'f'
09:45:57.267 | INFO    | Flow run 'quixotic-moose' - Created task run 'foo-0' for task 'foo'
09:45:57.270 | INFO    | Flow run 'quixotic-moose' - Executing 'foo-0' immediately...
09:45:57.629 | INFO    | Task run 'foo-0' - Finished in state Completed()
09:45:57.775 | INFO    | Flow run 'quixotic-moose' - Created task run 'foo-1' for task 'foo'
09:45:57.777 | INFO    | Flow run 'quixotic-moose' - Executing 'foo-1' immediately...
09:45:58.186 | INFO    | Task run 'foo-1' - Finished in state Completed()
09:45:58.333 | INFO    | Flow run 'quixotic-moose' - Finished in state Completed('All states completed.')

In [5]: state.result()
Out[5]: 37
where note that if I wanted to be able to do
state.result()
in a completely different place, I would need
Copy code
@task(persist_result=True)
on my task
n
This is part of a flow:
Copy code
create_awaited_flow_run(
            flow=categorical_model_predict,
            name=categorical_model_predict_flow_name,
            tags=["ds", "categorical_model"],
            parameters=prediction_params(predict_start_conf)
        )
While
prediction_params
is a task, that I want to get the result of it inside a test that runs this flow. Do I need to mask it with
(persist_result=True)
?
n
sorry, I'm not sure because I dont know what
create_awaited_flow_run
is doing but in general, in prefect 2: • its not like prefect 1 where you can only access upstream task results in the context of another task. if you call a task
foo()
the return value will be the result of the task and you can immediately access that result. if you need to persist that
return
value across different processes, you'd need
persist_result=True
to serialize/save that
return
to disk so you can load it later from somewhere else via
state.result()
• both flows and tasks behave almost entirely just like regular python functions does that help?
n
Yes. I will try that. Thanks