ale
11/08/2021, 11:13 AM
[...] create_flow_run and wait_for_flow_run to replace the (legacy?) StartFlowRun.
The FlowRun doc page warns that this feature is experimental and may be subject to change in the future.
Would it be possible to know which parts of the module can be considered stable and which parts are under review/refactoring?
This would be super helpful for us to understand how to approach future upgrades of our Prefect Core deployment.
cc @Kevin Kho @Zanie @grandimk
Anna Geller
FlowRunView is experimental in the sense that you probably shouldn’t use FlowRunView directly in your production flows, because it’s a backend abstraction. The tasks themselves (create_flow_run, wait_for_flow_run, get_task_run_result) are not experimental.
You can think of it this way: FlowRunView is a backend implementation behind those tasks, but even if a new version used something else in the backend (e.g. running a GraphQL query instead of flow_run = FlowRunView.from_flow_run_id(flow_run_id)), the tasks themselves should work the same way.
Anna Geller
[...] StartFlowRun as legacy at all - for some use cases it’s quite useful, e.g. imagine a flow in which you need to create many flow runs that must run sequentially. StartFlowRun is much less verbose in such a use case:
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

FLOW_NAME = "..."
STORAGE = ...
PROJECT_NAME = "..."
start_flow_run = StartFlowRun(project_name=PROJECT_NAME, wait=True)

with Flow(FLOW_NAME, storage=STORAGE) as flow:
    staging = start_flow_run(flow_name="01_extract_load", task_args={"name": "Stage"})
    dbt_run = start_flow_run(flow_name="02_dbt_snowflake", task_args={"name": "DBT"})
    dashboards = start_flow_run(flow_name="03_dashboards", task_args={"name": "Refresh"})
    staging.set_downstream(dbt_run)
    dbt_run.set_downstream(dashboards)
The same with those new tasks would require much more code:
# Reuses Flow, FLOW_NAME, STORAGE and PROJECT_NAME from the snippet above
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow(FLOW_NAME, storage=STORAGE) as flow:
    extract_load_id = create_flow_run(
        flow_name="01_extract_load",
        project_name=PROJECT_NAME,
        task_args={"name": "Staging"},
    )
    extract_load_wait_task = wait_for_flow_run(
        extract_load_id, task_args={"name": "Staging - wait"}
    )
    transform_id = create_flow_run(
        flow_name="02_dbt", project_name=PROJECT_NAME, task_args={"name": "DBT flow"}
    )
    transform_id_wait_task = wait_for_flow_run(
        transform_id, task_args={"name": "DBT flow - wait"}
    )
    extract_load_wait_task.set_downstream(transform_id)
    dashboards_id = create_flow_run(
        flow_name="03_dashboards",
        project_name=PROJECT_NAME,
        task_args={"name": "Dashboards"},
    )
    dashboards_wait_task = wait_for_flow_run(
        dashboards_id, task_args={"name": "Dashboards - wait"}
    )
    transform_id_wait_task.set_downstream(dashboards_id)
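If the verbosity is the main concern, the repeated create/wait pairs could probably be folded into a small loop. What follows is only a rough sketch, not something Prefect ships: chain_flow_runs and its create/wait parameters are hypothetical names made up for illustration. The two callables are assumed to be the create_flow_run and wait_for_flow_run tasks, passed in while inside the with Flow(...) block, and the task names here are simply derived from the flow names rather than chosen by hand.

```python
from typing import Any, Callable, List, Sequence


def chain_flow_runs(
    flow_names: Sequence[str],
    project_name: str,
    create: Callable[..., Any],
    wait: Callable[..., Any],
) -> List[Any]:
    """Hypothetical helper: create one child flow run per name, in sequence.

    `create` and `wait` are assumed to behave like Prefect's
    create_flow_run and wait_for_flow_run tasks when called inside
    a `with Flow(...)` block.
    """
    wait_tasks: List[Any] = []
    previous_wait = None
    for name in flow_names:
        run_id = create(
            flow_name=name,
            project_name=project_name,
            task_args={"name": name},
        )
        current_wait = wait(run_id, task_args={"name": f"{name} - wait"})
        if previous_wait is not None:
            # Same wiring as the explicit version: the next create task
            # only starts after the previous wait task has finished.
            previous_wait.set_downstream(run_id)
        wait_tasks.append(current_wait)
        previous_wait = current_wait
    return wait_tasks
```

Inside the flow block this would be called as chain_flow_runs(["01_extract_load", "02_dbt", "03_dashboards"], PROJECT_NAME, create_flow_run, wait_for_flow_run). Passing the two tasks in as arguments keeps the helper free of Prefect imports, so the wiring can be checked with simple stand-ins before running it against a real backend.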
So there is no right or wrong here; you can use whatever best suits your use case and your preferences.
ale
11/08/2021, 12:28 PM