Julian
09/10/2020, 9:44 PMwith Flow("scale_presto_cluster", result=get_default_local_result()) as flow:
scale_down = Parameter("scale_down_capacity", required=False)
scale_up = Parameter("scale_up_capacity", required=False)
with PrestoCluster(n_worker_low=scale_down, n_worker_high=scale_up) as client:
run_flow_task_stats_weekly = FlowRunTask(flow_name="dummy-weekly", project_name='default', wait=True)
run_flow_task_stats_weekly(parameters=get_weekly_fromto())
with case(check_if_monthly_run, True):
run_flow_task_stats_monthly = FlowRunTask(flow_name="dummy-monthly", project_name='default', wait=True)
run_flow_task_stats_monthly(parameters=get_monthly_fromto(), upstream_tasks=[run_flow_task_stats_weekly])
I used a ResourceManager as documented in https://docs.prefect.io/api/latest/tasks/resources.html#functions
I noticed that the cleanup task wasn't called after I cancelled scale_presto_cluster
flow run during it's execution, but was cancelled instead as well.
Since this flow will be run nightly, I mus't ensure that the cleanup
task of the Flow will be triggered no matter what. How can I achieve this? Is a task even the right approach?Dylan
Dylan
Cancelled
state the intention is to release resources as fast as possibleDylan
Failed
state so that the flow can cleanup resources as expectedDylan
cleanup
task definitely run every timeDylan
Dylan
Sven Teresniak
09/11/2020, 8:38 AMStateHandler
to act on state change to cancelled
?nicholas
StateHandler
to catch Cancelled
states something like this:
from prefect import Task, Flow
from prefect.engine.state import Cancelled
def my_state_handler(task, from_state, to_state):
if isinstance(to_state, Cancelled):
# do something
return to_state
t = Task(state_handlers=[my_state_handler])
# ..etc
Sven Teresniak
09/11/2020, 4:52 PMcleanup()
is skipped when a flowrun is cancelledSven Teresniak
09/11/2020, 4:52 PMnicholas
Sven Teresniak
09/12/2020, 8:25 AM