Paweł Biernat
06/07/2024, 8:57 AM
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Marvin
06/07/2024, 8:57 AM
`MissingResult: State data is missing` typically arises when result persistence is disabled and the state is retrieved from the API. Here’s a possible approach to ensure your task fails if the deployed flow fails without running into the `MissingResult` issue.
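1. Check the Deployment State:
Instead of directly fetching `flow_run.state.result()`, you can wait for the flow run to complete and then check its final state.
2. Wait for the Flow Run:
By default (`timeout=None`), `run_deployment` already polls until the flow run reaches a final state and then returns the flow run, so `flow_run.state` can be inspected directly. If you pass `timeout=0`, `run_deployment` returns immediately; in that case recent Prefect releases provide the async utility `wait_for_flow_run` in `prefect.flow_runs`, which takes the flow run ID and returns the finished flow run.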
Here’s a code sample to achieve this:
python
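from prefect import flow, task
from prefect.deployments import run_deployment


@task
def execute_deployment():
    # Run the deployment, named as "<flow name>/<deployment name>".
    # With the default timeout=None, run_deployment polls until the
    # flow run reaches a final state before returning.
    flow_run = run_deployment(
        name="my-flow/my-deployment",
        parameters={"param1": "value1"},
    )
    # Inspect the final state. Do not call state.result(): that is what
    # raises MissingResult when the result was not persisted.
    state = flow_run.state
    if state is None or state.is_failed() or state.is_crashed():
        raise RuntimeError("Deployed Flow Run has failed.")
    # Return a plain value; a State returned from a task would be
    # treated as the task's own final state.
    return flow_run.id


@flow
def parent_flow():
    # Execute the deployment; the task raises if the deployed run failed
    flow_run_id = execute_deployment()
    return flow_run_id


if __name__ == "__main__":
    parent_flow()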
### Explanations:
1. Run Deployment: `run_deployment` submits the deployment to run, waits for it to finish (the default behavior), and returns a flow run object.
2. Wait and Check State: Once the wait is over, `flow_run.state` holds the final state of the flow run. Checking it with `is_failed()` never loads the stored result, so the `MissingResult` error is not triggered.
3. Raise Error: If the deployed flow run fails, raise an error to fail the task.
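The state check in step 3 can be factored into a small helper that treats anything other than a completed run as a failure, without ever reading the result. This is only a sketch: `ensure_completed` and `StubState` are hypothetical names, and `StubState` merely stands in for Prefect's `State` (it mimics the `name` attribute and the `is_completed()` predicate) so the example runs without a Prefect server.

```python
from dataclasses import dataclass


@dataclass
class StubState:
    """Hypothetical stand-in for a Prefect State; only what the helper needs."""

    name: str

    def is_completed(self) -> bool:
        return self.name == "Completed"


def ensure_completed(state) -> None:
    """Raise unless the run finished successfully; never calls state.result()."""
    if state is None or not state.is_completed():
        label = "unknown" if state is None else state.name
        raise RuntimeError(f"Deployed flow run ended in state: {label}")


ensure_completed(StubState("Completed"))  # passes silently

try:
    ensure_completed(StubState("Crashed"))
except RuntimeError as exc:
    print(exc)  # Deployed flow run ended in state: Crashed
```

Inside a real task, the same helper would presumably be called as `ensure_completed(flow_run.state)` on the object returned by `run_deployment`. Checking for "not completed" rather than only "failed" also covers crashed and cancelled runs.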
### Useful Links:
You can read more about running deployments and handling flow runs in Prefect’s documentation:
- Deployments
- Prefect API Documentation
By structuring your code this way, you can avoid the data persistence issues and ensure your task fails if the deployed flow fails.
Paweł Biernat
06/07/2024, 9:00 AM
Marvin
06/07/2024, 9:00 AM
To avoid calling `flow_run.state.result()`, you can implement custom retry logic using the `retries` and `retry_delay_seconds` parameters together with a `retry_condition_fn`. Below is an example of how to achieve this:
### Example Code
python
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.exceptions import MissingResult


def retry_condition_fn(task, task_run, state) -> bool:
    # Prefect calls this with the task, the task run, and the failed state.
    # state.result() re-raises the exception that failed the task.
    try:
        state.result()
    except RuntimeError as exc:
        # Retry only when the deployed flow run itself failed
        return "Deployed Flow Run has failed" in str(exc)
    except Exception:
        return False
    return False


# The retry condition must be defined before the decorator references it
@task(retries=3, retry_delay_seconds=10, retry_condition_fn=retry_condition_fn)
def execute_deployment():
    try:
        # Run the deployment; by default this waits for a final state
        flow_run = run_deployment(
            name="my-flow/my-deployment",
            parameters={"param1": "value1"},
        )
        # Check the final state without calling state.result()
        state = flow_run.state
        if state is None or state.is_failed():
            raise RuntimeError("Deployed Flow Run has failed.")
        return flow_run.id
    except MissingResult as e:
        # Not retried: this message does not match retry_condition_fn
        raise RuntimeError("Missing result:") from e


@flow
def parent_flow():
    # Execute the deployment, retrying if the deployed flow run fails
    flow_run_id = execute_deployment()
    return flow_run_id


if __name__ == "__main__":
    parent_flow()
### Explanation
1. Task Configuration: The `execute_deployment` task is configured with `retries`, `retry_delay_seconds`, and a custom `retry_condition_fn`. Prefect calls this function with the task, the task run, and the failed state, and retries only if it returns `True`.
2. Custom Exception Handling: Inside the task, a `MissingResult` exception is caught and re-raised as a `RuntimeError`, and a separate `RuntimeError` is raised if the deployed flow run fails.
3. Retry Condition Function: `retry_condition_fn` checks whether the exception is the one we want to retry on and returns `True` if it matches.
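Matching on the exception message is brittle, so a dedicated exception type can make the retry decision more explicit. The sketch below assumes the `(task, task_run, state)` call signature that Prefect uses for `retry_condition_fn` and that `state.result()` re-raises the task's exception. `DeployedFlowRunFailed`, `should_retry`, and `StubFailedState` are hypothetical names; the stub stands in for a failed Prefect state so the logic can be exercised offline.

```python
class DeployedFlowRunFailed(RuntimeError):
    """Hypothetical exception the task raises when the deployed run fails."""


class StubFailedState:
    """Hypothetical stand-in for a failed Prefect State."""

    def __init__(self, exc: Exception) -> None:
        self._exc = exc

    def result(self):
        # Assumed to mirror Prefect: a failed state re-raises its exception.
        raise self._exc


def should_retry(task, task_run, state) -> bool:
    """Retry only when the task failed because the deployed flow run failed."""
    try:
        state.result()
    except DeployedFlowRunFailed:
        return True
    except Exception:
        return False
    return False


print(should_retry(None, None, StubFailedState(DeployedFlowRunFailed("boom"))))  # True
print(should_retry(None, None, StubFailedState(ValueError("other"))))  # False
```

With Prefect installed, the function would be passed as `retry_condition_fn=should_retry` in the `@task` decorator, and the task would raise `DeployedFlowRunFailed` instead of a bare `RuntimeError`.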
### Useful Links
- Retrying Tasks
- Running Deployments
By structuring your code in this manner, you can handle exceptions robustly and retry the task as needed without directly accessing the state result.