# ask-community
t
Hey everyone, is it possible to stop a flow when a mapped task fails, without executing the remaining mapped tasks? If not, any suggestions for how to solve this problem? Scenario: I'm executing a pipeline for a list of files (1...n), but if one of them fails, the flow should stop immediately. I tried it with state handlers, and the flow does end up Failed, but only after all instances of the mapped task have been executed.
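For reference, here is the fail-fast behavior being asked for, written in plain Python with no Prefect involved. `process_file` and `run_fail_fast` are hypothetical stand-ins for the per-file pipeline: files are handled strictly in order, and the first exception stops the run, so later files are never touched.

```python
from typing import Callable, List, Tuple


def run_fail_fast(
    files: List[str], process_file: Callable[[str], str]
) -> Tuple[List[str], str]:
    """Process files in order and stop at the first failure.

    Returns the results collected before the failure and the name of the
    file that failed (an empty string if every file succeeded).
    """
    results: List[str] = []
    for name in files:
        try:
            results.append(process_file(name))
        except Exception:
            # Stop immediately: files after `name` are never processed.
            return results, name
    return results, ""


def process_file(name: str) -> str:
    # Hypothetical per-file step that fails for one specific file.
    if name == "file2":
        raise ValueError(f"cannot process {name}")
    return name.upper()


if __name__ == "__main__":
    done, failed = run_fail_fast(["file1", "file2", "file3"], process_file)
    print(done, failed)  # ['FILE1'] file2
```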
k
Hey @Timo, I think the guarantees around stopping a mapped process are weak, especially if executed by a Dask cluster. Did you try the ENDRUN signal though?
t
Hey @Kevin Kho, thanks for your response. I don't want to run it on a Dask cluster because the order of the files matters. I tried ENDRUN but can't get it to work.
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import State, Success


def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new


@task
def say_hello(name):
    print(f"Hello {name}")


@task(state_handlers=[stateh])
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

with Flow("hello_flow") as flow:
    ls = [1, 2, 3]

    sh = say_hello("John")
    e = forerror.map(ls)


if __name__ == "__main__":
    flow.run()
k
I think in this case, doing it in the state handler is too late for you, so you need to raise `ENDRUN` instead of `FAIL` when `para == 2`.
t
This did not work either 😞
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import Failed, State

def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new


@task
def say_hello(name):
    print(f"Hello {name}")


# @task(state_handlers=[stateh])
@task
def forerror(para):
    if para == 2:
        # raise FAIL("it's 2")
        state = Failed("it's 2")
        raise ENDRUN(state)
    else:
        print(para)
        return para + 1


with Flow("hello_flow") as flow:
    ls = [1, 2, 3]

    sh = say_hello("John")
    e = forerror.map(ls)


if __name__ == "__main__":
    flow.run()
[2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'hello_flow'
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Scheduled to Running
[2021-06-16 16:20:11+0200] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
Hello John
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror': Handling state change from Pending to Mapped
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Finished task run for task with final state: 'Mapped'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Calling task.run() method...
1
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Calling task.run() method...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Running to Failed
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Finished task run for task with final state: 'Failed'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Calling task.run() method...
3
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-06-16 16:20:12+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Running to Failed
k
Will give this a shot
Yeah, I think there is no way to cancel like this when the task is mapped, because map was designed for parallel execution and it's hard to cancel other threads that are already running in parallel.
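The same limitation shows up outside Prefect. The sketch below uses only Python's `concurrent.futures` and is an analogy, not how Prefect's executors are implemented: once parallel jobs have started, a failure in one of them cannot stop the others, because `Future.cancel()` only succeeds for jobs that have not begun running. `job` and `run_parallel` are hypothetical names.

```python
import threading
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait

N_JOBS = 3
# Every job waits here, so all of them are running before any one fails.
all_started = threading.Barrier(N_JOBS, timeout=5)


def job(i: int) -> int:
    all_started.wait()
    if i == 1:
        raise RuntimeError(f"job {i} failed")
    time.sleep(0.1)  # the other jobs are still busy when job 1 fails
    return i


def run_parallel():
    with ThreadPoolExecutor(max_workers=N_JOBS) as pool:
        futures = [pool.submit(job, i) for i in range(N_JOBS)]
        # Return as soon as any job raises...
        wait(futures, return_when=FIRST_EXCEPTION)
        # ...but cancel() returns False for jobs that are running or done.
        cancelled = [f.cancel() for f in futures]
    # Leaving the `with` block waited for the running jobs to finish anyway.
    finished = [f.result() for f in futures if f.exception() is None]
    return cancelled, finished


if __name__ == "__main__":
    print(run_parallel())  # ([False, False, False], [0, 2])
```

Jobs 0 and 2 still complete even though job 1 failed first, which mirrors what the logs above show for `forerror[2]`.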
t
Thanks for the response. That totally makes sense. What I really need in this case is some kind of looping (https://docs.prefect.io/core/advanced_tutorials/task-looping.html), but currently I have no idea how to loop over a group of tasks or over the entire flow.
I found this approach in another community thread. It works with a static input, but not when an upstream task delivers the input (i.e. using `ls = get_list()` instead of `ls = [1, 2, 3]`). In that case I receive a `Task is not iterable` error.
from prefect import task, Flow
from prefect.engine.signals import FAIL

import prefect

LOGGER = prefect.context.get("logger")


@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name


@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1


@task
def get_list():
    return [1, 2, 3]


with Flow("hello_flow") as flow:
    # ls = get_list()
    ls = [1, 2, 3]
    h_tasks = [say_hello("John") for x in ls]
    e_tasks = [forerror(para=x) for x in ls]

    for i in range(0, len(ls)):
        e_tasks[i].set_upstream(h_tasks[i])
        if i > 0:
            e_tasks[i].set_upstream(e_tasks[i - 1])

if __name__ == "__main__":
    flow.run()
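Why `ls = get_list()` breaks this: inside `with Flow(...)`, calling a task does not run it. It returns a `Task` object that stands in for a result which only exists once the flow runs, so the list comprehension and `len(ls)` have nothing to iterate over yet. The toy model below is plain Python; `Deferred` is a hypothetical name, not a Prefect class, and it only illustrates the build-time vs. run-time split.

```python
from typing import Callable, List


class Deferred:
    """Hypothetical placeholder for a task's output at flow-build time.

    It only remembers how to compute the value; like a Prefect 1.x Task
    called inside `with Flow(...)`, it is not the list itself.
    """

    def __init__(self, fn: Callable[[], List[int]]) -> None:
        self.fn = fn

    def resolve(self) -> List[int]:
        # Stands in for the flow actually running the task.
        return self.fn()


def get_list() -> List[int]:
    return [1, 2, 3]


if __name__ == "__main__":
    ls = Deferred(get_list)
    try:
        for x in ls:  # build time: there is no list to loop over yet
            pass
    except TypeError as exc:
        print("build time:", exc)
    # At run time the value exists, so per-item work belongs inside a task.
    print("run time:", ls.resolve())
```

This is why the per-item loop has to move into something that executes at run time, such as a task body or `map`.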
k
I think we can get this to work. I’ll work on this in a bit
This might be the easiest for your use case:
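from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect

LOGGER = prefect.context.get("logger")


@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name


@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1


@task
def get_list():
    return [1, 2, 3]


@task
def helper(ls):
    # Runs inside a single task at flow run time, so `ls` is a real list here.
    # .run() calls each task's function directly and in order; the FAIL raised
    # for 2 propagates and fails `helper`, so 3 is never processed.
    y = None  # avoids an UnboundLocalError if `ls` is empty
    for x in ls:
        say_hello.run("John")
        y = forerror.run(para=x)
    return y


with Flow("hello_flow") as flow:
    ls = get_list()
    helper(ls)

if __name__ == "__main__":
    flow.run()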
I'm aware this doesn't give you monitoring for each of 1, 2, and 3. If you want, I can help you dive into Task Looping and reshape this into it.
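Task looping fits the "stop on first failure" requirement because a single task re-runs itself once per item, carrying state forward, and any real error ends the loop. In Prefect 1.x the pieces are `prefect.engine.signals.LOOP` (raised with a `result=` payload) and `prefect.context.get("task_loop_result")` (read on the next iteration), as in the linked tutorial. The dependency-free toy model below only mimics that mechanism; `Loop`, `run_looping_task`, and `make_file_task` are hypothetical names, not Prefect APIs.

```python
from typing import Any, Callable, Dict, List


class Loop(Exception):
    """Hypothetical stand-in for Prefect 1.x's LOOP signal."""

    def __init__(self, result: Dict[str, Any]) -> None:
        super().__init__("loop")
        self.result = result  # state handed to the next iteration


def run_looping_task(fn: Callable[[Dict[str, Any]], Any]) -> Any:
    """Re-run `fn` with the previous payload until it returns normally.

    Any exception other than Loop propagates at once, so the loop stops
    at the first failure.
    """
    payload: Dict[str, Any] = {}
    while True:
        try:
            return fn(payload)
        except Loop as signal:
            payload = signal.result


def make_file_task(
    files: List[str], log: List[str]
) -> Callable[[Dict[str, Any]], List[str]]:
    # Assumes `files` is non-empty; one file is handled per iteration.
    def process_next(payload: Dict[str, Any]) -> List[str]:
        idx = payload.get("idx", 0)
        done = payload.get("done", [])
        name = files[idx]
        if name == "file2":
            raise ValueError(f"cannot process {name}")
        log.append(name)
        done = done + [name]
        if idx + 1 < len(files):
            # More files left: pass the state on to the next iteration.
            raise Loop({"idx": idx + 1, "done": done})
        return done  # returning normally ends the loop

    return process_next
```

In the real construct, each iteration shows up as a run of the same task, which is what gives back the per-item visibility that the `.run()` helper lacks.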
t
Thank you very much. This works as expected. I didn't know I could run tasks within a task by simply calling `run()` (which totally makes sense, because it's all Python). The downside of this approach is that, as you said, I can't monitor each "sub" task, so implementing Task Looping would be great. Could I reuse existing "official" Prefect tasks within a Task Looping construct? The example in the docs only shows custom tasks (defined with the @task decorator) that raise the LOOP signal.

Another question: could I use the map function within either of these constructs? For example, I have a list of files split by day (`[[file1-day1.zip, file2-day1.zip, ..., fileN-day1.zip], [file1-day2.zip, file2-day2.zip, ..., fileN-day2.zip]]`). I'd like to iterate over the days in sequence, but within each day use map to extract all the zip files with the Unzip task. Currently I get `ValueError: Could not infer an active Flow context.` As far as I found, I could use `StartFlowRun` to start another flow that implements the mapping routine, but this can't be tested locally because `StartFlowRun` only works with Cloud or Server.
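The requested shape, sequential across days and parallel within a day, can be sketched in plain Python with `concurrent.futures`. This illustrates the control flow only and is not a Prefect construct; `unzip` and `process_days` are hypothetical stand-ins for the Unzip task and the outer loop. Each day's archives are extracted concurrently, and an error in any of them stops the loop before the next day starts.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def unzip(path: str) -> str:
    """Hypothetical stand-in for the Unzip task; fails on a 'bad' file."""
    if "bad" in path:
        raise ValueError(f"corrupt archive: {path}")
    return path.replace(".zip", "")


def process_days(days: List[List[str]], log: List[int]) -> List[List[str]]:
    results: List[List[str]] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for day_idx, files in enumerate(days):
            # Archives within one day are independent, so extract them in
            # parallel (the role `map` would play). list() re-raises the first
            # error, which ends the loop before the next day is started.
            results.append(list(pool.map(unzip, files)))
            log.append(day_idx)
    return results


if __name__ == "__main__":
    days = [["file1-day1.zip", "file2-day1.zip"], ["file1-day2.zip"]]
    print(process_days(days, []))
```

Note that the other archives of the failing day still finish, just as already-started mapped children do; only the following days are skipped.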