Joël Luijmes
06/24/2021, 7:26 AMJoël Luijmes
06/24/2021, 7:27 AM@task(trigger=all_finished)
def start_dbt_snapshot_flow_task(mysql_database: str, mysql_tables: List[str]):
# When upstream task fails, it is either an exception or signals.FAIL (which also derives from
# exception).
mysql_tables = [x for x in mysql_tables if not isinstance(x, Exception)]
##
mysql_tables = []
for spec in specs:
##
table = CreateSyncTask(spec)
mysql_tables.append(table)
start_dbt_snapshot_flow_task(mysql_database, mysql_tables)
Joël Luijmes
06/24/2021, 7:28 AMemre
06/24/2021, 8:18 AMFilterTask
with no filter for these cases, which filters out exceptions, failed upstream tasks etc.
https://docs.prefect.io/api/0.13.19/tasks/control_flow.html#filtertaskJoël Luijmes
06/24/2021, 8:22 AMJoël Luijmes
06/24/2021, 8:26 AMJoël Luijmes
06/24/2021, 8:26 AMemre
06/24/2021, 8:49 AMtrigger=all_successful
, If not, you probably need to call List
task explicitly.
from prefect.triggers import all_finished
from prefect.tasks.core.collections import List
from prefect.tasks.control_flow import FilterTask
collect_tasks = List(trigger=all_finished)(upstream_task_1, upstream_task_2, upstream_task_3, ....)
successful = FilterTask()(collect_tasks)
Joël Luijmes
06/24/2021, 12:08 PMJoël Luijmes
06/24/2021, 12:09 PMJoël Luijmes
06/24/2021, 12:09 PMmysql_tables = ListTask()(mysql_tables)
mysql_tables = FilterTask()(mysql_tables)
start_dbt_snapshot_flow_task(mysql_database, mysql_tables)
Joël Luijmes
06/24/2021, 12:12 PMmysql_tables = ListTask(trigger=all_finished)(*mysql_tables)
mysql_tables = FilterTask()(mysql_tables)
start_dbt_snapshot_flow_task(mysql_database, mysql_tables)
emre
06/24/2021, 1:58 PMList
task is both magical and cryptic 😅 . I guess prefect tries to create a task out of some non task thing, precisely when it is passed to Task.__call__()
.
• Without the asterisk, you were passing a list of prefect tasks, which is not a task, so prefect tries to convert it into a List
task.
• With the asterisk you end up passing a number of prefect tasks, so all ends up well.
• Today we both learned something new 👍
Asterisk one seems good, should work.