Thread
#prefect-community
    alvin goh

    2 years ago
    Hi, I found something strange. It seems that when a task feeding into a mapped task fails or trigger-fails, the downstream task still tries to map over the result of the failed upstream task, and Prefect errors out with: 'TRIGGERFAIL' object does not support indexing.
    from prefect import Flow, Parameter, task
    from prefect.engine.signals import FAIL

    @task
    def trythis(x, fail=False):
        # Optionally fail on purpose to simulate a failed upstream task.
        if fail:
            raise FAIL()
        return [k + 1 for k in x]

    @task
    def tryprint(x):
        print(x)

    with Flow("math") as f:
        x = Parameter("x")
        y = trythis(x, fail=True)    # ends in Failed
        y1 = trythis(y, fail=False)  # ends in TriggerFailed because y failed
        z = tryprint.map(y1)         # still tries to map over y1's result

    flow_state = f.run(x=[1,2,3,4,5,6,7])
    print(flow_state.result[z].result)
    Output:
    [2019-12-03 06:50:35,756] INFO - prefect.FlowRunner | Beginning Flow run for 'math'
    [2019-12-03 06:50:35,758] INFO - prefect.FlowRunner | Starting flow run.
    [2019-12-03 06:50:35,765] INFO - prefect.TaskRunner | Task 'x': Starting task run...
    [2019-12-03 06:50:35,769] INFO - prefect.TaskRunner | Task 'x': finished task run for task with final state: 'Success'
    [2019-12-03 06:50:35,775] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
    [2019-12-03 06:50:35,780] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'Failed'
    [2019-12-03 06:50:35,786] INFO - prefect.TaskRunner | Task 'trythis': Starting task run...
    [2019-12-03 06:50:35,789] INFO - prefect.TaskRunner | Task 'trythis': finished task run for task with final state: 'TriggerFailed'
    [2019-12-03 06:50:35,795] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
    [2019-12-03 06:50:35,796] ERROR - prefect.TaskRunner | Task 'tryprint': unexpected error while running task: TypeError("'TRIGGERFAIL' object does not support indexing",)
    Traceback (most recent call last):
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 259, in run
        executor=executor,
      File "/opt/conda/envs/pipeline/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 719, in run_mapped_task
        upstream_state.result[i],
    TypeError: 'TRIGGERFAIL' object does not support indexing
    [2019-12-03 06:50:35,800] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'Failed'
    [2019-12-03 06:50:35,802] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    'TRIGGERFAIL' object does not support indexing
    It seems like the mapped task should also end up TriggerFailed if its upstream tasks failed. Maybe it's not that simple, since only a subset of the upstream tasks may have failed. What is the design philosophy on this?
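    The TypeError in the traceback can be reproduced without Prefect. This is a minimal sketch, assuming (as the log suggests) that the upstream state's result holds the raised signal, which is an exception object rather than a list. The TRIGGERFAIL class below is a local stand-in, not the class from prefect.engine.signals, and the exact wording of the error message varies between Python versions.

```python
class TRIGGERFAIL(Exception):
    """Local stand-in for Prefect's signal of the same name (hypothetical)."""


# Pretend this is the result carried by a TriggerFailed upstream state.
upstream_result = TRIGGERFAIL("some of the upstream tasks failed")

error_message = None
try:
    # Mapping indexes into the upstream result to build one child run per element.
    first_item = upstream_result[0]
except TypeError as exc:
    # An exception object has no __getitem__, so indexing it raises TypeError.
    error_message = str(exc)

print("TypeError:", error_message)
```

    The message names the signal class because the object being indexed is the signal itself, not a list of results.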
    josh

    2 years ago
    (Just for my clarification) It looks like what you're doing is raising FAIL, which becomes the result of your y task. Then you take that result and pass it into y1 with fail=False, so it will reach the [k+1 for k in x] with x being the FAIL returned from y. This is causing the y1 task to raise the TypeError when it tries to iterate over something that is not iterable. Take a look into using triggers to assist with cases like this! https://docs.prefect.io/api/unreleased/triggers.html
    For example, this flow will only run the aggregate task if all upstreams are successful:
    from prefect import task, Flow
    from prefect.triggers import all_successful
    
    @task
    def divide(x):
        return 1 / x
    
    @task(trigger=all_successful)
    def aggregate(results):
        print(results)
    
    with Flow("divide-fail") as flow:
        results = divide.map([0, 1, 2])
        aggregate(results)
    flow.run()
    After looking into this more, it does feel like undesirable UX to me. I agree it would be more natural to see output like:
    [2019-12-03 14:58:33,956] INFO - prefect.TaskRunner | Task 'tryprint': Starting task run...
    [2019-12-03 14:58:33,956] DEBUG - prefect.TaskRunner | Task 'tryprint': TRIGGERFAIL signal raised during execution.
    [2019-12-03 14:58:33,956] DEBUG - prefect.TaskRunner | Task 'tryprint': Handling state change from Pending to TriggerFailed
    [2019-12-03 14:58:33,958] INFO - prefect.TaskRunner | Task 'tryprint': finished task run for task with final state: 'TriggerFailed'
    [2019-12-03 14:58:33,958] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    [2019-12-03 14:58:33,959] DEBUG - prefect.FlowRunner | Flow 'math': Handling state change from Running to Failed
    Trigger was "all_successful" but some of the upstream tasks failed.
    That is, when it comes to mapping, the downstream task should also end up TriggerFailed because of its upstreams.
    alvin goh

    2 years ago
    Re: "you take that result and pass it into y1 with fail=False so it will reach the [k+1 for k in x] with x being the FAIL returned from y."
    Here y1 should have a TRIGGERFAIL result. The mapping operation in the z task then tries to iterate over that TRIGGERFAIL, hence the 'does not support indexing' error. Maybe a change can be made so that results are not iterated over, and tasks are not mapped, when the upstream result is a signal?
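    The suggestion above can be sketched as a guard that checks the upstream result before mapping over it. This is a toy model only: ToyState and map_over are hypothetical names made up for illustration, and nothing here is Prefect's actual TaskRunner code.

```python
from typing import Any, List, NamedTuple


class ToyState(NamedTuple):
    """Hypothetical stand-in for a task state; not Prefect's State class."""
    name: str    # e.g. "Success" or "TriggerFailed"
    result: Any  # a list on success, an exception object on failure


def map_over(upstream: ToyState) -> List[ToyState]:
    """Expand a mapped task, but refuse to index into a failure signal."""
    if isinstance(upstream.result, BaseException):
        # The upstream carries a signal: propagate a trigger failure instead of mapping.
        return [ToyState("TriggerFailed", upstream.result)]
    # The upstream succeeded: create one child state per element.
    return [ToyState("Success", item) for item in upstream.result]


failed = ToyState("TriggerFailed", RuntimeError("upstream did not run"))
ok = ToyState("Success", [2, 3, 4])

print([s.name for s in map_over(failed)])
print([s.name for s in map_over(ok)])
```

    With a guard like this, the mapped task would finish as TriggerFailed rather than hitting the TypeError. It does not address the case raised earlier where only a subset of upstream tasks failed.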
    josh

    2 years ago
    Interesting idea! I’m discussing this w/ @Chris White