ale

    10 months ago
    Hey folks 🙂 We recently started using create_flow_run and wait_for_flow_run to replace the (legacy?) StartFlowRun. The FlowRun doc page warns that this feature is experimental and may change in the future. Would it be possible to know which parts of the module can be considered stable and which are under review/refactoring? This would be super helpful for us to understand how to approach future upgrades of our Prefect Core deployment. cc @Kevin Kho @Michael Adkins @grandimk
    Anna Geller

    10 months ago
    @ale to the best of my knowledge, only FlowRunView is experimental, in the sense that you probably shouldn’t use FlowRunView directly in your production flows, because it’s a backend abstraction. The tasks themselves (create_flow_run, wait_for_flow_run, get_task_run_result) are not experimental. You can think of it this way: FlowRunView is a backend implementation behind those tasks, but even if a new version used something else in the backend (e.g. running a GraphQL query instead of flow_run = FlowRunView.from_flow_run_id(flow_run_id)), the tasks themselves should work the same way.
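A minimal plain-Python sketch of the design idea above (it does not import Prefect): callers depend on one stable function, while the backend that looks up a flow run's state is injected and can be swapped. wait_for_run, make_view_backend, make_query_backend, the state names and the GraphQL-like response shape are all hypothetical stand-ins, not Prefect's actual implementation of wait_for_flow_run or FlowRunView.

```python
from typing import Callable, Dict, Iterator, List

# Hypothetical backend contract: given a flow run ID, return its current state name.
StateFetcher = Callable[[str], str]

FINAL_STATES = {"Success", "Failed", "Cancelled"}


def wait_for_run(flow_run_id: str, fetch_state: StateFetcher, max_polls: int = 10) -> str:
    """Stable public entry point: poll the injected backend until a final state.

    Callers depend only on this signature; how the state is fetched is an
    implementation detail that can be swapped without touching them.
    """
    for _ in range(max_polls):
        state = fetch_state(flow_run_id)
        if state in FINAL_STATES:
            return state
    raise TimeoutError(f"flow run {flow_run_id!r} not finished after {max_polls} polls")


def make_view_backend(history: Dict[str, List[str]]) -> StateFetcher:
    """Stand-in for an object-based lookup (in the spirit of FlowRunView)."""
    iters: Dict[str, Iterator[str]] = {rid: iter(states) for rid, states in history.items()}
    return lambda run_id: next(iters[run_id])


def make_query_backend(history: Dict[str, List[str]]) -> StateFetcher:
    """Stand-in for a GraphQL-style lookup; the response shape is made up."""
    iters: Dict[str, Iterator[str]] = {rid: iter(states) for rid, states in history.items()}

    def fetch(run_id: str) -> str:
        # Pretend this dict is the decoded JSON body of a query response.
        response = {"data": {"flow_run": [{"id": run_id, "state": next(iters[run_id])}]}}
        return response["data"]["flow_run"][0]["state"]

    return fetch


history = {"run-1": ["Scheduled", "Running", "Success"]}
print(wait_for_run("run-1", make_view_backend(history)))   # Success
print(wait_for_run("run-1", make_query_backend(history)))  # Success
```

Both backends give the same answer for the same run history, which is the point: swapping what happens behind the function does not change what callers see.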
    And btw, I wouldn’t classify StartFlowRun as legacy at all; for some use cases it’s quite useful. E.g. imagine a flow that needs to create many flow runs that must run sequentially. StartFlowRun is much less verbose in such a use case:
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun
    
    FLOW_NAME = "..."
    STORAGE = ...
    PROJECT_NAME = "..."
    # wait=True: each task only finishes once the child flow run it started has finished
    start_flow_run = StartFlowRun(project_name=PROJECT_NAME, wait=True)
    
    
    with Flow(FLOW_NAME, storage=STORAGE) as flow:
        # each call creates a copy of the task that starts one child flow run
        staging = start_flow_run(flow_name="01_extract_load", task_args={"name": "Stage"})
        dbt_run = start_flow_run(flow_name="02_dbt_snowflake", task_args={"name": "DBT"})
        dashboards = start_flow_run(flow_name="03_dashboards", task_args={"name": "Refresh"})
    
        # run the child flows one after another: extract/load -> dbt -> dashboards
        staging.set_downstream(dbt_run)
        dbt_run.set_downstream(dashboards)
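To make the role of wait=True concrete, here is a simplified timing model in plain Python (no Prefect involved). It assumes run creation is instant and that each parent task releases its downstream task as soon as it completes; with wait=True a StartFlowRun task completes when its child run finishes, with wait=False it completes once the run has been created. schedule_chain and the durations are made up for illustration.

```python
from typing import Dict, List, Tuple


def schedule_chain(
    chain: List[str], durations: Dict[str, int], wait: bool
) -> Dict[str, Tuple[int, int]]:
    """Return the (start, end) time of each child flow run in a linear chain.

    Simplified model (an assumption, not Prefect internals): creating a run is
    instant and the next parent task starts as soon as the previous one is done.
    wait=True: a parent task is done only when its child run has finished.
    wait=False: a parent task is done as soon as its child run has been created.
    """
    clock = 0
    timeline: Dict[str, Tuple[int, int]] = {}
    for name in chain:
        start = clock
        end = start + durations[name]
        timeline[name] = (start, end)
        # The downstream parent task is released when this parent task completes.
        clock = end if wait else start
    return timeline


chain = ["01_extract_load", "02_dbt_snowflake", "03_dashboards"]
durations = {"01_extract_load": 5, "02_dbt_snowflake": 3, "03_dashboards": 2}  # made-up minutes
print(schedule_chain(chain, durations, wait=True))
print(schedule_chain(chain, durations, wait=False))
```

With wait=True the child runs occupy (0, 5), (5, 8) and (8, 10), strictly one after another; with wait=False all three start at time 0, so set_downstream alone would only order the run creations, not the runs themselves.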
    Doing the same with the new tasks requires much more code:
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    # FLOW_NAME, STORAGE and PROJECT_NAME as defined above
    with Flow(FLOW_NAME, storage=STORAGE) as flow:
        # 1. extract/load: create the child flow run, then wait for it to finish
        extract_load_id = create_flow_run(
            flow_name="01_extract_load",
            project_name=PROJECT_NAME,
            task_args={"name": "Staging"},
        )
        extract_load_wait_task = wait_for_flow_run(
            extract_load_id, task_args={"name": "Staging - wait"}
        )
    
        # 2. dbt: only created once the extract/load run has finished
        transform_id = create_flow_run(
            flow_name="02_dbt", project_name=PROJECT_NAME, task_args={"name": "DBT flow"}
        )
        transform_id_wait_task = wait_for_flow_run(
            transform_id, task_args={"name": "DBT flow - wait"}
        )
        extract_load_wait_task.set_downstream(transform_id)
    
        # 3. dashboards: only created once the dbt run has finished
        dashboards_id = create_flow_run(
            flow_name="03_dashboards",
            project_name=PROJECT_NAME,
            task_args={"name": "Dashboards"},
        )
        dashboards_wait_task = wait_for_flow_run(
            dashboards_id, task_args={"name": "Dashboards - wait"}
        )
        transform_id_wait_task.set_downstream(dashboards_id)
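Much of the extra code follows a fixed pattern: two tasks per child flow, plus an edge from each wait task to the next create task. A hedged plain-Python model of that wiring (strings stand in for tasks; build_chain and run_order are hypothetical helpers, not Prefect API) shows that a loop over flow names reproduces it for any chain length. In a real flow the same loop could call create_flow_run, wait_for_flow_run and set_downstream instead of appending strings; that Prefect variant is not tested here.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

Edge = Tuple[str, str]  # (upstream task, downstream task)


def build_chain(flow_names: List[str]) -> Tuple[List[str], List[Edge]]:
    """Model the create/wait wiring for a sequential chain of child flows.

    Each child flow contributes "create:<name>" and "wait:<name>". Edges:
    create -> wait (the wait task consumes the flow run ID) and
    wait of flow i -> create of flow i + 1 (the set_downstream calls).
    """
    tasks: List[str] = []
    edges: List[Edge] = []
    previous_wait: Optional[str] = None
    for name in flow_names:
        create, wait = f"create:{name}", f"wait:{name}"
        tasks += [create, wait]
        edges.append((create, wait))
        if previous_wait is not None:
            edges.append((previous_wait, create))
        previous_wait = wait
    return tasks, edges


def run_order(tasks: List[str], edges: List[Edge]) -> List[str]:
    """Return one valid execution order (Kahn's topological sort)."""
    indegree: Dict[str, int] = {t: 0 for t in tasks}
    children: Dict[str, List[str]] = {t: [] for t in tasks}
    for upstream, downstream in edges:
        children[upstream].append(downstream)
        indegree[downstream] += 1
    ready = deque(t for t in tasks if indegree[t] == 0)
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in children[task]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        raise ValueError("dependency cycle detected")
    return order


tasks, edges = build_chain(["01_extract_load", "02_dbt", "03_dashboards"])
print(run_order(tasks, edges))
```

For three flows this yields six tasks and five edges, and the only valid order alternates create and wait per flow, which is the sequential behavior the hand-written version spells out.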
    So there is no right or wrong here; use whichever best suits your use case and preferences.
    ale

    10 months ago
    Thanks a lot @Anna Geller 🙌