# prefect-community
a
Hi, I found something strange... It seems like when a task feeding into a mapped task fails or trigger-fails, the downstream task still tries to map over the result of the failed upstream task, and Prefect errors out with a `TypeError` such as `'TRIGGERFAIL' object does not support indexing`.
```python
from prefect import Flow, Parameter, task
from prefect.tasks.core.constants import Constant
from prefect.engine.signals import FAIL

@task
def trythis(x, fail=False):
    if fail:
        raise FAIL()
    return [k+1 for k in x]

@task
def tryprint(x):
    print(x)

with Flow("math") as f:
    x = Parameter("x")
    y = trythis(x, fail=True)
    y1 = trythis(y, fail=False)
    z = tryprint.map(y1)

flow_state = f.run(x=[1,2,3,4,5,6,7])
print(flow_state.result[z].result)
```
Output:
```
[2019-12-03 06:50:35,756] INFO - prefect.FlowRunner | Beginning Flow run for 'math'
[2019-12-03 06:50:35,758] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-03 06:50:35,765] INFO - prefect.TaskRunner | Task 'x': Starting task run...
[2019-12-03 06:50:35,769] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
[2019-12-03 06:50:35,775] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
[2019-12-03 06:50:35,780] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'Failed'
[2019-12-03 06:50:35,786] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
[2019-12-03 06:50:35,789] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'TriggerFailed'
[2019-12-03 06:50:35,795] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
[2019-12-03 06:50:35,796] ERROR - prefect.TaskRunner | Task 'tryprint': unexpected error while running task: TypeError("'TRIGGERFAIL' object does not support indexing",)
Traceback (most recent call last):
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 259, in run
    executor=executor,
  File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 719, in run_mapped_task
    upstream_state.result[i],
TypeError: 'TRIGGERFAIL' object does not support indexing
[2019-12-03 06:50:35,800] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'Failed'
[2019-12-03 06:50:35,802] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
'TRIGGERFAIL' object does not support indexing
```
It seems like the mapped task should also end up TriggerFailed if its upstream tasks failed. Maybe it's not that simple, since it may be only a subset of the upstream tasks that failed... What is the design philosophy on this?
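As a rough illustration of the "subset" case, here is a toy model in plain Python. It is not Prefect's API: the function names and the string states are hypothetical, and it assumes each mapped child checks an `all_successful`-style trigger against only its own element of a mapped upstream. Under that assumption, only the children whose upstream element failed end up TriggerFailed:

```python
from typing import List


def all_successful(upstream_states: List[str]) -> bool:
    """Toy trigger (hypothetical): True only if every upstream state is "Success"."""
    return all(state == "Success" for state in upstream_states)


def run_mapped_children(upstream_child_states: List[str]) -> List[str]:
    """Toy model (hypothetical): each mapped child sees only its own upstream element.

    The trigger is evaluated per child, so a failure in one upstream element
    only trigger-fails the matching downstream child.
    """
    final_states = []
    for upstream_state in upstream_child_states:
        if all_successful([upstream_state]):
            final_states.append("Success")
        else:
            final_states.append("TriggerFailed")
    return final_states


print(run_mapped_children(["Success", "Failed", "Success"]))
# ['Success', 'TriggerFailed', 'Success']
```

When the upstream is a single unmapped task that failed as a whole, as with `y1` in the repro, there is no per-element slice to check, which is why it seems natural for every child (or the whole mapped task) to trigger-fail.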
j
(Just for my clarification) It looks like what you're doing is raising `FAIL`, which becomes the result of your `y` task. Then you take that result and pass it into `y1` with `fail=False`, so it will reach the `[k+1 for k in x]` with `x` being the `FAIL` returned from `y`. This is causing the `y1` task to raise the `TypeError` when it tries to iterate over something that is not iterable. Take a look at using triggers to help with cases like this! https://docs.prefect.io/api/unreleased/triggers.html For example, this flow will only run the `aggregate` task if all upstream tasks are successful:
```python
from prefect import task, Flow
from prefect.triggers import all_successful

@task
def divide(x):
    return 1 / x

@task(trigger=all_successful)
def aggregate(results):
    print(results)

with Flow("divide-fail") as flow:
    results = divide.map([0, 1, 2])
    aggregate(results)

flow.run()
```
After looking into this more, I agree that this is undesirable UX. It would feel more natural to see output like:
```
[2019-12-03 14:58:33,956] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
[2019-12-03 14:58:33,956] DEBUG - prefect.TaskRunner | Task 'tryprint': TRIGGERFAIL signal raised during execution.
[2019-12-03 14:58:33,956] DEBUG - prefect.TaskRunner | Task 'tryprint': Handling state change from Pending to TriggerFailed
[2019-12-03 14:58:33,958] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'TriggerFailed'
[2019-12-03 14:58:33,958] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2019-12-03 14:58:33,959] DEBUG - prefect.FlowRunner | Flow 'math': Handling state change from Running to Failed
Trigger was "all_successful" but some of the upstream tasks failed.
```
In other words, when it comes to mapping, the downstream task should also end up TriggerFailed because of its failed upstream tasks.
a
> you take that result and pass it into `y1` with fail=False so it will reach the `[k+1 for k in x]` with `x` being the `FAIL` returned from `y`.

Here `y1` should actually have a `TRIGGERFAIL` result (its final state in the log is 'TriggerFailed'). The mapping in the `z` task then tries to index into that `TRIGGERFAIL`, hence the 'does not support indexing' error. Maybe a change can be made so that Prefect doesn't try to iterate over the results and map the task when the upstream result is a signal?
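A minimal sketch of the proposed guard, assuming a simplified model where an upstream result is either a list of values or a signal object. `PrefectSignal`, `TRIGGERFAIL`, and `map_guarded` below are hypothetical stand-ins, not Prefect's real signal classes or its `run_mapped_task` code:

```python
class PrefectSignal(Exception):
    """Hypothetical stand-in for a Prefect engine signal."""


class TRIGGERFAIL(PrefectSignal):
    """Hypothetical stand-in for the TRIGGERFAIL signal."""


def map_guarded(upstream_result, fn):
    """Map fn over upstream_result, unless the upstream result is a signal."""
    if isinstance(upstream_result, PrefectSignal):
        # The upstream never produced data, so don't index into it;
        # report a TriggerFailed outcome for the mapped task instead.
        return "TriggerFailed"
    return [fn(item) for item in upstream_result]


# Without the guard, indexing the signal (like `upstream_state.result[i]`
# in the traceback) raises a TypeError; the exact message wording differs
# between Python versions.
try:
    TRIGGERFAIL("Trigger was 'all_successful' but some of the upstream tasks failed.")[0]
except TypeError as exc:
    print("unguarded:", exc)

print(map_guarded([2, 3, 4], lambda v: v + 1))  # [3, 4, 5]
print(map_guarded(TRIGGERFAIL("upstream failed"), lambda v: v + 1))  # TriggerFailed
```

Checking the type before indexing keeps the failure visible as a state instead of surfacing as an unrelated `TypeError` from inside the task runner.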
j
Interesting idea! I’m discussing this w/ @Chris White