Julian
09/10/2020, 9:44 PMwith Flow("scale_presto_cluster", result=get_default_local_result()) as flow:
scale_down = Parameter("scale_down_capacity", required=False)
scale_up = Parameter("scale_up_capacity", required=False)
with PrestoCluster(n_worker_low=scale_down, n_worker_high=scale_up) as client:
run_flow_task_stats_weekly = FlowRunTask(flow_name="dummy-weekly", project_name='default', wait=True)
run_flow_task_stats_weekly(parameters=get_weekly_fromto())
with case(check_if_monthly_run, True):
run_flow_task_stats_monthly = FlowRunTask(flow_name="dummy-monthly", project_name='default', wait=True)
run_flow_task_stats_monthly(parameters=get_monthly_fromto(), upstream_tasks=[run_flow_task_stats_weekly])
I used a ResourceManager as documented in https://docs.prefect.io/api/latest/tasks/resources.html#functions
I noticed that the cleanup task wasn't called after I cancelled scale_presto_cluster
flow run during it's execution, but was cancelled instead as well.
Since this flow will be run nightly, I mus't ensure that the cleanup
task of the Flow will be triggered no matter what. How can I achieve this? Is a task even the right approach?Dylan
09/10/2020, 9:46 PMCancelled
state the intention is to release resources as fast as possibleFailed
state so that the flow can cleanup resources as expectedcleanup
task definitely run every timeSven Teresniak
09/11/2020, 8:38 AMStateHandler
to act on state change to cancelled
?nicholas
09/11/2020, 2:53 PMStateHandler
to catch Cancelled
states something like this:
from prefect import Task, Flow
from prefect.engine.state import Cancelled
def my_state_handler(task, from_state, to_state):
if isinstance(to_state, Cancelled):
# do something
return to_state
t = Task(state_handlers=[my_state_handler])
# ..etc
Sven Teresniak
09/11/2020, 4:52 PMcleanup()
is skipped when a flowrun is cancellednicholas
09/11/2020, 4:59 PMSven Teresniak
09/12/2020, 8:25 AM