An Hoang
01/02/2022, 1:24 AM
task_run_name, when templated with input names, can be very helpful in debugging mapped tasks. Is it only available on a Prefect backend (Cloud and Server) and not in Core?
If I'm doing local debugging with flow_result = flow.run(), let's say I have a flow with task_a mapped 1000 times, and task_a[25] failed but the other indices succeeded. What's the quickest way to find out which input caused it to fail? I don't think I can access the result of flow_result.result[task_a].result[25]
Kevin Kho
from prefect import Flow, task, Parameter
import prefect

def mystatehandler(task, old_state, new_state):
    # On failure, log the state's result (the exception that was raised)
    if new_state.is_failed():
        prefect.context.logger.info(new_state.result)
    return new_state

@task(state_handlers=[mystatehandler])
def mytask(x):
    if x == 2:
        x + "test"  # deliberately raises TypeError so this mapped run fails
    else:
        return x + 1

with Flow("...") as flow:
    mytask.map([1,2,3,4,5])

flow.run()
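Another angle on the "which input failed" question, as a rough sketch in plain Python rather than anything Prefect provides: wrap the function so that any exception is re-raised with the call's arguments in the message. The names with_input_in_error and failing_inputs are made up for illustration, and the loop only stands in for a mapped run. I'd assume the wrapper could sit underneath @task, but that is not verified here.

```python
import functools


def with_input_in_error(fn):
    """Hypothetical helper: re-raise any failure with the call's inputs attached."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(
                f"{fn.__name__} failed for args={args!r}, kwargs={kwargs!r}"
            ) from exc

    return wrapper


@with_input_in_error
def mytask(x):
    if x == 2:
        return x + "test"  # deliberately raises TypeError
    return x + 1


def failing_inputs(values):
    """Stand-in for a mapped run: collect (input, message) pairs for failures."""
    failures = []
    for value in values:
        try:
            mytask(value)
        except RuntimeError as err:
            failures.append((value, str(err)))
    return failures
```

Calling failing_inputs([1, 2, 3, 4, 5]) gives a single entry for the input 2, and the message carries the offending arguments, so whatever logs the exception also logs the input.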
If you need to log specific inputs, then I think the easiest way to do this is by adding your inputs to the context like this:
from prefect import Flow, task, Parameter
import prefect

def mystatehandler(task, old_state, new_state):
    if new_state.is_failed():
        prefect.context.logger.info(new_state.result)
        # x was stored in the context by the task itself before it failed
        prefect.context.logger.info("Failed for x = " + str(prefect.context.x))
    return new_state

@task(state_handlers=[mystatehandler])
def mytask(x):
    # Stash the input in the context so the state handler can read it
    prefect.context.x = x
    if x == 2:
        x + "test"  # deliberately raises TypeError so this mapped run fails
    else:
        return x + 1

with Flow("...") as flow:
    mytask.map([1,2,3,4,5])

flow.run()
But it’s kind of a hack
An Hoang
01/02/2022, 4:28 PM