new to Prefect here, but how do we avoid crash-loo...
# ask-community
d
new to Prefect here, but how do we avoid crash-looping when using an
IntervalSchedule
if the workflow is raising an exception (so we'd like to simply cancel the workflow as soon an error occurs)?
z
Hi @Daniel Saxton, welcome to Prefect! When you say "cancel the workflow as soon as an error occurs" do you want to • cancel the running workflow that just raised an exception? • cancel different runs of that workflow that are scheduled to run using
IntervalSchedule
?
Cancelling the running Flow that raised an exceptions sort of happens by default. If a task raises an error, the task will fail and any downstream tasks will fail unless retries or other special logic is configured. Happy to help offer some guidance for more complicated cancellation logic. Automatically cancelling future scheduled Flow runs is available via Automations. You can configure an Automation to automatically pause a flow's schedule if the flow fails. https://docs.prefect.io/orchestration/concepts/automations.html#actions.
d
thanks for the quick response, yeah i was thinking more along the lines of the first option, so cancelling all future runs of a flow if it ever fails, it seemed to me with an
IntervalSchedule
the flow continues running on the schedule indefinitely
partly i was thinking this would help when debugging flows (so you don't have to control+c each time an error happens), but also it may be the case that a failure indicates that something is critically wrong and requires manual fixing, in which case you might want to have it stop execution completely
thanks for the doc link, will check that out
z
are you using an agent to execute flows? or just calling
flow.run()
?
d
only calling
flow.run()
and running the script locally
z
The best way to do this would be using a state handler on the flow. For example
Copy code
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
from datetime import timedelta
import prefect

def exit_on_flow_failure(obj, old_state, new_state):
        if new_state.is_failed():
                # log the exception
                logger = prefect.context.get("logger")
                logger.error("Flow encountered exception")
                logger.error(repr(new_state.result))
                # exit execution
                raise BaseException('Flow failed. Exiting execution...')
        return new_state

@task
def bye():
        raise ValueError()

with Flow('test', state_handlers=[exit_on_flow_failure]) as flow:
        bye()


flow.schedule = Schedule(clocks=[IntervalClock(timedelta(seconds=10))])
flow.run()
Results in this:
Copy code
(py37) ~ % python cancel_schedule.py
[2021-09-18 23:12:54-0400] INFO - prefect.test | Waiting for next scheduled run at 2021-09-19T03:13:00+00:00
[2021-09-18 23:13:00-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-09-18 23:13:00-0400] INFO - prefect.TaskRunner | Task 'bye': Starting task run...
[2021-09-18 23:13:00-0400] ERROR - prefect.TaskRunner | Task 'bye': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/zangell/prefect/core/src/prefect/engine/task_runner.py", line 863, in get_task_run_state
    logger=self.logger,
  File "/Users/zangell/prefect/core/src/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "cancel_schedule.py", line 19, in bye
    raise ValueError()
ValueError
[2021-09-18 23:13:00-0400] INFO - prefect.TaskRunner | Task 'bye': Finished task run for task with final state: 'Failed'
[2021-09-18 23:13:00-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-09-18 23:13:00-0400] ERROR - prefect | Flow encountered exception
[2021-09-18 23:13:00-0400] ERROR - prefect | {<Task: bye>: <Failed: "Error during execution of task: ValueError()">}
Traceback (most recent call last):
  File "cancel_schedule.py", line 26, in <module>
    flow.run()
  File "/Users/zangell/prefect/core/src/prefect/core/flow.py", line 1278, in run
    **kwargs,
  File "/Users/zangell/prefect/core/src/prefect/core/flow.py", line 1099, in _run
    **kwargs,
  File "/Users/zangell/prefect/core/src/prefect/engine/flow_runner.py", line 282, in run
    executor=executor,
  File "/Users/zangell/prefect/core/src/prefect/utilities/executors.py", line 81, in inner
    return runner_method(self, *args, **kwargs)
  File "/Users/zangell/prefect/core/src/prefect/engine/runner.py", line 72, in inner
    new_state = self.handle_state_change(old_state=state, new_state=new_state)
  File "/Users/zangell/prefect/core/src/prefect/engine/runner.py", line 161, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/Users/zangell/prefect/core/src/prefect/engine/flow_runner.py", line 116, in call_runner_target_handlers
    new_state = handler(self.flow, old_state, new_state) or new_state
  File "cancel_schedule.py", line 14, in exit_on_flow_failure
    raise BaseException('Flow failed. Exiting execution...')
BaseException: Flow failed. Exiting execution...
(and it stops running)
d
great, thanks a lot! just so i understand, this works because the state handler is raising the exception at the flow rather than the task level?
z
Yep exactly. And this particular exception isnt caught by flow.run(), while task exceptions are handled
šŸ‘ 1
d
related question: does Prefect have any common state handlers built into Prefect core itself (so that users can simply import instead of writing themselves)? i wonder if that might be useful
k
We do not have for state handlers. Most of our users use a backend (Prefect Cloud or Server). Prefect Cloud has 10k task runs free per month which is more then enough to get started with btw. So this type of error would be independent for each run with a backend. I bring this up because most state handlers are pretty custom to a user. For example, (if this tasks fails, do x, y and z then mark it as SUCCESS). The conditions that are used to take actions (sending notifications or running other processes) seem to be a very tailored thing. A lot of users hit the GraphQL API in their state handler
d
yeah, i started using Prefect Cloud soon after experimenting locally, definitely an awesome product you all have built šŸ‘
šŸ‘ 1
šŸ™ 1