Connor Keogh
05/23/2024, 11:20 PMMarvin
05/23/2024, 11:21 PMpython
from prefect import task, flow
from prefect.states import Completed, Failed
@task
def always_fails_task():
raise ValueError("I fail successfully")
@task
def always_succeeds_task():
print("I'm fail safe!")
return "success"
@flow
def always_succeeds_flow():
x = always_fails_task.submit()
y = always_succeeds_task.submit()
if y.result() == "success":
return Completed(message="I am happy with this result")
else:
return Failed(message="How did this happen!?")
if __name__ == "__main__":
always_succeeds_flow()
2. Using Automations for Reactive Actions:
Automations can be used to react to state changes externally. Here's an example:
python
from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger
from prefect.server.events.actions import CancelFlowRun
from datetime import timedelta
auto_to_create = Automation(
name="Process failure reaction",
trigger=EventTrigger(
expect={"state.change"},
match={"state": "FAILED"},
posture="Reactive",
threshold=3,
within=timedelta(seconds=10),
),
actions=[CancelFlowRun()]
)
def test_create_automation():
auto_to_create.create()
if __name__ == "__main__":
test_create_automation()
Documentation Links:
- Returning States in Prefect
- Automations in Prefect
These examples show that you can have some handling within the flow itself, but more complex automations might be better handled using external triggers and actions.Connor Keogh
05/24/2024, 12:17 AM.result()
?Marvin
05/24/2024, 12:17 AM.result()
method can return various values depending on the state and output of the task or flow. Here is what you need to know:
1. Normal Task Return Values:
- Tasks typically return the value you explicitly return in your task code.
python
@task
def my_task():
return 1
result = my_task().result() # result will be 1
2. Asynchronous Fetching of Results:
- For asynchronous contexts, use await
to fetch results:
python
@flow
async def my_flow():
state = await my_task(return_state=True)
result = await state.result(fetch=True)
return result + 1
result = asyncio.run(my_flow()) # result will be 2
3. State Result Retrieval:
- You can retrieve the result of a flow or task's state:
python
from prefect import flow, task
@task
def example_task():
return "example result"
@flow
def example_flow():
state = example_task(return_state=True)
result = state.result()
return result
flow_result = example_flow() # flow_result will be "example result"
4. Use of Fetch Parameter:
- Prefect 2.6.0 introduced automatic retrieval of persisted results, controlled by `fetch=True`:
python
async def main():
state = await my_flow(return_state=True)
result = await state.result(fetch=True)
assert result == 2
asyncio.run(main())
5. Backward Compatibility:
- Prior to Prefect 2.6.0, State.result()
did not require an await
. With newer versions, State.result()
in an asynchronous context returns a raw result type unless opted-in with fetch=True
.
For further details, refer to the Prefect Documentation on Results.