# prefect-server
Hi everyone. Being a current (soon to be former) user of Airflow, I have decided to move our infrastructure to Prefect, and I am just starting to test some pipelines. They're so easy to configure. However, I am wondering what would be the cleanest way to make a flow retry an upstream task if a downstream task fails. Is there something like Airflow subdags, allowing to group tasks inside a flow? Thanks a lot
Hi @Alexis Lucido, great to hear that you’re moving to Prefect.
> the cleanest way to make a flow retry an upstream task if a downstream task fails

You can just add `max_retries` and `retry_delay` to your task decorator. The easiest way would be to set them for both tasks, or at least for the upstream task you mentioned, which may need a retry:
```python
from prefect import Flow, Parameter, task
from datetime import timedelta

@task(log_stdout=True, max_retries=5, retry_delay=timedelta(minutes=2))
def hello_world(user_input: str):
    result = f"hello {user_input}"
    print(result)
    return result
```
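To make the retry semantics concrete, here is a minimal plain-Python sketch of what "retry up to `max_retries` times, waiting `retry_delay` between attempts" amounts to. It is not how Prefect implements retries; `with_retries` and `flaky_hello` are hypothetical names used only for illustration, and the delay is set to zero so the example finishes immediately.

```python
import time
from datetime import timedelta
from functools import wraps


def with_retries(max_retries: int, retry_delay: timedelta):
    """Hypothetical helper: re-run a function up to `max_retries` extra times."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt >= max_retries:
                        raise  # retries exhausted: surface the last error
                    attempt += 1
                    time.sleep(retry_delay.total_seconds())
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_retries=5, retry_delay=timedelta(seconds=0))
def flaky_hello(user_input: str) -> str:
    # Fails on the first two calls to simulate a transient error.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return f"hello {user_input}"


result = flaky_hello("world")
```

Note that only the decorated function is re-run. A failure in a separate downstream function would not trigger a re-run of this one, which is why the rest of the thread turns to retrying a whole flow run.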
> Is there something like Airflow subdags, allowing to group tasks inside a flow?

There is something MUCH better: instead of a SubDAG, you can call any other flow from a parent flow. There is a `StartFlowRun` task to do it, but these tasks can also be helpful:
• `create_flow_run`(flow, project) → creates a FlowRun for a given flow, returns the flow run ID
• `wait_for_flow_run`(flow_run_id) → waits for the FlowRun to complete

And if that’s what you meant, you can also configure retries for a StartFlowRun task, but note that this will retry only if the flow run failed to start. To retry a FlowRun on failure of a child task, you would need a task like `raise_flow_run_state`:
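```python
from prefect import Flow, task
from prefect.backend import FlowRunView
from prefect.engine.signals import signal_from_state
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def raise_flow_run_state(flow_run: FlowRunView):
    # Re-raise the child flow run's final state as a signal if it did not succeed,
    # so that the failure is visible in the parent flow.
    flow_run_state = flow_run.state
    if not flow_run_state.is_successful():
        exc = signal_from_state(flow_run_state)(
            f"{flow_run.flow_run_id} finished in state {flow_run_state}"
        )
        raise exc
    return flow_run


with Flow("MasterFlow") as flow:
    # The child flow "staging_area" must already be registered in the project.
    staging_area_id = create_flow_run(flow_name="staging_area", project_name="Flow_of_Flows", run_name="staging_area")
    staging_area_flow_run_view = wait_for_flow_run(staging_area_id)
    raise_flow_run_state(staging_area_flow_run_view)
```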
Here is documentation showing subflows: https://docs.prefect.io/core/idioms/flow-to-flow
And if you’re interested, here is my old article comparing Airflow and Prefect in terms of the SubDAG feature. But note that the task was previously named FlowRunTask (as shown in the post) and is now called StartFlowRun to align with other tasks like RenameFlowRun.
Thank you very much for your detailed answer, Anna. Indeed, I just want to be able to retry an entire Flow on failure (and not just the task that failed). I understand I could do that with a task such as the raise_flow_run_state you defined. However, this task uses the Prefect backend, i.e. the Cloud option, which we have not been using yet. Is there an equivalent option with Prefect Core?
Unfortunately, no. All of those FlowRun-related tasks require that your child flows are registered to the backend before you can call them from a parent flow. This will be easier in Orion, though, if you’re interested: https://orion-docs.prefect.io/concepts/flows/#subflows
@Alexis Lucido did you know that Prefect Cloud offers 10,000 free task runs each month? You can sign up, authenticate your terminal with an API key, and use all the features we discussed.
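For purely local runs without a backend, as asked about above, one possible workaround is a plain Python loop that re-runs the whole flow until its final state is successful. This is only a sketch and not a Prefect feature: `run_until_successful` is a hypothetical helper, and `FakeState` is a stand-in for a state object, used here so the example runs without Prefect installed. The only assumption about the flow is that calling it returns an object with an `is_successful()` method.

```python
from typing import Any, Callable


def run_until_successful(run_flow: Callable[[], Any], max_attempts: int = 3) -> Any:
    """Hypothetical helper: re-run a whole flow until its final state is successful.

    `run_flow` is any zero-argument callable returning an object that has an
    `is_successful()` method. The last state is returned, successful or not.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    state = None
    for _ in range(max_attempts):
        state = run_flow()
        if state.is_successful():
            break
    return state


class FakeState:
    """Stand-in for a flow run state, for demonstration only."""

    def __init__(self, ok: bool):
        self.ok = ok

    def is_successful(self) -> bool:
        return self.ok


attempts = []


def fake_flow_run() -> FakeState:
    # Simulates a flow whose first two runs fail and whose third run succeeds.
    attempts.append(len(attempts) + 1)
    return FakeState(ok=len(attempts) >= 3)


final_state = run_until_successful(fake_flow_run, max_attempts=5)
```

With Prefect Core 1.x, the same helper could presumably be called as `run_until_successful(flow.run)`, on the assumption that `flow.run()` returns the final state of the local run. Every attempt re-executes all tasks from scratch, including the upstream ones.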