# ask-community
l
Hi again! After getting some great help here, I have two new things I am struggling with; perhaps someone here has some input.

**Challenge 1**
We have a pretty common use case where a flow receives a list of inputs `[x1, x2, ..., xn]` that can be processed in parallel through a DAG. Out of these `n` inputs, say a fraction `k` fail for various reasons. We would then find the issues, fix them, and like to rerun the flow for those inputs. When `n` and `k` are large, it is quite some work to go through the UI and/or logs to find all the inputs that failed in order to start the next run. What I am doing right now is naming the tasks with their inputs, so that I can filter failed tasks in the UI. This is not optimal, as it still involves quite a bit of manual copying and pasting. I initially thought about logging which inputs failed at the end of the flow execution, but I cannot think of a trivial way of doing this that does not involve some form of global variable that needs to be passed around to different tasks and be read/write thread safe. That just does not sound like the proper way of tackling this. How do others approach this?

**Challenge 2**
My tasks spin up Kubernetes jobs/pods to run computationally heavy workloads. If the task execution goes well, I use `DeleteNamespacedJob` to clean up once a job completes. But when I manually cancel those flow runs, or when a job fails, I end up with lots of clutter that I need to clean up manually in the k8s cluster. I have been thinking about using state handlers for this, calling `DeleteNamespacedJob` once the new state is `Cancelled` or `Failed`. However, the state handler needs to know the name of the job it should delete, which is a piece of information that only the task knows. My struggle is how to send the `job_name` from the task to the state handler. As I understand it, the state handler has a fixed signature, so I cannot pass extra parameters. I tried using the context for that, but what I write into the context within the task does not seem to be propagated to the state handler. Any ideas here? Thanks!
a
for C1: maybe feed the results to a final task which logs which inputs did not provide results in the end
or use the database directly (the `log` table)
l
@Andor Tóth, but if a task fails in the middle of the DAG then the final task will never get triggered.
Regarding C2, I am thinking about subclassing `Task`; that way I can save the `job_name` onto the instance that gets passed to the state handler.
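Something like this minimal sketch is what I have in mind. `launch_job` and `delete_job` are hypothetical stand-ins for the real Kubernetes calls, and it assumes the state handler sees the same task instance that ran:
```python
from prefect import Task
from prefect.engine.state import Cancelled, Failed

def launch_job(x):
    """Placeholder for whatever actually creates the k8s job; returns its name."""
    return f"job-{x}"

def delete_job(job_name):
    """Placeholder for the DeleteNamespacedJob call."""
    print(f"deleting {job_name}")

def cleanup_handler(task, old_state, new_state):
    # state handlers fire on every transition; only act on terminal bad states
    if isinstance(new_state, (Cancelled, Failed)) and getattr(task, "job_name", None):
        delete_job(task.job_name)
    return new_state

class KubeJobTask(Task):
    def __init__(self, **kwargs):
        kwargs.setdefault("state_handlers", [cleanup_handler])
        super().__init__(**kwargs)
        self.job_name = None

    def run(self, x):
        # stash the job name on the instance so the handler can read it later
        self.job_name = launch_job(x)
        # ... wait for the job, raise on failure ...
        return x
```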
a
l
Uuuu, didn't know about that. Ok, that looks promising. Just have a task with `all_finished` as the trigger.
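Something roughly like this, I guess (a sketch assuming Prefect 1.x `prefect.triggers`):
```python
import prefect
from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)
def report_failures(results):
    # all_finished means this runs even when upstream mapped children failed,
    # and those failures arrive here as the raised exceptions
    logger = prefect.context.get("logger")
    for r in results:
        if isinstance(r, BaseException):
            logger.info("failed child: %s", r)
```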
k
Andor has a really good suggestion (better than what I was thinking) for C2. I have a code snippet for C1, one sec
Collect your errors like this:
```python
import prefect
from prefect import task, Flow
from prefect.tasks.control_flow import FilterTask

@task()
def abc(x):
    # fail on a couple of inputs to simulate the k failures
    if x == 2 or x == 3:
        raise ValueError(f"I got the value {x}")
    return x + 1

@task()
def log_stuff(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x

with Flow("test") as flow:
    a = abc.map([1, 2, 3, 4, 5, 6])
    # FilterTask defaults to an all_finished trigger, so it still runs
    # and sees the failed children as exceptions in the mapped results
    failed_tasks = FilterTask(lambda x: isinstance(x, BaseException))(a)
    log_stuff.map(failed_tasks)

flow.run()
```
You can feed it to some task to log all of the outputs
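And if you want the original inputs back, so you can rerun just those, a variant like this sketch should work (untested, same trigger idea):
```python
from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)
def failed_inputs(inputs, results):
    # keep the inputs whose mapped result came back as an exception,
    # ready to feed into the next run
    return [i for i, r in zip(inputs, results) if isinstance(r, BaseException)]

# in the flow above: bad = failed_inputs([1, 2, 3, 4, 5, 6], a)
```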
l
Cool @Kevin Kho, thanks! Indeed that works for C1. For C2, were you referring to what I mentioned? I don't think @Andor Tóth said anything about C2.
k
Ah lol I see what you mean. I thought an `always_run` trigger would work to delete it… but you do need the id. You can indeed subclass it to return the id. I think that's the only way, cuz that task doesn't return it.
You can also subclass it and persist the id using the KV Store so that it can be loaded later: https://docs.prefect.io/orchestration/concepts/kv_store.html
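Roughly like this sketch (the helper names and key format are just examples, and the KV Store needs Cloud):
```python
import prefect
from prefect.backend import set_key_value, get_key_value

def remember_job(job_name: str) -> None:
    # call inside the task once the k8s job exists
    set_key_value(key=f"job-{prefect.context.flow_run_id}", value=job_name)

def lookup_job() -> str:
    # call inside the state handler to find what to delete
    return get_key_value(key=f"job-{prefect.context.flow_run_id}")
```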
l
All right cool, I will try the subclassing as we are not using cloud ATM.