Constantino Schillebeeckx
10/28/2021, 7:13 PM
Exception that caused the failure from this callback?
Kevin Kho
from prefect import Flow, task
import prefect

def my_state_handler(task, old_state, new_state):
    # On failure, log the type of the state's result (the raised exception).
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        logger.info(type(new_state.result))
    return new_state

@task(state_handlers=[my_state_handler])
def test(x):
    "test" + 1  # deliberately raises TypeError to trigger the handler
    return x

with Flow("aaa") as flow:
    a = test(1)

flow.run()
I suspect it will work on the Flow level too
Constantino Schillebeeckx
10/28/2021, 7:27 PM
original_exception = list(state.result.values())[0].result
Kevin Kho
Constantino Schillebeeckx
10/28/2021, 7:32 PM
state.result object? Is it always guaranteed to be of size 1? The docs just show
result (Any, optional): Defaults to `None`. A data payload for the state.
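For anyone reading along later, here is a minimal sketch of a helper that avoids relying on the result being of size 1. It assumes, as the snippets in this thread suggest, that a failed task state's result is the exception itself and that a flow-level result is a dict of task to state. The `map_states` lookup for mapped tasks is an assumption about the state object, and `FakeState` is a hypothetical stand-in so the example runs without Prefect installed.

```python
from typing import Any, List


def collect_exceptions(result: Any) -> List[BaseException]:
    """Gather every exception reachable from a state's result payload."""
    if isinstance(result, BaseException):
        # Task-level case: the failed state's result is the exception itself.
        return [result]
    if isinstance(result, dict):
        # Flow-level case (assumed): a mapping of task -> state.
        found: List[BaseException] = []
        for state in result.values():
            found.extend(collect_exceptions(getattr(state, "result", None)))
            # Assumed: mapped states expose their children as `map_states`.
            for child in getattr(state, "map_states", None) or []:
                found.extend(collect_exceptions(getattr(child, "result", None)))
        return found
    # Anything else (None, plain return values) carries no exception.
    return []


class FakeState:
    """Hypothetical stand-in for a Prefect state, used only for this demo."""

    def __init__(self, result: Any = None, map_states=None):
        self.result = result
        self.map_states = map_states or []


err = TypeError("can only concatenate str")
flow_result = {"ok": FakeState(result=1), "bad": FakeState(result=err)}
print(collect_exceptions(flow_result))
```

Inside a state handler this could be called as `collect_exceptions(new_state.result)`. An empty or missing result yields an empty list rather than an `IndexError`, which makes an empty payload easier to spot in the logs.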
Kevin Kho
if state.is_failed(), I’m positive this will work
Constantino Schillebeeckx
12/13/2021, 9:50 PM
state.result is empty when this is running in ECS; however, when it's run locally it contains the Exceptions I'm after. Do you have any other thoughts regarding getting at the original caught exception that caused the failure?
Kevin Kho
Constantino Schillebeeckx
12/14/2021, 5:55 PM
map and normally set a DaskExecutor; however, when I commented yesterday on the behavior that I'm seeing, I left the executor as the default (but still used a map)
Kevin Kho
map (notice the log is labelled as test[1])
Kevin Kho
from prefect import Flow, task
import prefect
from prefect.run_configs import ECSRun
from prefect.storage import S3

def my_state_handler(task, old_state, new_state):
    # On failure, log the type of the state's result.
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        logger.info(type(new_state.result))
    return new_state

@task(state_handlers=[my_state_handler])
def test(x):
    if x == 2:
        "test" + 1  # deliberately raises TypeError for one mapped input
    return x

STORAGE = S3(bucket="coiled-prefect", add_default_labels=False)
RUN_CONFIG = ECSRun(
    run_task_kwargs={'cluster': 'test-cluster'},
    task_role_arn='arn:aws:iam::1111111111:role/prefect-ecs',
    execution_role_arn='arn:aws:iam::111111111111:role/prefect-ecs',
    image='prefecthq/prefect:latest-python3.8',
)

with Flow("except", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    a = test.map([1, 2, 3, 4])

flow.register("bristech")
Constantino Schillebeeckx
12/14/2021, 8:24 PM