Philipp
07/19/2024, 7:26 AMMarvin
07/19/2024, 7:26 AMpython
import asyncio
from prefect import flow, task
@task
async def my_task():
return 1
@flow
async def my_flow():
state = await my_task(return_state=True)
result = await state.result(fetch=True)
return result + 1
async def main():
state = await my_flow(return_state=True)
assert await state.result(fetch=True) == 2
asyncio.run(main())
Important details:
- Use return_state=True
in the task and flow calls to get the state object.
- Use await state.result(fetch=True)
to retrieve the result.
You can find more details in the Prefect documentation here:
https://docs.prefect.io/latest/concepts/results/
For practical example usage, refer to this link:
https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.pyPhilipp
07/19/2024, 7:28 AMMarvin
07/19/2024, 7:28 AMwait_for_flow_run
and the Prefect client library, you can follow these steps:
Using wait_for_flow_run
and the Prefect Client Library:
1. Execute the Flow:
2. Wait for the Flow Run Completion:
3. Retrieve the Result:
Here's a sample code snippet:
python
import asyncio
from prefect.client import get_client
from prefect import flow, task
from prefect.states import State
@task(persist_result=True)
async def my_task():
return 42
@flow
async def my_flow():
return await my_task()
async def main():
async with get_client() as client:
# Create a new flow run
flow_run = await client.create_flow_run_from_deployment(
deployment_id="<deployment_id>",
parameters={}
)
# Wait for the flow run to complete
flow_run = await client.wait_for_flow_run(
flow_run.id,
retries=5,
retry_delay_seconds=10
)
# Fetch the state result
state = await client.read_flow_run(flow_run.id)
if state.state.result:
result = state.state.result()
print(f"Flow run result: {result}")
else:
print("No result found for the flow run")
asyncio.run(main())
Reading Persisted Results:
Results that are stored persistently contain all necessary metadata for retrieval.
Accessing Persistent Results:
python
import asyncio
from prefect import flow, task
from prefect.client import get_client
@task(persist_result=True)
async def my_task():
return "Persistent Result"
@flow(persist_result=True)
async def my_flow():
return await my_task()
async def main():
async with get_client() as client:
state = await my_flow(return_state=True)
# Retrieve the persistent result
if state.state.result:
result = await state.state.result().get() # Automatically retrieves and deserializes the persisted result
print(f"Persistent result: {result}")
else:
print("No persistent result found")
asyncio.run(main())
For more detailed examples and documentation, refer to:
- Using the Client: https://docs.prefect.io/latest/guides/using-the-client/
- Results in Prefect: https://docs.prefect.io/latest/concepts/results/
- Example Code: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.py