Daniel Saxton
09/18/2021, 3:43 PM
IntervalSchedule
if the workflow is raising an exception (so we'd like to simply cancel the workflow as soon as an error occurs)?
Zach Angell
IntervalSchedule?
Zach Angell
Daniel Saxton
09/18/2021, 9:34 PM
IntervalSchedule
the flow continues running on the schedule indefinitely
Daniel Saxton
09/18/2021, 9:37 PM
Daniel Saxton
09/18/2021, 9:39 PM
Zach Angell
flow.run()?
Daniel Saxton
09/19/2021, 12:59 AM
flow.run()
and running the script locally
Zach Angell
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
from datetime import timedelta
import prefect

def exit_on_flow_failure(obj, old_state, new_state):
    if new_state.is_failed():
        # log the exception
        logger = prefect.context.get("logger")
        logger.error("Flow encountered exception")
        logger.error(repr(new_state.result))
        # exit execution
        raise BaseException('Flow failed. Exiting execution...')
    return new_state

@task
def bye():
    raise ValueError()

with Flow('test', state_handlers=[exit_on_flow_failure]) as flow:
    bye()

flow.schedule = Schedule(clocks=[IntervalClock(timedelta(seconds=10))])

flow.run()
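For anyone wondering why the handler raises BaseException rather than a regular Exception, here is a rough, Prefect-free sketch of the idea. It assumes the runner wraps handler calls in an `except Exception` block, which is an assumption about Prefect internals and is not confirmed in this thread. `StopSchedule` and `run_on_schedule` are hypothetical names made up for the illustration.

```python
class StopSchedule(BaseException):
    """Hypothetical signal; derives from BaseException so `except Exception` misses it."""


def run_on_schedule(handler, max_runs=5):
    """Toy stand-in for a scheduler loop; NOT Prefect's implementation."""
    completed = 0
    for _ in range(max_runs):
        try:
            handler()
        except Exception:
            # Ordinary errors are swallowed, so the loop keeps scheduling runs.
            pass
        completed += 1
    return completed


def raises_exception():
    raise ValueError("ordinary failure")


def raises_base_exception():
    raise StopSchedule("Flow failed. Exiting execution...")


# An Exception subclass is absorbed by the loop: every scheduled run still happens.
runs_with_exception = run_on_schedule(raises_exception)

# A BaseException subclass is not caught by `except Exception`, so it escapes the loop.
try:
    run_on_schedule(raises_base_exception)
    escaped = False
except StopSchedule:
    escaped = True
```

The only Python fact this relies on is that `except Exception` does not catch classes deriving directly from BaseException, so such an error propagates up to the top of the script.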
Zach Angell
(py37) ~ % python cancel_schedule.py
[2021-09-18 23:12:54-0400] INFO - prefect.test | Waiting for next scheduled run at 2021-09-19T03:13:00+00:00
[2021-09-18 23:13:00-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-09-18 23:13:00-0400] INFO - prefect.TaskRunner | Task 'bye': Starting task run...
[2021-09-18 23:13:00-0400] ERROR - prefect.TaskRunner | Task 'bye': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/zangell/prefect/core/src/prefect/engine/task_runner.py", line 863, in get_task_run_state
    logger=self.logger,
  File "/Users/zangell/prefect/core/src/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "cancel_schedule.py", line 19, in bye
    raise ValueError()
ValueError
[2021-09-18 23:13:00-0400] INFO - prefect.TaskRunner | Task 'bye': Finished task run for task with final state: 'Failed'
[2021-09-18 23:13:00-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-09-18 23:13:00-0400] ERROR - prefect | Flow encountered exception
[2021-09-18 23:13:00-0400] ERROR - prefect | {<Task: bye>: <Failed: "Error during execution of task: ValueError()">}
Traceback (most recent call last):
  File "cancel_schedule.py", line 26, in <module>
    flow.run()
  File "/Users/zangell/prefect/core/src/prefect/core/flow.py", line 1278, in run
    **kwargs,
  File "/Users/zangell/prefect/core/src/prefect/core/flow.py", line 1099, in _run
    **kwargs,
  File "/Users/zangell/prefect/core/src/prefect/engine/flow_runner.py", line 282, in run
    executor=executor,
  File "/Users/zangell/prefect/core/src/prefect/utilities/executors.py", line 81, in inner
    return runner_method(self, *args, **kwargs)
  File "/Users/zangell/prefect/core/src/prefect/engine/runner.py", line 72, in inner
    new_state = self.handle_state_change(old_state=state, new_state=new_state)
  File "/Users/zangell/prefect/core/src/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/Users/zangell/prefect/core/src/prefect/engine/flow_runner.py", line 116, in call_runner_target_handlers
    new_state = handler(self.flow, old_state, new_state) or new_state
  File "cancel_schedule.py", line 14, in exit_on_flow_failure
    raise BaseException('Flow failed. Exiting execution...')
BaseException: Flow failed. Exiting execution...
(and it stops running)
Daniel Saxton
09/19/2021, 1:16 PM
Zach Angell
Daniel Saxton
09/19/2021, 2:21 PM
Kevin Kho
Daniel Saxton
09/20/2021, 12:37 AM