# ask-community
j
Hi, I have a DAG with a basic fan-out, fan-in idea. Some of the fanned-out tasks may fail, but I still want to run the fan-in task for the successful tasks. So if I have upstream: [x, y, z, FAILURE], the downstream task should still process the input [x, y, z]. What I did:
• Added trigger=all_finished to the downstream task
• Iterate over the input and filter out exceptions
Although this worked fine locally while developing, I have now hit an edge case where one of the upstream tasks is TriggerFailed. How can I mitigate this and still process the successful upstreams?
from typing import List

from prefect import task
from prefect.triggers import all_finished


@task(trigger=all_finished)
def start_dbt_snapshot_flow_task(mysql_database: str, mysql_tables: List[str]):
    # When an upstream task fails, its result is either an exception or signals.FAIL
    # (which also derives from Exception), so drop those entries.
    mysql_tables = [x for x in mysql_tables if not isinstance(x, Exception)]


##

mysql_tables = []
for spec in specs:
    ##
    table = CreateSyncTask(spec)
    mysql_tables.append(table)

start_dbt_snapshot_flow_task(mysql_database, mysql_tables)
Okay, looking at the UI, it seems that the implicitly created List task failed?
e
I usually use FilterTask with no filter for these cases, which filters out exceptions, failed upstream tasks, etc. https://docs.prefect.io/api/0.13.19/tasks/control_flow.html#filtertask
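As a rough illustration of what "no filter" means here, the following plain-Python sketch mimics the default filtering without importing Prefect. The names `NoResult` and `default_filter` are hypothetical stand-ins invented for this example (the real task uses its own internal placeholder for a missing result), so treat it as a model of the behavior rather than the library's code.

```python
class NoResult:
    """Hypothetical stand-in for the placeholder of a task that produced no result."""


def default_filter(results):
    # Keep every entry that is neither an exception nor a missing result.
    return [r for r in results if not isinstance(r, (NoResult, Exception))]


# Results as the fan-in step might see them: three tables, one failure, one gap.
upstream = ["x", "y", ValueError("sync failed"), "z", NoResult()]
filtered = default_filter(upstream)
print(filtered)  # ['x', 'y', 'z']
```

Only the values that are safe to hand to the downstream task survive, which matches the [x, y, z] outcome asked for in the question.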
j
Oh, that’s great, I wasn’t aware of that one, thanks!
Any clue how I would incorporate it in my flow? Prefect creates that implicit List task, and since I’m not mapping anything, I’m unsure how to use it 😓
e
Yeah, I think this is fine; FilterTask expects a list. If you are using square brackets and letting Prefect create the List task, make sure the created List task has trigger=all_finished. If it does not (all_successful is the default trigger), you probably need to call the List task explicitly.
from prefect.triggers import all_finished
from prefect.tasks.core.collections import List
from prefect.tasks.control_flow import FilterTask
    
collect_tasks = List(trigger=all_finished)(upstream_task_1, upstream_task_2, upstream_task_3, ....)
successful = FilterTask()(collect_tasks)
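To make the role of the trigger concrete, here is a toy model in plain Python of why a collecting task with the default trigger ends up TriggerFailed while one with all_finished still runs. It does not use Prefect: the state names are plain strings, and the `TriggerFailed` exception and both functions are simplified stand-ins written for this example.

```python
class TriggerFailed(Exception):
    """Toy counterpart of a task ending up in the TriggerFailed state."""


def all_successful(upstream_states):
    # Default behavior: run only if every upstream task succeeded.
    if not all(state == "Success" for state in upstream_states):
        raise TriggerFailed("not all upstream tasks succeeded")
    return True


def all_finished(upstream_states):
    # Run as soon as every upstream task has finished, whatever the outcome.
    finished = {"Success", "Failed", "TriggerFailed"}
    if not all(state in finished for state in upstream_states):
        raise TriggerFailed("not all upstream tasks finished")
    return True


states = ["Success", "Success", "Failed"]

try:
    all_successful(states)
    default_outcome = "ran"
except TriggerFailed:
    default_outcome = "TriggerFailed"

finished_outcome = "ran" if all_finished(states) else "TriggerFailed"

print(default_outcome)   # TriggerFailed
print(finished_outcome)  # ran
```

In this model the collecting step sits between the fanned-out tasks and the fan-in task, so it needs the permissive trigger as well, not only the final task.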
j
Hm, if I call the List task explicitly, I end up with two List tasks...
mysql_tables = ListTask()(mysql_tables)
mysql_tables = FilterTask()(mysql_tables)
start_dbt_snapshot_flow_task(mysql_database, mysql_tables)
Hmm, I suppose this works, but I'm not sure if it is valid? (Note the asterisk)
mysql_tables = ListTask(trigger=all_finished)(*mysql_tables)
mysql_tables = FilterTask()(mysql_tables)
start_dbt_snapshot_flow_task(mysql_database, mysql_tables)
e
Yeah, Prefect translating things like lists into a List task is both magical and cryptic 😅. I guess Prefect tries to create a task out of any non-task thing precisely when it is passed to Task.__call__().
• Without the asterisk, you were passing a list of Prefect tasks, which is not itself a task, so Prefect tries to convert it into a List task.
• With the asterisk, you end up passing a number of Prefect tasks, so all ends up well.
• Today we both learned something new 👍
The asterisk version seems good and should work.
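The difference the asterisk makes can be seen at the plain Python level, independent of Prefect. In this small sketch, `call` is a hypothetical function standing in for a task being called; it only shows how many arguments arrive in each form.

```python
def call(*args):
    # Stand-in for a callable that receives its upstream values positionally.
    return args


tables = ["table_a", "table_b", "table_c"]

without_star = call(tables)   # one argument: the list itself
with_star = call(*tables)     # three arguments: one per element

print(len(without_star))  # 1
print(len(with_star))     # 3
```

In the first form the callee receives a single plain list, which is the kind of non-task value that would need wrapping; in the second, each element arrives as its own argument.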