Hey, I'm running prefect version 0.13.5 and want to have a flow that triggers two other flows, which involves scaling a presto cluster.
Here is an exemplary code snippet
with Flow("scale_presto_cluster", result=get_default_local_result()) as flow:
scale_down = Parameter("scale_down_capacity", required=False)
scale_up = Parameter("scale_up_capacity", required=False)
with PrestoCluster(n_worker_low=scale_down, n_worker_high=scale_up) as client:
run_flow_task_stats_weekly = FlowRunTask(flow_name="dummy-weekly", project_name='default', wait=True)
run_flow_task_stats_weekly(parameters=get_weekly_fromto())
with case(check_if_monthly_run, True):
run_flow_task_stats_monthly = FlowRunTask(flow_name="dummy-monthly", project_name='default', wait=True)
run_flow_task_stats_monthly(parameters=get_monthly_fromto(), upstream_tasks=[run_flow_task_stats_weekly])
I used a ResourceManager as documented in
https://docs.prefect.io/api/latest/tasks/resources.html#functions
I noticed that the cleanup task wasn't called after I cancelled
scale_presto_cluster
flow run during it's execution, but was cancelled instead as well.
Since this flow will be run nightly, I mus't ensure that the
cleanup
task of the Flow will be triggered no matter what. How can I achieve this? Is a task even the right approach?