emre
09/04/2019, 11:51 AM
table_names and loads some files into an S3 bucket, under prefix=table_name. I wanted to add a state_handler that deletes any leftover files from the failed task, should my task go into a Failed state.
The problem is that the files I want to delete are identifiable by a task input. I have failed to find a way to access input parameters from the state_handler callback. I thought task.inputs() would get me what I wanted, but that had only type information. Any suggestions?
Mikhail Akimov
09/04/2019, 11:54 AM
emre
09/04/2019, 11:59 AM
Chris White
09/04/2019, 3:10 PM
mapped_task = my_task.map(inputs)
cleanup_task = Task(trigger=any_failed)(inputs, upstream_tasks=[mapped_task])
the cleanup task only runs if mapped_task or inputs fails, and has access to all of the same inputs that the mapped tasks had
emre
09/04/2019, 7:48 PM
max_retries=5 cleanup won't trigger if the most retries used by any of the mapped tasks is 4. But I still want to clean up leftovers from runs 1-2 for a 3 retry task, and runs 1-2-3 for a 4 retry task, etc.
input to my mapped_task and task_run_count from prefect.context. I will then pass the task_run_count alongside input to my cleanup_task, which is set to always_run.
I can then clean up filename patterns that were generated with a task_run_count < run_count_input to the cleanup_task. I should also watch out if all retries fail.
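The bookkeeping in that last plan can be sketched in plain Python, with no Prefect calls. This is only an illustration under stated assumptions: the per-attempt key layout in attempt_prefix, the function names, and the idea that run counts start at 1 are all hypothetical and not taken from the thread. It shows which leftovers a cleanup task might target once it knows the final run count, including the case where every retry failed.

```python
from typing import List


def attempt_prefix(table_name: str, run_count: int) -> str:
    # Hypothetical key layout: one sub-prefix per attempt under the table's prefix.
    return f"{table_name}/attempt-{run_count}/"


def stale_prefixes(table_name: str, run_count_input: int, succeeded: bool) -> List[str]:
    """Return the prefixes whose files a cleanup step should delete.

    Assumes attempts are numbered from 1. Every attempt before the final
    one failed and may have left files behind. The final attempt's files
    are kept only if that attempt succeeded.
    """
    last_stale = run_count_input - 1 if succeeded else run_count_input
    return [attempt_prefix(table_name, n) for n in range(1, last_stale + 1)]


# A task that succeeded on its third run leaves leftovers from runs 1 and 2.
leftovers = stale_prefixes("orders", 3, succeeded=True)
```

If every retry fails, passing succeeded=False also marks the final attempt's prefix for deletion, which covers the "all retries fail" case mentioned above.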