Tejas Shah
03/13/2025, 11:24 AM
```python
from prefect import flow, serve, allow_failure


@flow(log_prints=True, flow_run_name="child_flow_{flow_param}")
def child_flow(flow_param: int):
    print(f"In flow {flow_param}")
    if flow_param==5:
        raise ValueError("child_flow_5 always fails")


@flow(log_prints=True)
def parent_flow():
    for i in range(0,11):
        allow_failure


if __name__ == "__main__":
    parent_flow_deployment = parent_flow.to_deployment(name="parent_flow_dev_deployment")
    serve(parent_flow_deployment)
```
Marvin
03/13/2025, 11:24 AM
To ensure `parent_flow` continues executing even if `child_flow_5` fails, you should use the `allow_failure` function correctly. The `allow_failure` decorator allows a task or flow to fail without failing the entire flow run.
Here's a corrected version of your code:
```python
from prefect import flow, serve, allow_failure


@flow(log_prints=True, flow_run_name="child_flow_{flow_param}")
def child_flow(flow_param: int):
    print(f"In flow {flow_param}")
    if flow_param == 5:
        raise ValueError("child_flow_5 always fails")


@flow(log_prints=True)
def parent_flow():
    for i in range(0, 11):
        allow_failure(child_flow)(flow_param=i)


if __name__ == "__main__":
    parent_flow_deployment = parent_flow.to_deployment(name="parent_flow_dev_deployment")
    serve(parent_flow_deployment)
```
### Key Changes:
- Use `allow_failure`: Wrap the `child_flow` call with `allow_failure` to let it fail without stopping the `parent_flow`.
### Resources:
- Prefect 3.x API Reference
- Prefect 3.x Documentation
This should allow your `parent_flow` to continue executing all child flows, even if `child_flow_5` raises an exception. If you have any more questions or need further assistance, feel free to ask.
Tejas Shah
03/13/2025, 11:26 AM
'allow_failure' object is not callable
Marvin
03/13/2025, 11:26 AM
[...] `allow_failure`. Let's correct that.
To allow a child flow to fail without stopping the parent flow, you can use a `try`/`except` block directly within your flow. Here's how you can modify your `parent_flow` to handle failures in `child_flow_5`:
```python
from prefect import flow, serve


@flow(log_prints=True, flow_run_name="child_flow_{flow_param}")
def child_flow(flow_param: int):
    print(f"In flow {flow_param}")
    if flow_param == 5:
        raise ValueError("child_flow_5 always fails")


@flow(log_prints=True)
def parent_flow():
    for i in range(0, 11):
        try:
            child_flow(flow_param=i)
        except Exception as e:
            # Log the failure and move on to the next child flow.
            print(f"Child flow {i} failed with exception: {e}")


if __name__ == "__main__":
    parent_flow_deployment = parent_flow.to_deployment(name="parent_flow_dev_deployment")
    serve(parent_flow_deployment)
```
### Key Changes:
- Wrap the `child_flow` call in a `try`/`except` block: This allows each child flow to fail independently without affecting the rest of the flow execution.
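If you also want to know afterwards which children failed, the same loop can record failures instead of only printing them. The following is a minimal plain-Python sketch of that pattern, not Prefect code: `run_child` and `run_all_children` are hypothetical stand-ins for `child_flow` and the loop in `parent_flow`, and nothing from Prefect is imported.

```python
# Hypothetical stand-in for child_flow; plain Python, no Prefect involved.
def run_child(flow_param: int) -> int:
    if flow_param == 5:
        raise ValueError("child_flow_5 always fails")
    return flow_param


def run_all_children(params) -> tuple[list[int], dict[int, str]]:
    """Run every child, recording failures instead of stopping at the first one."""
    succeeded: list[int] = []
    failed: dict[int, str] = {}
    for i in params:
        try:
            succeeded.append(run_child(i))
        except Exception as e:
            # Keep the error message so it can be reported after the loop.
            failed[i] = str(e)
    return succeeded, failed


succeeded, failed = run_all_children(range(0, 11))
print(f"succeeded: {succeeded}")
print(f"failed: {failed}")
```

Whether the parent should raise after the loop when `failed` is non-empty is a design choice: raising surfaces the problem on the parent run, while returning normally treats child failures as tolerated.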
This approach should allow your `parent_flow` to continue executing all child flows, even if one of them raises an exception. If you have further questions, feel free to ask.