Lucas Beck
09/16/2021, 9:21 AM

Challenge 1
I have a set of inputs [x1, x2, ..., xn] that can be processed in parallel through a DAG. Out of these n inputs, let's say that some number k of them fail for various reasons. We would then find the issues, fix them, and want to rerun the flow for just those inputs. When n and k are large, it is quite a lot of work to go through the UI and/or logs to find all the inputs that failed in order to start the next run. What I am doing right now is naming the tasks after their inputs, so that I can filter failed tasks in the UI. This is not optimal, as it still involves a fair amount of manual copying and pasting. I initially thought about logging which inputs failed at the end of the flow execution. However, I cannot think of a trivial way of doing this that does not involve some form of global variable that gets passed around to different tasks and has to be thread-safe for reads and writes. That just does not sound like the proper way of tackling this. How do others approach this?
Challenge 2
Second, my tasks spin up Kubernetes jobs/pods to run computationally heavy workloads. If the task execution goes well, I use DeleteNamespacedJob to clean up once a job completes. But when I manually cancel those flow runs, or when a job fails, I end up with a lot of clutter in the k8s cluster that I have to clean up by hand. I have been thinking about using state handlers to deal with this, calling DeleteNamespacedJob once the new state is Cancelled or Failed. However, the state handler needs to know the name of the job it should delete, which is a piece of information that only the task knows. My struggle is how to get the job_name from the task to the state handler. As I understand it, the state handler has a fixed signature, so I cannot pass extra parameters. I tried using the context for that, but what I write into the context within the task does not seem to be propagated to the state handler. Any ideas here?
Thanks!
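For context, a minimal sketch of the state-handler signature being described here, assuming Prefect 1.x (handler and task names are hypothetical). The handler only receives (task, old_state, new_state), which is why there is no slot for an extra argument such as job_name:

import prefect
from prefect import task
from prefect.engine.state import State, Failed, Cancelled

def cleanup_handler(task, old_state: State, new_state: State) -> State:
    if isinstance(new_state, (Failed, Cancelled)):
        # the k8s job name is not available here unless the task
        # object itself (or something it references) carries it
        pass
    return new_state

@task(state_handlers=[cleanup_handler])
def heavy_workload(x):
    ...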
Andor Tóth
09/16/2021, 9:31 AM
log table)

Lucas Beck
09/16/2021, 11:36 AM
One thing I could do is subclass Task; that way I can save the job_name into the class that gets passed to the state handler.
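A rough sketch of that subclassing idea, with hypothetical names: since the state handler receives the task instance itself, anything stored on self inside run() can be read back in the handler.

from prefect import Task
from prefect.engine.state import Failed, Cancelled

def delete_job_handler(task, old_state, new_state):
    # the handler gets the task instance, so it can read attributes set in run()
    if isinstance(new_state, (Failed, Cancelled)) and getattr(task, "job_name", None):
        print(f"would delete k8s job {task.job_name}")  # call the k8s cleanup here
    return new_state

class KubernetesWorkload(Task):
    def __init__(self, **kwargs):
        self.job_name = None
        super().__init__(state_handlers=[delete_job_handler], **kwargs)

    def run(self, x):
        self.job_name = f"job-{x}"  # hypothetical naming scheme; record before launching
        # ... create the Kubernetes job and wait for it here ...
        return x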
Lucas Beck
09/16/2021, 12:02 PM
I could use all_finished as the trigger.
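Assuming this refers to putting all_finished on a downstream task, a short sketch: with that trigger the task runs even when upstream mapped children fail, and the failed children arrive as Exception instances in its inputs.

from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)
def collect_failures(results):
    # failed mapped children show up as Exception instances in `results`
    return [r for r in results if isinstance(r, BaseException)]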
Kevin Kho
import prefect
from prefect import task, Flow
from prefect.tasks.control_flow import FilterTask

@task()
def abc(x):
    if x == 2 or x == 3:
        raise ValueError(f"I got the value {x}")
    return x + 1

@task()
def log_stuff(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x

with Flow("test") as flow:
    a = abc.map([1, 2, 3, 4, 5, 6])
    failed_tasks = FilterTask(lambda x: isinstance(x, BaseException))(a)
    log_stuff.map(failed_tasks)

flow.run()
You can feed it to some task to log all of the outputs.
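If what is ultimately needed is the failed inputs rather than the exceptions, one variation (an assumption, not something stated in the thread) is to pass both the inputs and the mapped results to a task with the all_finished trigger and keep the inputs whose result is an exception:

from prefect import task, Flow
from prefect.triggers import all_finished

@task(trigger=all_finished)
def failed_inputs(inputs, results):
    # pair each input with its result; failed children arrive as exceptions
    return [i for i, r in zip(inputs, results) if isinstance(r, BaseException)]

with Flow("retry-candidates") as flow:
    inputs = [1, 2, 3, 4, 5, 6]
    a = abc.map(inputs)  # abc as defined in the example above
    to_rerun = failed_inputs(inputs, a)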
Kevin Kho
The always_run trigger would work to delete it, but you do need the id… You can indeed subclass it to return the id. I think that's the only way, because that task doesn't return it.
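A hedged sketch of that suggestion, using hypothetical task names rather than Prefect's stock Kubernetes tasks: the task that launches the job returns the job name, and a cleanup task with the always_run trigger deletes it however the run ends. The Kubernetes client calls are illustrative.

from prefect import task, Flow
from prefect.triggers import always_run
from kubernetes import client, config

@task
def run_k8s_job(x):
    job_name = f"job-{x}"  # hypothetical naming scheme
    # ... create the job via client.BatchV1Api().create_namespaced_job(...) and wait ...
    return job_name

@task(trigger=always_run)
def delete_k8s_job(job_name):
    # runs regardless of upstream state; if the upstream task failed,
    # job_name may itself be an exception, so guard against that
    if isinstance(job_name, BaseException):
        return
    config.load_kube_config()
    client.BatchV1Api().delete_namespaced_job(name=job_name, namespace="default")

with Flow("k8s-cleanup") as flow:
    name = run_k8s_job(1)
    delete_k8s_job(name)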