Meghan Franklin

09/30/2022, 8:45 PM
Hello, I’m seeing some absolutely baffling behavior while trying to implement state handlers. When a task does
fail_signal = signals.FAIL(message_here); raise fail_signal
the state handler isn’t triggered, although it is triggered when the task otherwise changes state, and it is triggered when a task fails for any reason other than
raise fail_signal
. What’s even more baffling is that I can run the sample code with
mission_critical_task
from here and get the expected behavior and the expected triggers. It’s a complicated flow, but I’ve ripped out as many pieces and simplified as much as possible, and now I’m out of ideas about why the handler isn’t firing. I’m tied to Prefect 0.15.13.

Kalise Richmond

09/30/2022, 9:49 PM
Hi Meghan, it is baffling that it's triggering for states other than the Failed state. Are you able to share more details about how you are calling the state handler?

Meghan Franklin

10/03/2022, 12:49 PM
Here is one example, but several tasks use a similar call signature:
@task(log_stdout=True, state_handlers=[slack_failure_message])
def build_config(cfg, user_name, py_client) -> Config:
    cfg["samples"] = [
        (x.split(",")[0].strip(), x.split(",")[1].strip())
        for x in cfg["samples"].split(";")
    ]
    cfg["reference_regions"] = [
        (x.split(",")[0].strip(), x.split(",")[1].strip())
        for x in cfg["reference_regions"].split(";")
    ]
    raise signals.FAIL("LETS KILL IT HERE")

def slack_failure_message(task, old_state, new_state):
    print(task)
    print("new state", new_state, type(new_state))
    print("new state dict", new_state.__dict__)
    if new_state.is_failed():
        print("AAHHHH, ERROR")
    return new_state
The print statements are how I know it’s being called for states other than Failed. I get this log output (you’ll see it moves on to another task at the end):
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | Task 'build_config': Starting task run...
<Task: build_config>
new state <Running: "Starting task run."> <class 'prefect.engine.state.Running'>
new state dict {'message': 'Starting task run.', '_result': <NoResultType: None>, 'context': {'tags': []}, 'cached_inputs': {}}
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('LETS KILL IT HERE')
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | Task 'build_config': Finished task run for task with final state: 'Failed'
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | Task 'setup_logging': Starting task run...

Kalise Richmond

10/04/2022, 11:57 PM
So strange, I just took your sample to run this locally and I'm seeing it work. The only difference is that I moved
def slack_failure_message()
above the task that calls it and took out the cfg splits for sample code. Here's my input:
def slack_failure_message(task, old_state, new_state):
    print(task)
    print("new state", new_state, type(new_state))
    print("new state dict", new_state.__dict__)
    if new_state.is_failed():
        print("AAHHHH, ERROR")
    return new_state

@task(log_stdout=True, state_handlers=[slack_failure_message])
def build_config():
    raise signals.FAIL("LETS KILL IT HERE")
And here was the output in the terminal:
(prefect1) ➜  se-dev prefect version                               
0.15.13
(prefect1) ➜  se-dev python test.py                                
[2022-10-04 16:54:47-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2022-10-04 16:54:47-0700] INFO - prefect.TaskRunner | Task 'build_config': Starting task run...
<Task: build_config>
new state <Running: "Starting task run."> <class 'prefect.engine.state.Running'>
new state dict {'message': 'Starting task run.', '_result': <NoResultType: None>, 'context': {'tags': []}, 'cached_inputs': {}}
[2022-10-04 16:54:47-0700] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('LETS KILL IT HERE')
<Task: build_config>
new state <Failed: "LETS KILL IT HERE"> <class 'prefect.engine.state.Failed'>
new state dict {'message': 'LETS KILL IT HERE', '_result': <Result: FAIL('LETS KILL IT HERE')>, 'context': {'tags': []}, 'cached_inputs': {}}
AAHHHH, ERROR
[2022-10-04 16:54:47-0700] INFO - prefect.TaskRunner | Task 'build_config': Finished task run for task with final state: 'Failed'
[2022-10-04 16:54:47-0700] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
(prefect1) ➜  se-dev

Meghan Franklin

10/05/2022, 3:17 PM
That IS why this is so baffling. I also can’t reproduce the behavior in anything smaller than my full (stripped-down) production flow. I’m wondering if this is because my production flow is actually run from a different script, i.e.
my_flow.run(parameters=parameters, executor=lde)
isn’t in the same file as the flow itself.
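
One way to check this from the calling script, as a rough sketch only: swap the print-based handler for one that records each transition in a list, then inspect that list after my_flow.run(...) returns. The helper name make_recording_handler is hypothetical and not part of Prefect. It relies only on the (task, old_state, new_state) handler signature used above.

```python
from typing import Any, Callable, List, Tuple

Transition = Tuple[str, str, str]


def make_recording_handler() -> Tuple[Callable[[Any, Any, Any], Any], List[Transition]]:
    """Hypothetical helper: build a state handler that records what it sees."""
    seen: List[Transition] = []

    def handler(task, old_state, new_state):
        # Record the task name and the class names of the old and new states.
        seen.append((
            getattr(task, "name", repr(task)),
            type(old_state).__name__,
            type(new_state).__name__,
        ))
        # Return the state unchanged, as the handlers in this thread do.
        return new_state

    return handler, seen
```

If the handler is attached with state_handlers=[handler] in the flow file and the seen list is imported into the script that calls my_flow.run(...), an entry ending in "Failed" for build_config would show the handler did fire for the FAIL signal. If there is no such entry, that suggests the task object being run is not the one the handler was attached to.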