Lucas Beck
09/16/2021, 9:21 AM
Challenge 1
… a flow takes n inputs [x1, x2, ..., xn] that can be processed in parallel through a DAG. Out of these n inputs, let's say that a fraction of those, k, fail due to various reasons. We would then find the issues, fix them, and want to rerun the flow for those inputs. When n and k are large, it is quite some work to go through the UI and/or logs to find all the inputs that failed in order to start the next run. What I am doing right now is naming the tasks with their inputs, so that I can filter failed tasks in the UI. This is not optimal, as it still involves quite a bit of manual copying and pasting. I initially thought about logging which inputs failed at the end of the flow execution. However, I cannot think of a trivial way of doing this that does not involve some form of global variable being passed around to different tasks and kept read/write thread-safe. It just does not sound like the proper way of tackling this. How do others approach this?
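For reference, a minimal sketch of the naming workaround described above, assuming Prefect 1.x's templated task_run_name (process is a hypothetical task, and the rendered name only shows up when running against a backend such as Prefect Server or Cloud):

from prefect import task, Flow

# Each mapped run is named after its input, e.g. "process-7",
# so failed runs can be filtered by name in the UI.
@task(task_run_name="process-{x}")
def process(x):
    return x

with Flow("named-runs") as flow:
    process.map([1, 2, 3])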
Challenge 2
Second, my tasks spin up Kubernetes jobs/pods to run computationally heavy workloads. If the task execution goes well, I use DeleteNamespacedJob to clean up once a job completes. But when I manually cancel those flow runs, or when a job fails, I end up with lots of clutter that I need to clean up manually in the k8s cluster. I have been thinking about using state handlers to deal with this, calling DeleteNamespacedJob once the new state is Cancelled or Failed. However, the state handler needs to know the name of the job it has to delete, which is a piece of information that the task knows about. My struggle is how to send the job_name from the task to the state handler. As I understand it, the state handler has a specific signature, so I cannot pass extra parameters. I tried using the context for that, but what I write into the context within the task does not seem to be propagated to the state handler. Any ideas here?
Thanks!
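For context, a minimal sketch of the fixed state-handler shape being described, assuming Prefect 1.x (cleanup_handler is a hypothetical name): a handler receives exactly (task, old_state, new_state), so there is no slot for extra arguments such as a job name.

from prefect.engine.state import Cancelled, Failed

def cleanup_handler(task, old_state, new_state):
    if isinstance(new_state, (Cancelled, Failed)):
        pass  # DeleteNamespacedJob would need the job name right here
    return new_state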
Andor Tóth
09/16/2021, 9:31 AM
(… a log table)

Lucas Beck
09/16/2021, 11:36 AM
… I can subclass Task; that way I can save the job_name into the class that gets passed to the state handler.
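A minimal sketch of that subclassing idea, assuming Prefect 1.x; ComputeJobTask, its naming scheme, and delete_job_handler are all hypothetical:

from prefect import Task

class ComputeJobTask(Task):
    # run() records the k8s job name on the instance, so the state
    # handler can read it back off its `task` argument.
    def run(self, x):
        self.job_name = f"compute-{x}"  # assumed naming scheme
        # ... create and monitor the k8s job here ...

def delete_job_handler(task, old_state, new_state):
    job_name = getattr(task, "job_name", None)
    # ... call DeleteNamespacedJob with job_name here ...
    return new_state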
Andor Tóth
09/16/2021, 11:43 AM

Lucas Beck
09/16/2021, 12:02 PM
… all_finished as the trigger.
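A minimal sketch of that trigger idea, assuming Prefect 1.x (collect_failed_inputs and the zip-by-position pattern are illustrative, not from the thread): a downstream task with trigger=all_finished still runs when mapped children fail, and the failed children arrive as exception objects, so the failed inputs can be recovered by position.

from prefect import task
from prefect.triggers import all_finished

# Hypothetical reduce step: pair each original input with its mapped
# result and keep the inputs whose result is an exception.
# Usage inside a flow: collect_failed_inputs(inputs=xs, results=process.map(xs))
@task(trigger=all_finished)
def collect_failed_inputs(inputs, results):
    return [i for i, r in zip(inputs, results) if isinstance(r, BaseException)]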
Kevin Kho
import prefect
from prefect import task, Flow
from prefect.tasks.control_flow import FilterTask

@task()
def abc(x):
    if x == 2 or x == 3:
        raise ValueError(f"I got the value {x}")
    return x + 1

@task()
def log_stuff(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x

with Flow("test") as flow:
    a = abc.map([1, 2, 3, 4, 5, 6])
    failed_tasks = FilterTask(lambda x: isinstance(x, BaseException))(a)
    log_stuff.map(failed_tasks)

flow.run()
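(A note on why the snippet works: per the Prefect 1.x docs, FilterTask is configured by default with trigger=all_finished and skip_on_upstream_skip=False, which is why the failed mapped results flow into it rather than being skipped.)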
You can feed it to some task to log all of the outputs.

Lucas Beck
09/16/2021, 2:03 PM

Kevin Kho
always_trigger would work to delete it… but you do need the id…. You can indeed subclass it to return the id. I think that's the only way, cuz that task doesn't return it.
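A minimal sketch of that last suggestion, assuming Prefect 1.x's Kubernetes task library (the subclass name is hypothetical, and the job name is assumed to sit at body["metadata"]["name"] in the job manifest):

from prefect.tasks.kubernetes import RunNamespacedJob

class RunNamespacedJobWithName(RunNamespacedJob):
    # The stock task does not return anything, so run() is overridden
    # to hand the job name back for a downstream delete/cleanup step.
    def run(self, body=None, **kwargs):
        body = body or getattr(self, "body", None)  # fall back to the attr default
        super().run(body=body, **kwargs)
        return body["metadata"]["name"]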
Lucas Beck
09/16/2021, 2:10 PM