<@ULVA73B9P> I want the ones without the `wait_for...
# ask-marvin
@Marvin I want the ones without the `wait_for` argument to all run at the same time, while any Failure kills the whole thing.
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time


def fail_hook(flow, flow_run, state):
    print("Failed")
    raise Exception(state.message)


@task(log_prints=True, on_failure=[fail_hook])
def my_task():
    time.sleep(15)
    # some code here
    raise Exception("This task has failed.")  # this will set the task to Failed state


@task(log_prints=True, on_failure=[fail_hook])
def my_task2():
    time.sleep(25)
    return "sdfsdf"


@flow(
    task_runner=ConcurrentTaskRunner(),
)
def my_flow():
    result1 = my_task.submit(return_state=True)
    result2 = my_task2.submit(return_state=True)
    result3 = my_task.submit(return_state=True)
    result4 = my_task2.submit(return_state=True, wait_for=[result1])
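
Ah, the joys of concurrent programming. You humans seem to have an insatiable desire to make things more complicated than they need to be. But I suppose that's what makes you... interesting. In your Prefect flow, you're using the `ConcurrentTaskRunner`, which allows tasks to run concurrently. However, you're also calling the `submit` method with the `return_state=True` argument, which returns the final state of the task rather than a future. That means each `submit` call waits for its task to finish before the next one is submitted, so nothing overlaps. If you want all tasks without the `wait_for` argument to run at the same time, and any failure to kill the whole thing, submit them without `return_state=True` and then call `result()` on each future at the flow level. The first failed task re-raises its exception inside the flow, and the flow fails with it. Here's how you might adjust your code:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time


@task(log_prints=True)
def my_task():
    time.sleep(15)
    raise Exception("This task has failed.")  # puts the task in a Failed state


@task(log_prints=True)
def my_task2():
    time.sleep(25)
    return "sdfsdf"


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit() without return_state returns a future immediately,
    # so these three tasks start together
    future1 = my_task.submit()
    future2 = my_task2.submit()
    future3 = my_task.submit()
    # this one is held back until future1 has finished
    future4 = my_task2.submit(wait_for=[future1])

    # result() re-raises a failed task's exception, which fails the flow
    for future in (future1, future2, future3, future4):
        future.result()


if __name__ == "__main__":
    my_flow()
In this code, `my_task` and `my_task2` stay ordinary `@task` functions, written against the same Prefect 2 API your flow already uses. Because `submit` is called without `return_state=True`, it hands back a future straight away, so the three independent tasks run concurrently and only the fourth waits on `wait_for`. Calling `result()` on each future raises the first failure inside the flow, which ends the flow in a failed state. Remember, I'm a depressed robot, not a miracle worker. This code may not work perfectly for your use case (in particular, tasks that are already running in threads may still run to completion rather than being interrupted), but it should give you a good starting point.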
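The behaviour being asked for (start the independent tasks together, stop at the first failure) can also be sketched without Prefect at all, using only the standard library. This is an illustration of the pattern, not Prefect's API: `failing_job`, `slow_job`, and `run_fail_fast` are hypothetical names, the sleep durations are shortened stand-ins for the 15 and 25 seconds above, and `cancel_futures` assumes Python 3.9 or newer.

```python
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def failing_job():
    # Hypothetical stand-in for my_task: sleeps briefly, then fails.
    time.sleep(0.1)
    raise RuntimeError("This task has failed.")


def slow_job():
    # Hypothetical stand-in for my_task2: sleeps longer, then succeeds.
    time.sleep(0.6)
    return "sdfsdf"


def run_fail_fast(jobs):
    """Start every job at once and re-raise the first failure."""
    pool = ThreadPoolExecutor(max_workers=len(jobs))
    futures = [pool.submit(job) for job in jobs]
    try:
        # Returns as soon as one future raises, or once all have finished.
        done, _ = wait(futures, return_when=FIRST_EXCEPTION)
        for future in done:
            future.result()  # re-raises the job's exception, if it had one
        # Only reached when nothing failed, so every result is available.
        return [future.result() for future in futures]
    finally:
        # Cancel jobs that have not started yet. Threads that are already
        # running cannot be interrupted and will finish in the background.
        pool.shutdown(wait=False, cancel_futures=True)
```

Calling `run_fail_fast([failing_job, slow_job, failing_job])` raises the `RuntimeError` shortly after the first failure instead of waiting for `slow_job`. The same caveat applies as with thread-based task runners in general: the caller stops waiting, but work already in flight is not forcibly killed.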