Hey, I'm running prefect version 0.13.5 and want t...
# prefect-community
j
Hey, I'm running prefect version 0.13.5 and want to have a flow that triggers two other flows, which involves scaling a presto cluster. Here is an exemplary code snippet
Copy code
with Flow("scale_presto_cluster", result=get_default_local_result()) as flow:
    scale_down = Parameter("scale_down_capacity", required=False)
    scale_up = Parameter("scale_up_capacity", required=False)
    with PrestoCluster(n_worker_low=scale_down, n_worker_high=scale_up) as client:
        run_flow_task_stats_weekly = FlowRunTask(flow_name="dummy-weekly", project_name='default', wait=True)
        run_flow_task_stats_weekly(parameters=get_weekly_fromto())

        with case(check_if_monthly_run, True):
            run_flow_task_stats_monthly = FlowRunTask(flow_name="dummy-monthly", project_name='default', wait=True)
            run_flow_task_stats_monthly(parameters=get_monthly_fromto(), upstream_tasks=[run_flow_task_stats_weekly])
I used a ResourceManager as documented in https://docs.prefect.io/api/latest/tasks/resources.html#functions I noticed that the cleanup task wasn't called after I cancelled
scale_presto_cluster
flow run during it's execution, but was cancelled instead as well. Since this flow will be run nightly, I mus't ensure that the
cleanup
task of the Flow will be triggered no matter what. How can I achieve this? Is a task even the right approach?
d
Hi @Julian, excellent question!
I believe once the flow run is in a
Cancelled
state the intention is to release resources as fast as possible
Instead, I would suggest putting a particular task run into a
Failed
state so that the flow can cleanup resources as expected
If you need the
cleanup
task definitely run every time
I would suggest making code a task so that you can see its status specifically
And you can take advantage of retry logic
s
Is it possible to use a
StateHandler
to act on state change to
cancelled
?
n
Hi @Sven Teresniak - yes, you can use a
StateHandler
to catch
Cancelled
states something like this:
Copy code
from prefect import Task, Flow
from prefect.engine.state import Cancelled

def my_state_handler(task, from_state, to_state):
  if isinstance(to_state, Cancelled):
    # do something
  return to_state

t = Task(state_handlers=[my_state_handler])
# ..etc
s
coool. very nice to clean up. my colleaque said the ressource manager's
cleanup()
is skipped when a flowrun is cancelled
maybe we can workaround that by using state handler
n
Hm that sounds like it's potentially a bug, would you mind opening a ticket with what you're seeing (and a min repro example if possible?)
s
Sure. I'll investigate further and get back to you (using a ticket)