https://prefect.io logo
Title
a

alvin goh

04/28/2020, 7:31 AM
Is it possible to have a List operation complete even if some upstream tasks fail? I want to collect the results of upstream tasks regardless of whether they fail.
from prefect import task, Flow
from prefect.engine import signals
import random
from prefect.triggers import all_finished

@task()
def randomly_fail():
    x = random.random()
    if x > 0.7:
        raise ValueError("x is too large")

        
@task(trigger=all_finished)
def print_result(l):
    print(l)
    return l
with Flow("random-mapping") as f:
    result = [randomly_fail() for i in range(10)]
    print_result(result)
    
f.run()
output shows that the list operation failed so i cant print the final results.
[2020-04-28 07:33:16,639] INFO - prefect.TaskRunner | Task 'randomly_fail': Starting task run...
[2020-04-28 07:33:16,640] ERROR - prefect.TaskRunner | Unexpected error: ValueError('x is too large',)
Traceback (most recent call last):
  File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\engine\task_runner.py", line 884, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "c:\tools\anaconda3\envs\prefect\lib\site-packages\prefect\utilities\executors.py", line 185, in timeout_handler
    return fn(*args, **kwargs)
  File "<ipython-input-6-020915229ace>", line 10, in randomly_fail
    raise ValueError("x is too large")
ValueError: x is too large
[2020-04-28 07:33:16,647] INFO - prefect.TaskRunner | Task 'randomly_fail': finished task run for task with final state: 'Failed'
[2020-04-28 07:33:16,662] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2020-04-28 07:33:16,669] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'TriggerFailed'
[2020-04-28 07:33:16,685] INFO - prefect.TaskRunner | Task 'print_result': Starting task run...
Trigger was "all_successful" but some of the upstream tasks failed.
[2020-04-28 07:33:16,693] INFO - prefect.TaskRunner | Task 'print_result': finished task run for task with final state: 'Success'
[2020-04-28 07:33:16,694] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
d

David Ojeda

04/28/2020, 7:40 AM
Sure, have a look at the trigger option of a task: https://docs.prefect.io/api/latest/triggers.html#functions
:upvote: 2
a

alvin goh

04/28/2020, 11:38 AM
ah my careless, i did List(*results) instead of List()(*results). it works as expected!