
Michail Melonas

01/19/2022, 3:10 PM
I’m curious as to whether it is possible to update the state of a running Flow according to the outcome of a Task. That is, if at some point during the execution of a Task a condition is met, can I update the state of the Flow/Task to Failed?

Kevin Kho

01/19/2022, 3:18 PM
Yes, you can use flow.set_reference_tasks to set the reference tasks for the Flow. The Flow's final state is determined by the states of its reference tasks.

Michail Melonas

01/20/2022, 7:18 AM
@Kevin Kho can I explicitly set the state of a Task, e.g.:
from random import random

from prefect import Task


class MyTask(Task):
    def run(self):
        if random() > 0.5:
            return "Hello, World!"
        else:
            # set the state of this Task to Failed (Finished)
            pass

Kevin Kho

01/20/2022, 7:23 AM
Yes, if you raise FAIL, where FAIL is the Prefect signal.
I didn’t link. Link is here
Also check this

Michail Melonas

01/20/2022, 7:27 AM
Ah, so keeping with the above example, we could have:
from random import random

from prefect import Task
from prefect.engine.signals import FAIL


class MyTask(Task):
    def run(self):
        if random() > 0.5:
            return "Hello, World!"
        # Raising the FAIL signal puts this Task into a Failed state.
        raise FAIL()

Kevin Kho

01/20/2022, 7:29 AM
yes exactly

Michail Melonas

02/16/2022, 1:58 PM
@Kevin Kho I’m finding that when I set the state of a flow to “Failed” using the Prefect Cloud UI “SET STATE” button, my callbacks that are meant to be triggered on a state change to “finished” (I’m using the is_finished method) are not called. However, when using the “CANCEL” button the behaviour is as expected.
Am I missing something, or is this intentional?

Kevin Kho

02/16/2022, 2:28 PM
This is very surprising to me. People have said Cancel callbacks are not called and that’s expected. Could you show me your state handler code? I have seen someone say doing something like:
from prefect.engine.state import Cancelled

def mystatehandler(flow, old_state, new_state):
    if isinstance(new_state, Cancelled):
        ...
    return new_state
works but I wasn’t sure

Michail Melonas

02/16/2022, 2:38 PM
My callbacks:
def kill_keda_job_callback(flow, old_state, new_state):
    if new_state.is_finished():
        pass
    return new_state


def delete_queues_callback(flow, old_state, new_state):
    if new_state.is_finished():
        logger.info(f"Deleting {flow.work_queue_name} and {flow.callback_queue_name}")
        with Connection() as (_, channel):
            channel.queue_delete(queue=flow.work_queue_name)
            channel.queue_delete(queue=flow.callback_queue_name)
    return new_state
When I declare the flow:
flow.state_handlers = [kill_keda_job_callback, delete_queues_callback]
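Since a state handler is a plain function taking (flow, old_state, new_state) and returning a state, its branching can be checked without a running flow. The following is only a sketch under assumptions: StubState and the SimpleNamespace flow are hypothetical stand-ins rather than Prefect objects, and the queue deletion is replaced by appending to a list.

```python
from types import SimpleNamespace


class StubState:
    """Hypothetical stand-in for a Prefect state; only is_finished() is modelled."""

    def __init__(self, finished: bool) -> None:
        self._finished = finished

    def is_finished(self) -> bool:
        return self._finished


deleted_queues = []  # records what a real handler would delete


def delete_queues_callback(flow, old_state, new_state):
    # Same shape as the handler above; the queue deletion is replaced by a record.
    if new_state.is_finished():
        deleted_queues.extend([flow.work_queue_name, flow.callback_queue_name])
    return new_state


flow = SimpleNamespace(work_queue_name="work", callback_queue_name="callback")
running, failed = StubState(finished=False), StubState(finished=True)

# A non-finished state passes through without triggering cleanup.
returned = delete_queues_callback(flow, None, running)
after_running = list(deleted_queues)

# A finished state triggers the cleanup branch.
delete_queues_callback(flow, running, failed)
```

This only exercises the handler's own logic; whether the handler is invoked at all for a given UI action is a separate question.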

Kevin Kho

02/16/2022, 2:40 PM
Cancelling from the UI calls these?
Let me try testing some stuff and I’ll get back to you later

Michail Melonas

02/16/2022, 2:43 PM
The state handlers get triggered when either an Exception or prefect.engine.signals.FAIL is raised during task execution, or when the “CANCEL” button is used.
They don’t get triggered when the state is set via “SET STATE” > FAILED.

Kevin Kho

02/16/2022, 2:56 PM
Ok will test in a bit and see what I find. At the moment, I think setting state in the UI bypasses all state handlers. But will test these scenarios and see what I find
Ok, my previous thinking was wrong. I dove into this with @Jean Luciano, one of our solutions engineers, and here is what we found. I think we are consistent with your findings, and then I’ll add some explanation. If you click cancel in the UI, it will hit the State Handler that contains:
def mystatehandler(flow, old_state, new_state):
    if isinstance(new_state, Cancelled):
and also:
def kill_keda_job_callback(flow, old_state, new_state):
    if new_state.is_finished():
gets triggered because of the finished state.
When we set the state to Failed in the UI, there are two possible outcomes. The first is if there are still downstream tasks: the Flow gets confused and there are Client errors. The second is if you mark the run as FAILED during a terminal task. In this case, that last task may still succeed, causing the final state to be a success. State handlers for is_failed() are not hit in this case.
This is because setting the state in the UI and using the Cancel button take two different code paths. Setting the state in the UI is a direct modification of state in the Prefect database. It does not touch the Flow process. Using the Cancel button goes into the Flow process and tries to terminate it. Because it is done in the Flow process, the Flow can still run the state handlers while exiting.
So the summary is that it’s not by design, but you are right that Cancel triggers the state handlers and setting state in the UI does not.
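The difference between the two code paths can be illustrated with a toy model. This is not Prefect's implementation: ToyBackend, ToyFlowProcess, ui_set_state and ui_cancel are hypothetical names, and the sketch only assumes what is described above, namely that handlers live in the Flow process and the UI "SET STATE" action writes to the stored state directly.

```python
from typing import Callable, List

Handler = Callable[[str, str], None]  # called with (old_state, new_state)


class ToyBackend:
    """Hypothetical stand-in for the stored flow-run state that the UI edits."""

    def __init__(self) -> None:
        self.state = "Running"


class ToyFlowProcess:
    """Hypothetical stand-in for the process executing the flow."""

    def __init__(self, backend: ToyBackend, handlers: List[Handler]) -> None:
        self.backend = backend
        self.handlers = handlers

    def transition(self, new_state: str) -> None:
        # In-process change: handlers see the transition before it is stored.
        old_state = self.backend.state
        for handler in self.handlers:
            handler(old_state, new_state)
        self.backend.state = new_state


def ui_set_state(backend: ToyBackend, new_state: str) -> None:
    # "SET STATE" path: overwrite the stored state; the process is not involved.
    backend.state = new_state


def ui_cancel(process: ToyFlowProcess) -> None:
    # "CANCEL" path: the running process performs the transition itself.
    process.transition("Cancelled")


def run_scenario(action):
    """Run one UI action against a fresh backend and return (state, handler calls)."""
    seen = []
    backend = ToyBackend()
    process = ToyFlowProcess(backend, [lambda old, new: seen.append((old, new))])
    action(backend, process)
    return backend.state, seen


set_state_result = run_scenario(lambda backend, process: ui_set_state(backend, "Failed"))
cancel_result = run_scenario(lambda backend, process: ui_cancel(process))
```

In this model the stored state changes in both scenarios, but the handler is called only in the cancel scenario, because that is the only path that goes through the process holding the handlers.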

Michail Melonas

02/17/2022, 10:13 AM
Cool, thanks for the feedback. Since we need those callbacks to be triggered, we’ll just stick to the Cancel button when need be (and perhaps set state to Failed afterward if necessary).

Kevin Kho

02/17/2022, 2:10 PM
Yes that would be ideal