Jeff Williams
05/21/2021, 3:11 PMlen(state.result.keys())
I get a non-zero value locally but get zero when run on the agent.
Any ideas as to why?Kevin Kho
05/21/2021, 3:13 PMfrom prefect import task, Flow
from prefect.engine.state import State, TriggerFailed
from typing import Set, Optional
def custom_terminal_state_handler(
flow: Flow,
state: State,
reference_task_states: Set[State],
) -> Optional[State]:
if state.is_failed():
for task, task_state in state.result.items():
if not task_state.is_failed():
continue
if isinstance(task_state, TriggerFailed):
print(f"Task {task.name} failed because an upstream failed")
continue
print(f"Task {task.name} failed with exception {task_state.result!r}")
return state
@task
def abc(x):
return x
@task
def bcd(x):
raise ValueError("Foo!")
@task
def cde(x):
return x
with Flow(
"terminal_handler", terminal_state_handler=custom_terminal_state_handler
) as flow:
a = abc(1)
b = bcd(a)
c = cde(b)
flow.run()
Jeff Williams
05/21/2021, 3:14 PMKevin Kho
05/21/2021, 3:16 PMJeff Williams
05/21/2021, 3:25 PMZach Angell
05/21/2021, 3:36 PMflow.run()
, the Flow
object specifies all of its task
objects should be included as return_tasks
in the `Flow`'s final state
.
When running against Server/Cloud, return_tasks
defaults to None
, which is why your state
object has no results. The reasoning here is that all task results for a flow run can be a very large piece of data, especially with mapped tasks. We don't have an efficient way of interacting with this from Core just yet. However, we're working on related features that will make this feasible.
Is there something specific you're trying to accomplish with the terminal state handler? I'm happy to help find a workaroundJeff Williams
05/21/2021, 3:46 PMZach Angell
05/21/2021, 4:05 PMstate
is successful or failed.
For more detailed info like task results, I'd recommend querying the Prefect API for the time being. This could be done in the terminal state handler if needed in a timely manner, or as a batch job after the fact.
Some of the work in this PR (which will make what you want to do much easier) might provide inspiration https://github.com/PrefectHQ/prefect/pull/4434/files. In particular, I'd take a look through src/backend/flow_run.py
and src/backend/task_run.py
Jeff Williams
05/21/2021, 4:28 PM