Paweł Biernat
06/07/2024, 8:57 AM
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Marvin
06/07/2024, 8:57 AM
`MissingResult: State data is missing` typically arises when result persistence is disabled and the state is retrieved from the API. Here’s a possible approach to ensure your task fails if the deployed flow fails without running into the `MissingResult` issue.
1. Check the Deployment State:
Instead of calling `flow_run.state.result()`, wait for the flow run to complete and then check its final state.
2. Use `wait_for_flow_run`:
Prefect provides `wait_for_flow_run` (in `prefect.flow_runs`), an async helper that takes a flow run ID, waits for that run to finish, and returns the flow run with its final state. It pairs with `run_deployment(..., timeout=0)`, which returns immediately. With the default `timeout=None`, `run_deployment` itself already waits for the run to finish.
Here’s a code sample to achieve this:
```python
from prefect import flow, task
from prefect.deployments import run_deployment


@task
def execute_deployment():
    # Run the deployment. With the default timeout=None, run_deployment
    # blocks until the flow run reaches a final state, so no separate
    # wait call is needed here.
    flow_run = run_deployment(
        name="my-flow/my-deployment",  # expected form: "<flow name>/<deployment name>"
        parameters={"param1": "value1"},
    )
    # Check the final state without calling state.result(), which is the
    # call that raises MissingResult when the result was not persisted
    state = flow_run.state
    if state is None or state.is_failed():
        raise RuntimeError("Deployed Flow Run has failed.")
    # Return plain data (the state name) rather than the State object
    return state.name


@flow
def parent_flow():
    # Execute the deployment
    deployment_state = execute_deployment()
    return deployment_state


if __name__ == "__main__":
    parent_flow()
```
### Explanations:
1. Run Deployment: `run_deployment` submits the deployment to run and returns a flow run object.
2. Wait and Check State: Waiting for the flow run to finish (through `run_deployment`'s default `timeout=None`, or through `wait_for_flow_run`) gives you its final state. Checking that state with `is_failed()` instead of calling `.result()` is what avoids the `MissingResult` error.
3. Raise Error: If the deployed flow run fails, raise an error to fail the task.
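The check in step 3 only looks at `is_failed()`, while a deployed run can also end up crashed or cancelled. A slightly stricter variant might look like the sketch below. The helper name `raise_unless_completed` and the `FakeState` class are hypothetical, introduced only so the snippet runs without a Prefect server. With a real run you would pass `flow_run.state`, assuming it exposes `is_completed()` and `name` the way Prefect's `State` does.

```python
class FakeState:
    """Hypothetical stand-in for a Prefect State, for local illustration only."""

    def __init__(self, name):
        self.name = name

    def is_completed(self):
        return self.name == "Completed"


def raise_unless_completed(state):
    """Raise unless the run finished successfully; never calls state.result()."""
    if state is None:
        raise RuntimeError("Deployed flow run has no state.")
    if not state.is_completed():
        raise RuntimeError(f"Deployed flow run ended in state {state.name!r}.")
    return state.name


if __name__ == "__main__":
    print(raise_unless_completed(FakeState("Completed")))
    for bad in ("Failed", "Crashed", "Cancelled"):
        try:
            raise_unless_completed(FakeState(bad))
        except RuntimeError as exc:
            print(exc)
```

Because the helper only inspects the state and never asks for its result, it does not depend on result persistence being enabled.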
### Useful Links:
You can read more about running deployments and handling flow runs in Prefect’s documentation:
- Deployments
- Prefect API Documentation
By structuring your code this way, you can avoid the data persistence issues and ensure your task fails if the deployed flow fails.
Paweł Biernat
06/07/2024, 9:00 AM
Marvin
06/07/2024, 9:00 AM
Without accessing `flow_run.state.result()`, you can implement custom retry logic using the `retries` and `retry_delay_seconds` parameters together with a `retry_condition_fn`. Below is an example of how to achieve this:
### Example Code
```python
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.exceptions import MissingResult


def retry_condition_fn(task, task_run, state):
    # Prefect calls this with the task, the task run, and the failed state.
    # Retry only when the failure is the "deployed flow failed" error.
    try:
        state.result()
    except RuntimeError as exc:
        return "Deployed Flow Run has failed" in str(exc)
    except Exception:
        return False
    return False


@task(retries=3, retry_delay_seconds=10, retry_condition_fn=retry_condition_fn)
def execute_deployment():
    try:
        # Run the deployment; the default timeout=None waits for it to finish
        flow_run = run_deployment(
            name="my-flow/my-deployment",  # expected form: "<flow name>/<deployment name>"
            parameters={"param1": "value1"},
        )
        # Check the final state of the flow run without calling .result()
        state = flow_run.state
        if state is None or state.is_failed():
            raise RuntimeError("Deployed Flow Run has failed.")
        # Return plain data (the state name) rather than the State object
        return state.name
    except MissingResult as e:
        # Not matched by retry_condition_fn, so this fails without a retry
        raise RuntimeError("Missing result") from e


@flow
def parent_flow():
    # Execute the deployment
    deployment_state = execute_deployment()
    return deployment_state


if __name__ == "__main__":
    parent_flow()
```
### Explanation
1. Task Configuration: The `execute_deployment` task is configured with `retries`, `retry_delay_seconds`, and a custom `retry_condition_fn`. This function decides under what conditions a retry should be attempted.
2. Custom Exception Handling: Inside the task, a `MissingResult` exception is caught and re-raised as a `RuntimeError`, and a `RuntimeError` is also raised if the deployed flow run ends in a failed state.
3. Retry Condition Function: `retry_condition_fn` receives the task, the task run, and the failed state. It returns `True` only when the underlying exception is the one we want to retry on.
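To make the retry condition concrete, here is a minimal sketch of a condition function with the `(task, task_run, state)` signature, exercised against a stand-in state object. `FakeFailedState` is hypothetical and mimics only the one behavior used here, namely `result()` re-raising the task's exception. The snippet is not connected to a real Prefect run, so treat it as a way to reason about the predicate rather than as a test of Prefect itself.

```python
class FakeFailedState:
    """Hypothetical stand-in for a failed state; result() re-raises the error."""

    def __init__(self, exc):
        self._exc = exc

    def result(self):
        raise self._exc


def retry_condition_fn(task, task_run, state):
    """Return True only for the 'deployed flow failed' RuntimeError."""
    try:
        state.result()
    except RuntimeError as exc:
        return "Deployed Flow Run has failed" in str(exc)
    except Exception:
        # Any other exception type is not retried
        return False
    # result() did not raise, so there is nothing to retry
    return False


if __name__ == "__main__":
    failed = FakeFailedState(RuntimeError("Deployed Flow Run has failed."))
    missing = FakeFailedState(RuntimeError("Missing result"))
    other = FakeFailedState(ValueError("unrelated"))
    # task and task_run are unused by this predicate, so None is passed here
    for label, state in [("failed", failed), ("missing", missing), ("other", other)]:
        print(label, retry_condition_fn(None, None, state))
```

Matching on the message text is brittle. Raising a dedicated exception class from the task and catching that class in the condition function would be a sturdier design.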
### Useful Links
- Retrying Tasks
- Running Deployments
By structuring your code in this manner, you can handle exceptions robustly and retry the task as needed without directly accessing the state result.