# ask-community
Hey folks :simple_smile: We recently started using `create_flow_run` and `wait_for_flow_run` to replace the (legacy?) `StartFlowRun`. The FlowRun doc page warns that this feature is experimental and may be subject to change in the future. Would it be possible to know which parts of the module can be considered stable and which parts are under review/refactoring? This would be super helpful for us to understand how to approach future upgrades of our Prefect Core deployment. cc @Kevin Kho @Zanie @grandimk
@ale to the best of my knowledge, only the `FlowRunView` is experimental, in the sense that you perhaps shouldn’t use `FlowRunView` directly in your production flows because it’s a backend abstraction. The tasks themselves (`create_flow_run`, `wait_for_flow_run`, `get_task_run_result`) are not experimental. You can think of it this way: `FlowRunView` is a backend implementation behind those tasks, but even if a new version used something else in the backend (e.g. running a GraphQL query instead of `flow_run = FlowRunView.from_flow_run_id(flow_run_id)`), the tasks themselves should work the same way.
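To make the "stable task, swappable backend" idea concrete, here is a minimal plain-Python sketch. It is not Prefect code: `state_from_view`, `state_from_graphql`, and `make_wait_task` are hypothetical names invented for illustration, and both backends simply return canned data.

```python
from typing import Callable, Dict


# Hypothetical stand-ins for two backend implementations (not Prefect code).
def state_from_view(flow_run_id: str) -> Dict[str, str]:
    """Pretend to load run metadata through a view object."""
    return {"flow_run_id": flow_run_id, "state": "Success"}


def state_from_graphql(flow_run_id: str) -> Dict[str, str]:
    """Pretend to load the same metadata through a raw GraphQL query."""
    return {"flow_run_id": flow_run_id, "state": "Success"}


def make_wait_task(backend: Callable[[str], Dict[str, str]]) -> Callable[[str], str]:
    """Build the public 'task'; callers only ever see this signature."""

    def wait_for_run(flow_run_id: str) -> str:
        return backend(flow_run_id)["state"]

    return wait_for_run


# Swapping the backend does not change how the task is called or what it returns.
old_task = make_wait_task(state_from_view)
new_task = make_wait_task(state_from_graphql)
print(old_task("abc123") == new_task("abc123"))
```

The point is only that code written against the task-level call keeps working when the backend changes, while code that reaches into the backend directly would not.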
and btw, I wouldn’t classify `StartFlowRun` as legacy at all - for some use cases it’s quite useful. E.g. imagine a flow in which you need to create many flow runs that must run sequentially. `StartFlowRun` is much less verbose in such a use case:
```python
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

FLOW_NAME = "..."
STORAGE = ...
PROJECT_NAME = "..."

# wait=True makes each task block until the child flow run has finished
start_flow_run = StartFlowRun(project_name=PROJECT_NAME, wait=True)


with Flow(FLOW_NAME, storage=STORAGE) as flow:
    staging = start_flow_run(flow_name="01_extract_load", task_args={"name": "Stage"})
    dbt_run = start_flow_run(flow_name="02_dbt_snowflake", task_args={"name": "DBT"})
    dashboards = start_flow_run(flow_name="03_dashboards", task_args={"name": "Refresh"})

    # run the three child flows one after another
    staging.set_downstream(dbt_run)
    dbt_run.set_downstream(dashboards)
```
The same with those new tasks would require much more code:
```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# FLOW_NAME, STORAGE and PROJECT_NAME are the same placeholders as in the previous snippet

with Flow(FLOW_NAME, storage=STORAGE) as flow:
    extract_load_id = create_flow_run(
        flow_name="01_extract_load",
        project_name=PROJECT_NAME,
        task_args={"name": "Staging"},
    )
    extract_load_wait_task = wait_for_flow_run(
        extract_load_id, task_args={"name": "Staging - wait"}
    )

    transform_id = create_flow_run(
        flow_name="02_dbt", project_name=PROJECT_NAME, task_args={"name": "DBT flow"}
    )
    transform_id_wait_task = wait_for_flow_run(
        transform_id, task_args={"name": "DBT flow - wait"}
    )
    # only create the next flow run once the previous one has finished
    extract_load_wait_task.set_downstream(transform_id)

    dashboards_id = create_flow_run(
        flow_name="03_dashboards",
        project_name=PROJECT_NAME,
        task_args={"name": "Dashboards"},
    )
    dashboards_wait_task = wait_for_flow_run(
        dashboards_id, task_args={"name": "Dashboards - wait"}
    )
    transform_id_wait_task.set_downstream(dashboards_id)
```
So there is no right or wrong here; you can use whatever best suits your use case and your preferences.
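If the create/wait pattern feels too repetitive, one possible way to trim it is a small loop helper. This is only a sketch under assumptions: `chain_flow_runs` is a hypothetical helper, not part of Prefect, and it takes the two task callables as arguments so it relies on nothing beyond the keyword arguments and `set_downstream` calls already used in this thread.

```python
from typing import Any, Callable, List


def chain_flow_runs(
    flow_names: List[str],
    project_name: str,
    create: Callable[..., Any],
    wait: Callable[..., Any],
) -> List[Any]:
    """Create and wait for each flow run in order; return the wait tasks.

    `create` and `wait` are expected to behave like the create_flow_run and
    wait_for_flow_run tasks used earlier in the thread (an assumption).
    """
    wait_tasks: List[Any] = []
    previous_wait = None
    for name in flow_names:
        run_id = create(
            flow_name=name, project_name=project_name, task_args={"name": name}
        )
        wait_task = wait(run_id, task_args={"name": f"{name} - wait"})
        if previous_wait is not None:
            # start this run only after the previous run has finished
            previous_wait.set_downstream(run_id)
        wait_tasks.append(wait_task)
        previous_wait = wait_task
    return wait_tasks
```

Inside a `with Flow(...)` block this would be called roughly as `chain_flow_runs(["01_extract_load", "02_dbt", "03_dashboards"], PROJECT_NAME, create_flow_run, wait_for_flow_run)`. Whether that reads better than three `StartFlowRun` calls is again a matter of preference.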
Thanks a lot @Anna Geller 🙌