Ben Davison
07/21/2020, 12:36 PM
@task(name="Update a tables partition")
def update_partitions(table, task_start_date=None, task_end_date=None):
    # print(task_start_date)
    # print(task_end_date)
    print(table)

with Flow(name="demo", on_failure=datadog_notifier) as flow:
    start_date = Parameter('task_start_date', required=False)
    end_date = Parameter('task_end_date', required=False)
    tables_to_create = get_uncreated_tables()
    tables_already_created = get_created_tables()
    create_table.map(tables_to_create)
    # by this point, the tables in tables_to_create will have been created
    update_partitions.map(tables_to_create + tables_already_created, task_start_date=start_date, task_end_date=end_date)
Other tasks removed for clarity, but they return a list of objects.
Results in an error:
[2020-07-21 12:36:43] INFO - prefect.TaskRunner | Task 'Update a tables partition': Starting task run...
[2020-07-21 12:36:43] DEBUG - prefect.TaskRunner | Task 'Update a tables partition': Handling state change from Pending to Failed
[2020-07-21 12:36:43] INFO - prefect.TaskRunner | Task 'Update a tables partition': finished task run for task with final state: 'Failed'
[2020-07-21 12:36:43] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2020-07-21 12:36:43] DEBUG - prefect.FlowRunner | Flow 'demo': Handling state change from Running to Failed
Removing the parameters runs everything fine.
Alberto de Santos
10/17/2020, 2:44 PM
datadog_notifier deals with the failure? Thanks!
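A note on the error in the first message, offered as a likely explanation rather than a confirmed diagnosis: in Prefect 0.x, `.map` iterates over every argument it receives unless that argument is wrapped in `unmapped`. An optional `Parameter` with no value resolves to `None`, which cannot be iterated, and that would be consistent with the task going straight from Pending to Failed. The sketch below is a plain-Python model of that behaviour, not Prefect's implementation; `Unmapped` and `map_call` are hypothetical names invented for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Unmapped:
    """Hypothetical stand-in for a 'do not iterate this' wrapper."""
    value: Any


def _pick(arg: Any, i: int) -> Any:
    # Constants are passed through whole; everything else is indexed.
    return arg.value if isinstance(arg, Unmapped) else arg[i]


def map_call(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Call fn once per index, iterating every argument not wrapped in Unmapped."""
    mapped = [a for a in (*args, *kwargs.values()) if not isinstance(a, Unmapped)]
    if not mapped:
        raise ValueError("at least one argument must be mapped over")
    for a in mapped:
        if not hasattr(a, "__len__"):
            # Models the failure seen when a mapped input is None.
            raise TypeError(f"cannot map over non-iterable value {a!r}")
    n = min(len(a) for a in mapped)
    return [
        fn(*(_pick(a, i) for a in args),
           **{k: _pick(v, i) for k, v in kwargs.items()})
        for i in range(n)
    ]


def update_partitions(table, task_start_date=None, task_end_date=None):
    # Returns its inputs so the effect of mapping is easy to inspect.
    return (table, task_start_date, task_end_date)


tables = ["a", "b"]  # made-up table names for the demo

# Wrapping the dates marks them as constants shared by every mapped call.
results = map_call(
    update_partitions,
    tables,
    task_start_date=Unmapped(None),
    task_end_date=Unmapped(None),
)
```

If this is indeed the cause, the corresponding change in the flow would be to import `unmapped` from `prefect` and pass `task_start_date=unmapped(start_date), task_end_date=unmapped(end_date)` to `update_partitions.map`, leaving the table list as the only mapped argument.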