Thread
#prefect-community
    d

    Daniel Saxton

    1 year ago
    new to Prefect here, but how do we avoid crash-looping when using an
    IntervalSchedule
    if the workflow is raising an exception (so we'd like to simply cancel the workflow as soon an error occurs)?
    Zach Angell

    Zach Angell

    1 year ago
    Hi @Daniel Saxton, welcome to Prefect! When you say "cancel the workflow as soon as an error occurs" do you want to ā€¢ cancel the running workflow that just raised an exception? ā€¢ cancel different runs of that workflow that are scheduled to run using
    IntervalSchedule
    ?
    Cancelling the running Flow that raised an exceptions sort of happens by default. If a task raises an error, the task will fail and any downstream tasks will fail unless retries or other special logic is configured. Happy to help offer some guidance for more complicated cancellation logic. Automatically cancelling future scheduled Flow runs is available via Automations. You can configure an Automation to automatically pause a flow's schedule if the flow fails. https://docs.prefect.io/orchestration/concepts/automations.html#actions.
    d

    Daniel Saxton

    1 year ago
    thanks for the quick response, yeah i was thinking more along the lines of the first option, so cancelling all future runs of a flow if it ever fails, it seemed to me with an
    IntervalSchedule
    the flow continues running on the schedule indefinitely
    partly i was thinking this would help when debugging flows (so you don't have to control+c each time an error happens), but also it may be the case that a failure indicates that something is critically wrong and requires manual fixing, in which case you might want to have it stop execution completely
    thanks for the doc link, will check that out
    Zach Angell

    Zach Angell

    1 year ago
    are you using an agent to execute flows? or just calling
    flow.run()
    ?
    d

    Daniel Saxton

    1 year ago
    only calling
    flow.run()
    and running the script locally
    Zach Angell

    Zach Angell

    1 year ago
    The best way to do this would be using a state handler on the flow. For example
    from prefect import task, Flow
    from prefect.schedules.clocks import IntervalClock
    from prefect.schedules import Schedule
    from datetime import timedelta
    import prefect
    
    def exit_on_flow_failure(obj, old_state, new_state):
            if new_state.is_failed():
                    # log the exception
                    logger = prefect.context.get("logger")
                    logger.error("Flow encountered exception")
                    logger.error(repr(new_state.result))
                    # exit execution
                    raise BaseException('Flow failed. Exiting execution...')
            return new_state
    
    @task
    def bye():
            raise ValueError()
    
    with Flow('test', state_handlers=[exit_on_flow_failure]) as flow:
            bye()
    
    
    flow.schedule = Schedule(clocks=[IntervalClock(timedelta(seconds=10))])
    flow.run()
    Results in this:
    (py37) ~ % python cancel_schedule.py
    [2021-09-18 23:12:54-0400] INFO - prefect.test | Waiting for next scheduled run at 2021-09-19T03:13:00+00:00
    [2021-09-18 23:13:00-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
    [2021-09-18 23:13:00-0400] INFO - prefect.TaskRunner | Task 'bye': Starting task run...
    [2021-09-18 23:13:00-0400] ERROR - prefect.TaskRunner | Task 'bye': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/zangell/prefect/core/src/prefect/engine/task_runner.py", line 863, in get_task_run_state
        logger=self.logger,
      File "/Users/zangell/prefect/core/src/prefect/utilities/executors.py", line 445, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "cancel_schedule.py", line 19, in bye
        raise ValueError()
    ValueError
    [2021-09-18 23:13:00-0400] INFO - prefect.TaskRunner | Task 'bye': Finished task run for task with final state: 'Failed'
    [2021-09-18 23:13:00-0400] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    [2021-09-18 23:13:00-0400] ERROR - prefect | Flow encountered exception
    [2021-09-18 23:13:00-0400] ERROR - prefect | {<Task: bye>: <Failed: "Error during execution of task: ValueError()">}
    Traceback (most recent call last):
      File "cancel_schedule.py", line 26, in <module>
        flow.run()
      File "/Users/zangell/prefect/core/src/prefect/core/flow.py", line 1278, in run
        **kwargs,
      File "/Users/zangell/prefect/core/src/prefect/core/flow.py", line 1099, in _run
        **kwargs,
      File "/Users/zangell/prefect/core/src/prefect/engine/flow_runner.py", line 282, in run
        executor=executor,
      File "/Users/zangell/prefect/core/src/prefect/utilities/executors.py", line 81, in inner
        return runner_method(self, *args, **kwargs)
      File "/Users/zangell/prefect/core/src/prefect/engine/runner.py", line 72, in inner
        new_state = self.handle_state_change(old_state=state, new_state=new_state)
      File "/Users/zangell/prefect/core/src/prefect/engine/runner.py", line 161, in handle_state_change
        new_state = self.call_runner_target_handlers(old_state, new_state)
      File "/Users/zangell/prefect/core/src/prefect/engine/flow_runner.py", line 116, in call_runner_target_handlers
        new_state = handler(self.flow, old_state, new_state) or new_state
      File "cancel_schedule.py", line 14, in exit_on_flow_failure
        raise BaseException('Flow failed. Exiting execution...')
    BaseException: Flow failed. Exiting execution...
    (and it stops running)
    d

    Daniel Saxton

    1 year ago
    great, thanks a lot! just so i understand, this works because the state handler is raising the exception at the flow rather than the task level?
    Zach Angell

    Zach Angell

    1 year ago
    Yep exactly. And this particular exception isnt caught by flow.run(), while task exceptions are handled
    d

    Daniel Saxton

    1 year ago
    related question: does Prefect have any common state handlers built into Prefect core itself (so that users can simply import instead of writing themselves)? i wonder if that might be useful
    Kevin Kho

    Kevin Kho

    1 year ago
    We do not have for state handlers. Most of our users use a backend (Prefect Cloud or Server). Prefect Cloud has 10k task runs free per month which is more then enough to get started with btw. So this type of error would be independent for each run with a backend. I bring this up because most state handlers are pretty custom to a user. For example, (if this tasks fails, do x, y and z then mark it as SUCCESS). The conditions that are used to take actions (sending notifications or running other processes) seem to be a very tailored thing. A lot of users hit the GraphQL API in their state handler
    d

    Daniel Saxton

    1 year ago
    yeah, i started using Prefect Cloud soon after experimenting locally, definitely an awesome product you all have built šŸ‘