I have a task that maps on a list of `table_names`...
# prefect-community
e
I have a task that maps over a list of `table_names` and loads some files into an S3 bucket, under prefix=`table_name`. I wanted to add a `state_handler` that deletes any leftover files from the failed task, should my task go into a `Failed` state. The problem is that the files I want to delete are identifiable by a task input. I have failed to find a way to access input parameters from the `state_handler` callback. I thought `task.inputs()` would get me what I wanted, but that had only type information. Any suggestions?
m
Why not use another task with an `any_failed` trigger?
e
How would that work if only a subset of my mapped tasks fails?
Also, I retry multiple times before moving on to the next tasks, and each retry would add new folders. I suspect a downstream task wouldn’t be able to act on each retried task run.
c
I think Mikhail’s trigger suggestion is the best way to go here. I assume you add new folders for each retry in some patterned way based on the inputs, so I’d recommend re-using that pattern for deleting the files.
from prefect import Task
from prefect.triggers import any_failed
mapped_task = my_task.map(inputs)
cleanup_task = Task(trigger=any_failed)(inputs, upstream_tasks=[mapped_task])
The cleanup task only runs if `mapped_task` or `inputs` fails, and it has access to all of the same inputs that the mapped tasks had.
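As a rough illustration of re-using the upload pattern for deletion, here is a minimal sketch of how a cleanup task body might turn table names into the keys to delete. It assumes keys are laid out as `table_name/...`; the helper `keys_to_delete` and the sample listing are hypothetical and only stand in for a real S3 listing.

```python
from typing import Iterable, List


def keys_to_delete(table_names: Iterable[str], existing_keys: Iterable[str]) -> List[str]:
    """Return the keys that sit under any of the given table prefixes."""
    # Trailing slash so that "orders" does not also match "orders_archive/...".
    prefixes = tuple(f"{name}/" for name in table_names)
    return [key for key in existing_keys if key.startswith(prefixes)]


# Hypothetical bucket listing; a real task would fetch this from S3.
listing = [
    "orders/part-0.csv",
    "orders_archive/part-0.csv",
    "users/part-0.csv",
]
print(keys_to_delete(["orders"], listing))  # ['orders/part-0.csv']
```

The function only selects keys; the actual delete call is left to whatever S3 client the flow already uses.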
e
@Chris White This doesn’t quite result in the behavior I was hoping for, but it gave me some ideas. This is definitely the right direction if task inputs do not exist in the state handlers. My main concern is that when using, say, `max_retries=5`, cleanup won’t trigger if the most retries used by any of the mapped tasks is 4. But I still want to clean up leftovers from runs 1-2 for a 3-retry task, runs 1-2-3 for a 4-retry task, etc.
I think I got this: the filename pattern will use the `input` to my `mapped_task` and `task_run_count` from `prefect.context`. I will then pass the `task_run_count` alongside `input` to my `cleanup_task`, which is set to `always_run`. I can then clean up the filename patterns that were generated with a `task_run_count` < `run_count_input`, the run count passed to the `cleanup_task`. I should also watch out for the case where all retries fail.
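A minimal sketch of that selection logic, kept free of Prefect so it can be run on its own. The key layout `table_name/run_N/`, the helper names, and the `succeeded` flag are all hypothetical, and run counts are assumed to start at 1.

```python
from typing import List


def attempt_prefix(table_name: str, run_count: int) -> str:
    """Hypothetical key layout: one folder per table and per attempt."""
    return f"{table_name}/run_{run_count}/"


def stale_prefixes(table_name: str, last_run_count: int, succeeded: bool) -> List[str]:
    """Prefixes left behind by attempts that did not produce the final result."""
    # Attempts before the last one were retried, so their output is always stale.
    stale = [attempt_prefix(table_name, n) for n in range(1, last_run_count)]
    # If every attempt failed, the last attempt's output is a leftover too.
    if not succeeded:
        stale.append(attempt_prefix(table_name, last_run_count))
    return stale


print(stale_prefixes("orders", 3, succeeded=True))   # ['orders/run_1/', 'orders/run_2/']
print(stale_prefixes("orders", 2, succeeded=False))  # ['orders/run_1/', 'orders/run_2/']
```

The `succeeded` flag covers the case where all retries fail: without it, the last attempt's folder would be treated as the good output and kept.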