Alexis Lucido
10/29/2021, 8:36 AM
Anna Geller
10/29/2021, 9:17 AM
> the cleanest way to make a flow retry an upstream task if a downstream task fails
You can just add `max_retries` and `retry_delay` to your task decorator. The easiest way would be to set them on both tasks, or at least on the upstream task you mentioned, which may need a retry:
```python
from datetime import timedelta

from prefect import Flow, Parameter, task

# Retry this task up to 5 times, waiting 2 minutes between attempts
@task(log_stdout=True, max_retries=5, retry_delay=timedelta(minutes=2))
def hello_world(user_input: str):
    result = f"hello {user_input}"
    print(result)
    return result
```
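To make the retry semantics concrete, here is a plain-Python sketch (not Prefect's implementation) of what `max_retries` and `retry_delay` amount to: the task body runs once, and on failure it is re-run up to `max_retries` more times, pausing `retry_delay` between attempts. The helper `run_with_retries` and the flaky task are hypothetical, used only for illustration.

```python
import time
from datetime import timedelta
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], max_retries: int, retry_delay: timedelta) -> T:
    """Hypothetical helper: call fn, retrying up to max_retries times on failure."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: propagate the last error
            attempt += 1
            time.sleep(retry_delay.total_seconds())  # wait before the next attempt


# Hypothetical flaky task that fails twice before succeeding
calls = {"n": 0}


def flaky_hello(user_input: str = "world") -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return f"hello {user_input}"


result = run_with_retries(flaky_hello, max_retries=5, retry_delay=timedelta(seconds=0))
print(result, "after", calls["n"], "attempts")  # prints: hello world after 3 attempts
```

With `max_retries=5` the body can run at most 6 times in total; the third attempt succeeds here, so the remaining retries are never used.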
> Is there something like Airflow subdags, allowing to group tasks inside a flow?
There is something MUCH better: instead of a SubDAG, you can call any other flow from a parent flow. There is a `StartFlowRun` task to do it, but these 3 tasks can also be helpful:
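• `create_flow_run(flow, project)` → creates a FlowRun for a given flow and returns the flow run ID
• `wait_for_flow_run(flow_run_id)` → waits for the FlowRun to complete
• `get_task_run_result(flow_run_id, task_slug)` → returns the result of a task from the child FlowRun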
And if that's what you meant, you can also configure retries for a `StartFlowRun` task, but note that this will only retry if the flow run failed to start. To retry the flow run when a task in the child flow fails, you would need a task like `raise_flow_run_state`:
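```python
from prefect import Flow, task
from prefect.backend import FlowRunView
from prefect.engine.signals import signal_from_state
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def raise_flow_run_state(flow_run: FlowRunView):
    # Re-raise the child flow run's final state in the parent flow,
    # so a non-successful child run is reflected in this task's state
    flow_run_state = flow_run.state
    if not flow_run_state.is_successful():
        exc = signal_from_state(flow_run_state)(
            f"{flow_run.flow_run_id} finished in state {flow_run_state}"
        )
        raise exc
    return flow_run


with Flow("MasterFlow") as flow:
    # Start the child flow, wait for it to finish, then propagate its state
    staging_area_id = create_flow_run(
        flow_name="staging_area", project_name="Flow_of_Flows", run_name="staging_area"
    )
    staging_area_flow_run_view = wait_for_flow_run(staging_area_id)
    raise_flow_run_state(staging_area_flow_run_view)
```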
Here is documentation showing subflows: https://docs.prefect.io/core/idioms/flow-to-flow
And if you're interested, here is my old article comparing Airflow and Prefect in terms of the SubDAG feature. But note that the task was previously named `FlowRunTask` (as shown in the post); it's now called `StartFlowRun` to align with other tasks like `RenameFlowRun`.
Alexis Lucido
10/29/2021, 10:33 AM
Anna Geller
10/29/2021, 10:39 AM