emre
09/04/2019, 11:51 AM
table_names and loads some files into an S3 bucket, under prefix=table_name. I wanted to add a state_handler that deletes any leftover files from the failed task, should my task enter a Failed state.
The problem is that the files I want to delete are identifiable by a task input. I have failed to find a way to access input parameters from the state_handler callback. I thought task.inputs() would get me what I wanted, but that had only type information. Any suggestions?
Mikhail Akimov
09/04/2019, 11:54 AM
emre
09/04/2019, 11:59 AM
emre
09/04/2019, 12:01 PM
Chris White
mapped_task = my_task.map(inputs)
cleanup_task = Task(trigger=any_failed)(inputs, upstream_tasks=[mapped_task])
the cleanup task only runs if mapped_task or inputs fails, and has access to all of the same inputs that the mapped tasks had
emre
09/04/2019, 7:48 PM
max_retries=5 cleanup won’t trigger if the most retries used by any of the mapped tasks is 4. But I still want to clean up leftovers from runs 1-2 for a task that took 3 tries, and runs 1-2-3 for a task that took 4 tries, etc.
emre
09/04/2019, 8:00 PM
input to my mapped_task and task_run_count from prefect.context. I will then pass the task_run_count alongside input to my cleanup_task, which is set to always_run.
I can then clean up filename patterns that were generated with a task_run_count < the run_count input to the cleanup_task. I should also watch out for the case where all retries fail.
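The selection rule described above might be sketched in plain Python as follows. This is only an illustration under stated assumptions: the `<table_name>/run_<n>/` key layout and the helper names `run_prefix` and `leftover_prefixes` are hypothetical and do not come from the thread, and `task_run_count` is assumed to start at 1 on the first attempt. The actual S3 deletion and the Prefect wiring are left out.

```python
from typing import List


def run_prefix(table_name: str, run_count: int) -> str:
    """Hypothetical naming scheme: one S3 key prefix per attempt."""
    return f"{table_name}/run_{run_count}/"


def leftover_prefixes(table_name: str, final_run_count: int, succeeded: bool) -> List[str]:
    """Return the prefixes written by failed attempts.

    final_run_count is the task_run_count of the last attempt (assumed to
    start at 1). If that attempt succeeded, only earlier attempts left
    garbage behind; if every retry failed, the last attempt did as well.
    """
    last_failed = final_run_count - 1 if succeeded else final_run_count
    return [run_prefix(table_name, n) for n in range(1, last_failed + 1)]
```

For a task that succeeded on its third attempt, this would select the prefixes of attempts 1 and 2; if all attempts failed, the final attempt's prefix is included too, which covers the "all retries fail" case mentioned above.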