# ask-community
h
Hi, I’m running the same task multiple times in a flow using some Parameters that make different stuff happen each time, using the map method:
process_tasks.map(flow_description, start_task_id)
My problem is that if one of the mapped tasks fails, the other ones still subsequently run regardless, and I’d like for the whole flow to be cancelled programmatically/automatically when that happens. I think the CancelFlowRun API method should be able to do this if I invoke it just before the failed task enters the failed state (via a raised exception). I’m struggling to figure out how to call this thing though, and was hoping someone might be able to help, please. If I try the following:
prefect.tasks.prefect.flow_run_cancel.CancelFlowRun.run(context.get("flow_run_id"))
My Python engine says it can’t find the flow_run_cancel method in any imported Prefect module, and I’ve already imported task, Flow, Parameter, context, Task, tasks (for good measure). I’m clearly doing something very wrong and don’t know enough to figure out what it is. Would be very grateful for any help or suggestions to put me out of my misery. Thanks in advance!
Hmmm. Poking around the flow_run_cancel.py source code on GitHub made me wonder if what I should be doing is:
from prefect.client import Client
client = Client()
client.cancel_flow_run()
Am I on the right track, or barking up the wrong tree? The Flow is running in Prefect Cloud BTW, via a Prefect K8S agent running in Azure (if that makes any difference)
(I can see from the source code it’ll pick up the current flow_run_id from the context, so no need to provide it explicitly)
k
Have you seen the docs on state handlers? You can put the logic to cancel the flow run inside a state handler and attach it to the mapped task, so that it runs before the task is marked as failed. I don’t think this will be graceful at the moment, and k8s might not shut down when the Flow is cancelled. Is raising SKIP not an option for this? That way subsequent tasks will not run.
h
Thanks very much for these suggestions Kevin. I think I’ll first try
from prefect.engine import signals
and then in my task run:
raise signals.SKIP(message='skipping!')
as suggested, in order to prevent the k8s agent from persisting
k
Actually I have a new idea
from prefect import Flow, case

with Flow(...) as flow:
    first_task = task_a.map(...)
    check_for_fail = ...  # check if first_task has a fail
    with case(check_for_fail, False):
        # only runs when no failure was detected
        second_task = task_b.map(first_task)
If you use the conditional block, you can just skip those other mapped tasks by default
h
Thanks for the additional suggestion Kevin. Can you please provide a little more detail regarding that final approach, because I’m struggling to fill in the gaps
(sorry)
k
The case task is a Prefect task that is the equivalent of if-else. It propagates the SKIP if the condition is not met
Maybe this will help?
h
Sorry, Kevin, I should’ve been more specific. I’m fine with case, it was more the checking for the task failing bit which I’m struggling with, as well as the use of map in the example - not sure I understand why second_task = task_b.map(first_task). In my case I’m only running one task (called process_tasks), but with 4 separate parameters via a single map method, like so:
flow_description = Parameter("flows", default=["Task 1", "Task 2", "Task 3", "Task 4"])
start_task_id = Parameter("task_id", default=[1, 1, 1, 1])
process_tasks.map(flow_description, start_task_id)
k
Ah gotcha. If you want Task 1's failure to end the Task 2, Task 3, and Task 4 runs, then I think this is very hard because these are treated as 4 different first-class tasks. Raising FAIL or SKIP will only apply to that current task and not to the other mapped ones.
Are you on a DaskExecutor?
h
Ah! Yes, I tried raising SKIP earlier and it completely failed to prevent the other mapped tasks from running exactly as you described above (now I understand why, thanks to your explanation). Maybe I just need to refactor the script to invoke each task separately rather than via map, with the conditional logic you suggested in between each. Which brings me back to the earlier question of how to determine if the previous task failed. In answer to your question, no, not running on a Dask Executor.
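A rough sketch of the "invoke each task separately, stop when one fails" idea, written in plain Python with no Prefect involved. The names run_sequentially and process_task are hypothetical stand-ins, and the failure in "Task 2" is simulated purely for illustration; inside a real flow the same effect would need conditional logic between tasks.

```python
def run_sequentially(func, arg_sets):
    """Call func once per argument tuple, in order; stop at the first exception.

    Returns (results, error): the results of the calls that completed, and the
    exception that stopped the run (None if every call succeeded).
    """
    results = []
    for args in arg_sets:
        try:
            results.append(func(*args))
        except Exception as exc:
            # First failure: do not start any of the remaining calls.
            return results, exc
    return results, None


def process_task(description, task_id):
    # Hypothetical stand-in for the real task body; "Task 2" fails on purpose.
    if description == "Task 2":
        raise ValueError(f"{description} failed")
    return f"{description} done (start id {task_id})"


descriptions = ["Task 1", "Task 2", "Task 3", "Task 4"]
start_ids = [1, 1, 1, 1]
results, error = run_sequentially(process_task, zip(descriptions, start_ids))
print(results)
print(repr(error))
```

Here "Task 3" and "Task 4" are never started, which is the behaviour map does not give, since each mapped child runs independently of the others.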
k
Yeah, to check if a task failed, do something like isinstance(x, BaseException) because this will identify most errors.
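from prefect import Flow, task
from prefect.triggers import all_finished
with Flow(...) as flow:
    a = task_a()
    # assumption: all_finished trigger, so the check still runs when task_a fails
    a_fail = task(lambda x: isinstance(x, BaseException), trigger=all_finished)(a)  # True or False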
If task_a succeeded, you will get the value instead of an error