Meghan Franklin
09/30/2022, 8:45 PMfail_signal = signals.FAIL(message_here); raise fail_signal
-> the state handler isn’t triggering, although it does trigger when a task otherwise changes status AND it triggers if a task fails for a reason other than a raise fail_signal
. What even more baffling is that I can run the sample code with mission_critical_task
from here and I get the expected behavior and the expected triggers. It’s a complicated flow, but I’ve ripped out as many pieces and simplified as much as possible and now I’m out of ideas on why I’m getting this lack of behavior. I’m tied to prefect 0.15.13Kalise Richmond
09/30/2022, 9:49 PMMeghan Franklin
10/03/2022, 12:49 PM@task(log_stdout=True, state_handlers=[slack_failure_message])
def build_config(cfg, user_name, py_client) -> Config:
cfg["samples"] = [
(x.split(",")[0].strip(), x.split(",")[1].strip())
for x in cfg["samples"].split(";")
]
cfg["reference_regions"] = [
(x.split(",")[0].strip(), x.split(",")[1].strip())
for x in cfg["reference_regions"].split(";")
]
raise signals.FAIL("LETS KILL IT HERE")
def slack_failure_message(task, old_state, new_state):
print(task)
print("new state", new_state, type(new_state))
print("new state dict", new_state.__dict__)
if new_state.is_failed():
print("AAHHHH, ERROR")
return new_state
The print statements are how I know it’s being called for states other than FAIL. I get this traceback (you’ll see it moves on to another task at the end):
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | Task 'build_config': Starting task run...
<Task: build_config>
new state <Running: "Starting task run."> <class 'prefect.engine.state.Running'>
new state dict {'message': 'Starting task run.', '_result': <NoResultType: None>, 'context': {'tags': []}, 'cached_inputs': {}}
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('LETS KILL IT HERE')
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | Task 'build_config': Finished task run for task with final state: 'Failed'
[2022-10-03 08:43:05-0400] INFO - prefect.TaskRunner | Task 'setup_logging': Starting task run...
Kalise Richmond
10/04/2022, 11:57 PMdef slack_failure_message()
above the task that calls it and took out the cfg splits for sample code. Here's my input:
def slack_failure_message(task, old_state, new_state):
print(task)
print("new state", new_state, type(new_state))
print("new state dict", new_state.__dict__)
if new_state.is_failed():
print("AAHHHH, ERROR")
return new_state
@task(log_stdout=True, state_handlers=[slack_failure_message])
def build_config():
raise signals.FAIL("LETS KILL IT HERE")
(prefect1) ➜ se-dev prefect version
0.15.13
(prefect1) ➜ se-dev python test.py
[2022-10-04 16:54:47-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2022-10-04 16:54:47-0700] INFO - prefect.TaskRunner | Task 'build_config': Starting task run...
<Task: build_config>
new state <Running: "Starting task run."> <class 'prefect.engine.state.Running'>
new state dict {'message': 'Starting task run.', '_result': <NoResultType: None>, 'context': {'tags': []}, 'cached_inputs': {}}
[2022-10-04 16:54:47-0700] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('LETS KILL IT HERE')
<Task: build_config>
new state <Failed: "LETS KILL IT HERE"> <class 'prefect.engine.state.Failed'>
new state dict {'message': 'LETS KILL IT HERE', '_result': <Result: FAIL('LETS KILL IT HERE')>, 'context': {'tags': []}, 'cached_inputs': {}}
AAHHHH, ERROR
[2022-10-04 16:54:47-0700] INFO - prefect.TaskRunner | Task 'build_config': Finished task run for task with final state: 'Failed'
[2022-10-04 16:54:47-0700] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
(prefect1) ➜ se-dev
Meghan Franklin
10/05/2022, 3:17 PMmy_flow.run(parameters=parameters, executor=lde)
isn’t in the same file as the flow itself.