I’m curious as to whether it is possible to update...
# ask-community
m
I’m curious as to whether it is possible to update the state of a running Flow according the outcome of a Task. That is, if at some point during the execution of a Task a condition is met, can I update the state of the Flow/Task to Failure?
k
Yes you can use
flow.set_reference_tasks
to set the reference task for the Flow
m
@Kevin Kho can I explicitly set the state of
Task
, e.g.:
Copy code
class MyTask(Task):
    def run(self):
        from random import random
        if random() > 0.5: return "Hello, World!"
        else: 
            # set the state of this Task to Failed (Finished)
            pass
k
Yes if you
raise FAIL
where FAIL is the Prefect signal
I didn’t link. Link is here
🙏 1
Also check this
m
Ah, so keeping with the above example, we could have:
Copy code
from prefect.engine.signals import FAIL

class MyTask(Task):
    def run(self):
        from random import random
        if random() > 0.5: return "Hello, World!"
        else: raise FAIL()
k
yes exactly
🙌 1
m
@Kevin Kho I’m finding that when setting the state of a flow to “Failed” using the Prefect Cloud UI “SET STATE” button my callbacks that are meant to be triggered on a state change to “finished” (I’m using the
is_finished
method) are not called. However, when using the “CANCEL” button the behaviour is as expected.
Am I missing something and is this intentional?
k
This is very surprising to me. People have said Cancel callbacks are not called and that’s expected. Could you show me your state handler code? I have seen someone say doing something like:
Copy code
def mystatehandler(....)
    if isinstance(new_state, Cancelled):
        ...
works but I wasn’t sure
m
My callbacks:
Copy code
def kill_keda_job_callback(flow, old_state, new_state):
    if new_state.is_finished():
        pass
    return new_state


def delete_queues_callback(flow, old_state, new_state):
    if new_state.is_finished():
        <http://logger.info|logger.info>(f"Deleting {flow.work_queue_name} and {flow.callback_queue_name}")
        with Connection() as (_, channel):
            channel.queue_delete(queue=flow.work_queue_name)
            channel.queue_delete(queue=flow.callback_queue_name)
    return new_state
When I declare the flow:
Copy code
flow.state_handlers = [kill_keda_job_callback, delete_queues_callback]
k
Cancelling from the UI calls these?
Let me try testing some stuff and I’ll get back to you later
m
The state handlers get trigger when either an
Exception
or
prefect.engine.signals.FAIL
gets raised during task execution, or if the “CANCEL” button is employed.
They don’t get triggered when “SET STATE”>FAILED
k
Ok will test in a bit and see what I find. At the moment, I think setting state in the UI bypasses all state handlers. But will test these scenarios and see what I find
🙌 1
Ok my previous thinking was wrong. I dove into this with @Jean Luciano, one of our solutions engineers and here is what we found. I think we are consistent with your findings, and then I’ll add some explanation. If you click cancel in the UI, it will hit the State Handler that contains:
Copy code
def mystatehandler(flow, old_state, new_state):
    if isinstance(new_state, Cancelled):
and also:
Copy code
def kill_keda_job_callback(flow, old_state, new_state):
    if new_state.is_finished():
gets triggered because of the finished state. When we set the state to Failed in the UI, there are two possible outcomes. The first is if there are still downstream tasks, the Flow gets confused and there are Client errors. The second is if you mark as FAILED during a terminal task. In this case, that last task may still succeed causing the state to succeed. State handlers for
is_failed()
are not hit in this case This is because setting the state in the UI and using the Cancel button take two code paths. Setting the state in the UI is a direct modification of state in the Prefect database. It does not touch the Flow process. Using the Cancel button goes into the Flow process and tries to terminate it. Because it is done in the Flow process, the Flow can still run the state handlers while exiting. So the summary is it’s not by design but you are right that Cancel works differently and setting state in the UI does not
m
Cool, thanks for the feedback. Since we need those callbacks to be triggered, we’ll just stick to the Cancel button when need be (and perhaps set state to Failed afterward if necessary).
k
Yes that would be ideal