Huw Ringer
10/23/2021, 11:01 PM
map method: process_tasks.map(flow_description, start_task_id)
My problem is that if one of the mapped tasks fails, the others still run regardless, and I’d like the whole flow to be cancelled programmatically/automatically when that happens. I think the CancelFlowRun API method should be able to do this if I invoke it just before the failed task enters the Failed state (by raising an exception). I’m struggling to figure out how to call it, though, and was hoping someone might be able to help, please. If I try the following:
prefect.tasks.prefect.flow_run_cancel.CancelFlowRun.run(context.get("flow_run_id"))
My Python engine says it can’t find the flow_run_cancel method in any imported Prefect module, and I’ve already imported task, Flow, Parameter, context, Task, tasks (for good measure). I’m clearly doing something very wrong and don’t know enough to figure out what it is. I would be very grateful for any help or suggestions to put me out of my misery. Thanks in advance!
Huw Ringer
10/23/2021, 11:13 PM
from prefect.client import Client
client = Client()
client.cancel_flow_run()
Am I on the right track, or barking up the wrong tree? The Flow is running in Prefect Cloud BTW, via a Prefect K8s agent running in Azure (if that makes any difference)
Huw Ringer
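A minimal sketch of the call being attempted above, assuming Prefect 1.x, where `Client.cancel_flow_run` is understood to take the flow run ID as its argument and `prefect.context` carries `flow_run_id` during a run. The helper name `cancel_current_run` is hypothetical, and the client is passed in as a parameter so the snippet does not need a live backend; in a real flow it would presumably be `Client()` from `prefect.client`.

```python
from typing import Any, Mapping


def cancel_current_run(client: Any, context: Mapping[str, Any]) -> bool:
    """Ask the backend to cancel the flow run named in `context`.

    `client` is assumed to expose `cancel_flow_run(flow_run_id)`, and
    `context` stands in for `prefect.context` (both are assumptions).
    Returns False when the context carries no run ID.
    """
    flow_run_id = context.get("flow_run_id")
    if not flow_run_id:
        # Nothing to cancel if no run ID is available.
        return False
    client.cancel_flow_run(flow_run_id)
    return True
```

Inside a task, this might be called as `cancel_current_run(Client(), prefect.context)` just before raising the exception that fails the task.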
10/23/2021, 11:14 PM
Kevin Kho
Huw Ringer
10/24/2021, 10:53 AM
from prefect.engine import signals
and then in my task run:
raise signals.SKIP(message='skipping!')
as suggested, to prevent the k8s agent from persisting
Kevin Kho
from prefect import Flow, case

with Flow(...) as flow:
    first_task = task_a.map(...)
    check_for_fail = ...  # check if first_task has a fail
    with case(check_for_fail, False):
        # runs only when no failure was detected
        second_task = task_b.map(first_task)
If you use the conditional block, you can just skip those other mapped tasks by default
Huw Ringer
10/24/2021, 9:32 PM
Huw Ringer
10/24/2021, 9:32 PM
Kevin Kho
The case task is a Prefect task that is the equivalent of if-else. It propagates the SKIP if the condition is not met
Kevin Kho
Huw Ringer
10/24/2021, 10:10 PM
flow_description = Parameter("flows", default=["Task 1", "Task 2", "Task 3", "Task 4"])
start_task_id = Parameter("task_id", default=[1, 1, 1, 1])
process_tasks.map(flow_description, start_task_id)
Kevin Kho
Kevin Kho
Huw Ringer
10/24/2021, 10:48 PM
Kevin Kho
isinstance(x, BaseException) because this will identify most errors.
from prefect import Flow, task

with Flow(...) as flow:
    a = task_a()
    a_fail = task(lambda x: isinstance(x, BaseException))(a)  # True or False
If task_a succeeded, you will get the value instead of an error
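The same isinstance check can be extended to a whole list of mapped results. Below is a minimal sketch in plain Python; the helper names `is_failure` and `any_failed` are hypothetical. To use it inside a flow it would presumably need to be wrapped with `task(...)` and given a trigger such as `all_finished` so that it still runs when an upstream task has failed, which is an assumption not stated in the thread.

```python
from typing import Any, Iterable


def is_failure(result: Any) -> bool:
    """True when an upstream result is an exception rather than a value."""
    return isinstance(result, BaseException)


def any_failed(results: Iterable[Any]) -> bool:
    """True if at least one mapped result is an exception."""
    return any(is_failure(r) for r in results)


# Stand-in for mapped task results: two values and one error.
results = [1, ValueError("task 2 failed"), 3]
print(any_failed(results))
```

The boolean returned by `any_failed` is the kind of value that could feed the `case(check_for_fail, False)` block shown earlier in the thread.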