Julian
09/10/2020, 9:44 PMwith Flow("scale_presto_cluster", result=get_default_local_result()) as flow:
scale_down = Parameter("scale_down_capacity", required=False)
scale_up = Parameter("scale_up_capacity", required=False)
with PrestoCluster(n_worker_low=scale_down, n_worker_high=scale_up) as client:
run_flow_task_stats_weekly = FlowRunTask(flow_name="dummy-weekly", project_name='default', wait=True)
run_flow_task_stats_weekly(parameters=get_weekly_fromto())
with case(check_if_monthly_run, True):
run_flow_task_stats_monthly = FlowRunTask(flow_name="dummy-monthly", project_name='default', wait=True)
run_flow_task_stats_monthly(parameters=get_monthly_fromto(), upstream_tasks=[run_flow_task_stats_weekly])
I used a ResourceManager as documented in https://docs.prefect.io/api/latest/tasks/resources.html#functions
I noticed that the cleanup task wasn't called after I cancelled scale_presto_cluster flow run during it's execution, but was cancelled instead as well.
Since this flow will be run nightly, I mus't ensure that the cleanup task of the Flow will be triggered no matter what. How can I achieve this? Is a task even the right approach?Dylan
Dylan
Cancelled state the intention is to release resources as fast as possibleDylan
Failed state so that the flow can cleanup resources as expectedDylan
cleanup task definitely run every timeDylan
Dylan
Sven Teresniak
09/11/2020, 8:38 AMStateHandler to act on state change to cancelled ?nicholas
StateHandler to catch Cancelled states something like this:
from prefect import Task, Flow
from prefect.engine.state import Cancelled
def my_state_handler(task, from_state, to_state):
if isinstance(to_state, Cancelled):
# do something
return to_state
t = Task(state_handlers=[my_state_handler])
# ..etcSven Teresniak
09/11/2020, 4:52 PMcleanup() is skipped when a flowrun is cancelledSven Teresniak
09/11/2020, 4:52 PMnicholas
Sven Teresniak
09/12/2020, 8:25 AM