Thread
#prefect-community
    Timo

    1 year ago
    Hey everyone, is it possible to abort a flow as soon as one mapped task fails, without executing the remaining mapped tasks? If not, any suggestions for how to solve this problem? Scenario: I run a pipeline over a list of files (1...n), and if one of them fails, the flow should stop immediately. I tried it with state handlers, and the flow does end up failed, but all instances of the mapped task are executed before the flow fails.
    Kevin Kho

    1 year ago
    Hey @Timo, I think the guarantees around stopping a mapped process are weak, especially if executed by a Dask cluster. Did you try the ENDRUN signal though?
    Timo

    1 year ago
    Hey @Kevin Kho, thanks for your response. I'd rather not run it on a Dask cluster because the order of the files matters. I tried ENDRUN but couldn't get it to work:
    from prefect import task, Flow
    from prefect.engine.signals import FAIL, ENDRUN
    from prefect.engine.state import State, Success
    
    
    def stateh(obj, old, new: State):
        if new.is_failed():
            raise ENDRUN(new)
        return new
    
    
    @task
    def say_hello(name):
        print(f"Hello {name}")
    
    
    @task(state_handlers=[stateh])
    def forerror(para):
        if para == 2:
            raise FAIL("it's 2")
        else:
            print(para)
            return para + 1
    
    with Flow("hello_flow") as flow:
        ls = [1, 2, 3]
    
        sh = say_hello("John")
        e = forerror.map(ls)
    
    
    if __name__ == "__main__":
        flow.run()
    Kevin Kho

    1 year ago
    I think in this case, doing it in the state handler is too late, so you need to raise ENDRUN instead of FAIL when para == 2.
    Timo

    1 year ago
    This did not work either 😞
    from prefect import task, Flow
    from prefect.engine.signals import FAIL, ENDRUN
    from prefect.engine.state import Failed, State
    
    def stateh(obj, old, new: State):
        if new.is_failed():
            raise ENDRUN(new)
        return new
    
    
    @task
    def say_hello(name):
        print(f"Hello {name}")
    
    
    # @task(state_handlers=[stateh])
    @task
    def forerror(para):
        if para == 2:
            # raise FAIL("it's 2")
            state = Failed("it's 2")
            raise ENDRUN(state)
        else:
            print(para)
            return para + 1
    
    
    with Flow("hello_flow") as flow:
        ls = [1, 2, 3]
    
        sh = say_hello("John")
        e = forerror.map(ls)
    
    
    if __name__ == "__main__":
        flow.run()
    [2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'hello_flow'
    [2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
    [2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Scheduled to Running
    [2021-06-16 16:20:11+0200] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
    [2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
    [2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
    Hello John
    [2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Starting task run...
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror': Handling state change from Pending to Mapped
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Finished task run for task with final state: 'Mapped'
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Starting task run...
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Pending to Running
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Calling task.run() method...
    1
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Running to Success
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Finished task run for task with final state: 'Success'
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Starting task run...
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Pending to Running
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Calling task.run() method...
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Running to Failed
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Finished task run for task with final state: 'Failed'
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Starting task run...
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Pending to Running
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Calling task.run() method...
    3
    [2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Running to Success
    [2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Finished task run for task with final state: 'Success'
    [2021-06-16 16:20:12+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    [2021-06-16 16:20:12+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Running to Failed
    Kevin Kho

    1 year ago
    Will give this a shot
    Yeah, I think there is no way to cancel like this when the task is mapped, because map was designed for parallel execution and it’s hard to cancel other parallel threads.
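The point about parallel execution can be illustrated outside Prefect. The following is a plain-Python analogy, not Prefect's internals: work that is submitted to a thread pool up front keeps running after one item fails, while a sequential loop can stop at the first failure. The names process, run_eagerly, and run_sequentially are made up for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def process(item, executed):
    """Record the item as executed, then fail on item 2 (mirrors forerror)."""
    executed.append(item)
    if item == 2:
        raise ValueError("it's 2")
    return item + 1


def run_eagerly(items):
    """Submit every item up front, the way a parallel map would."""
    executed = []
    # A single worker keeps the execution order deterministic for this demo.
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = [pool.submit(process, item, executed) for item in items]
        wait(futures)  # every future finishes, including those after the failure
    return executed


def run_sequentially(items):
    """Process items one by one and stop at the first failure."""
    executed = []
    for item in items:
        try:
            process(item, executed)
        except ValueError:
            break  # nothing after the failing item is started
    return executed


eager_executed = run_eagerly([1, 2, 3])
sequential_executed = run_sequentially([1, 2, 3])
print(eager_executed)
print(sequential_executed)
```

In the eager variant, item 3 has already been handed to the pool by the time item 2 fails, which is roughly the situation the mapped task runs are in.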
    Timo

    1 year ago
    Thanks for the response. This totally makes sense. What I really need in this case is some kind of looping (https://docs.prefect.io/core/advanced_tutorials/task-looping.html). But currently I have no idea how to loop over a bunch of tasks or the entire flow.
    I found this approach in another community thread. It works with a static input, but it doesn't work when an upstream task delivers the input (using ls = get_list() instead of ls = [1,2,3]). I get a "Task is not iterable" error if I use the output of the get_list() task:
    from prefect import task, Flow
    from prefect.engine.signals import FAIL
    
    import prefect
    
    LOGGER = prefect.context.get("logger")
    
    
    @task
    def say_hello(name):
        LOGGER.info(f"Hello {name}")
        return name
    
    
    @task
    def forerror(para):
        if para == 2:
            raise FAIL("it's 2")
        else:
            print(para)
            return para + 1
    
    
    @task
    def get_list():
        return [1, 2, 3]
    
    
    with Flow("hello_flow") as flow:
        # ls = get_list()
        ls = [1, 2, 3]
        h_tasks = [say_hello("John") for x in ls]
        e_tasks = [forerror(para=x) for x in ls]
    
        for i in range(0, len(ls)):
            e_tasks[i].set_upstream(h_tasks[i])
            if i > 0:
                e_tasks[i].set_upstream(e_tasks[i - 1])
    
    if __name__ == "__main__":
        flow.run()
    Kevin Kho

    1 year ago
    I think we can get this to work. I’ll work on this in a bit
    This might be the easiest for your use case:
    from prefect import task, Flow
    from prefect.engine.signals import FAIL
    import prefect
    LOGGER = prefect.context.get("logger")
    @task
    def say_hello(name):
        LOGGER.info(f"Hello {name}")
        return name
    @task
    def forerror(para):
        if para == 2:
            raise FAIL("it's 2")
        else:
            print(para)
            return para + 1        
    @task
    def get_list():
        return [1, 2, 3]
    
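    @task
    def helper(ls):
        # Call the underlying functions directly via .run(); a FAIL raised here
        # aborts the loop, so later items are never processed.
        y = None  # avoids an unbound variable if ls is empty
        for x in ls:
            say_hello.run("John")
            y = forerror.run(para=x)
        return y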
    
    with Flow("hello_flow") as flow:
        ls = get_list()
        helper(ls)
    
    flow.run()
    I can help you dive into Task Looping if you want. I’m aware this doesn’t give you monitoring for each of 1, 2, and 3. We can shape this into Task Looping if you want.
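One way to soften the missing per-item monitoring, short of Task Looping, is to have the loop keep its own record of what happened to each item. The sketch below is plain Python and does not use any Prefect API: ItemFailed is a hypothetical stand-in for a failure signal such as FAIL, and run_in_order and the status strings are invented for illustration.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("helper_sketch")


class ItemFailed(Exception):
    """Hypothetical stand-in for a task failure such as Prefect's FAIL signal."""


def forerror(para):
    if para == 2:
        raise ItemFailed("it's 2")
    return para + 1


def run_in_order(items):
    """Run forerror on each item in order and stop working at the first failure.

    Returns a list of (item, status, detail) tuples so the caller can see
    how far the loop got and which items were never attempted.
    """
    report = []
    failed = False
    for item in items:
        if failed:
            # Items after the failure are recorded but not executed.
            report.append((item, "Skipped", None))
            continue
        try:
            result = forerror(item)
        except ItemFailed as exc:
            failed = True
            report.append((item, "Failed", str(exc)))
            logger.error("item %s failed: %s", item, exc)
        else:
            report.append((item, "Success", result))
            logger.info("item %s succeeded with result %s", item, result)
    return report


report = run_in_order([1, 2, 3])
print(report)
```

Inside a helper task, the same report could be logged and then turned into a failure by raising FAIL when any entry is marked "Failed", so the flow still ends up failed while the log shows the outcome per item.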
    Timo

    1 year ago
    Thank you very much. This works as expected. I didn't know that I could run tasks within a task by simply calling run() (which totally makes sense because it's all Python). The downside of this approach is that I can't monitor each "sub" task, as you said, so implementing Task Looping would be great. Could I reuse existing "official" Prefect tasks within a Task Looping construct? The example in the docs only shows custom tasks (with the @task decorator) raising the LOOP signal.
    Another question: could I use the map function with either of these constructs? E.g., I have a list of files which is split by days ([[file1-day1.zip,file2-day1.zip,....,fileN-day1.zip], [file1-day2.zip,file2-day2.zip,....,fileN-day2.zip]]). I'd like to iterate over the days in sequence, but within each day I'd like to use map to extract all zip files with the Unzip task. Currently I get ValueError: Could not infer an active Flow context. As I discovered, I could use StartFlowRun to start another flow which implements the mapping routine, but this cannot be tested locally because StartFlowRun only works with Cloud or Server.
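The shape being asked for here (days strictly in sequence, files within a day in parallel, stop before the next day on any failure) can be sketched in plain Python. This is not a Prefect solution and uses no Prefect API; extract is a hypothetical stand-in for the Unzip task, and the file names, including the failing "bad-day2.zip", are made up for the demo.

```python
from concurrent.futures import ThreadPoolExecutor


def extract(filename):
    """Hypothetical stand-in for unzipping one file; fails on names containing 'bad'."""
    if "bad" in filename:
        raise RuntimeError(f"could not extract {filename}")
    return filename.replace(".zip", "")


def process_days(days, max_workers=4):
    """Process days strictly in order; extract the files of one day in parallel.

    Stops before starting the next day if any file of the current day fails.
    Returns (results_for_completed_days, error_or_None).
    """
    completed = []
    for day_files in days:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [pool.submit(extract, name) for name in day_files]
        # Leaving the with-block waits for every file of this day to finish.
        try:
            completed.append([future.result() for future in futures])
        except RuntimeError as exc:
            return completed, exc
    return completed, None


days = [
    ["file1-day1.zip", "file2-day1.zip"],
    ["file1-day2.zip", "bad-day2.zip"],
    ["file1-day3.zip"],
]
completed, error = process_days(days)
print(completed)
print(error)
```

Run inside a single task, a loop like this keeps the ordering guarantee between days while still parallelizing within a day, at the cost of losing per-file visibility in the Prefect UI.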