    Daniel Burkhardt

    1 year ago
    Hi, I'm interested in better understanding how to use persistent caching to help debug failed flows comprising mapped tasks. Because it doesn't seem there's any way to cache input to disk, I want to know the preferred way to identify the input to a failed task. I'm currently using
    {map_index}
    in the Result location, and it seems like the output of
    first_map_fn[0]
    always gets passed to
    second_map_fn[0]
    even if
    first_map_fn[1]
    finishes first. Is this guaranteed? Is there a better way to identify which Result is used as input to a specific task run? Please find an example flow in the thread.
    Example Flow:
    from prefect import Flow, task
    from prefect.engine.results import LocalResult
    from prefect.executors import LocalDaskExecutor
    import time
    
    '''
    For this script to work, you must first enable checkpointing. The easiest way to do this is
    $ export PREFECT__FLOWS__CHECKPOINTING=true
    
    '''
    
    numbers = [1, 2, 3]
    
    
    local_result = LocalResult(
        dir='~/.prefect/cache_test/',
        location="{task_name}.{task_run_id}.{map_index}.pickle"
    )
    
    
    @task(checkpoint=True, result=local_result)
    def map_add(x):
        if x == 1:
            # Add delay so map_add[0] is the last to finish
            time.sleep(10)
        return x + 1
    
    
    @task(checkpoint=True, result=local_result)
    def map_power(x):
        assert x ** 2 != 4
        return x ** 2
    
    
    @task(checkpoint=True, result=local_result)
    def reduce_sum(x):
        return sum(x)
    
    
    with Flow('Map Reduce') as flow:
        add_result = map_add.map(numbers)
        pow_result = map_power.map(add_result)
        reduced_result = reduce_sum(pow_result)
        print(reduced_result)
    
    flow.visualize()
    
    state = flow.run(executor=LocalDaskExecutor())
    When I run this flow, I get the following output:
    [2021-07-20 15:39:07+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Map Reduce'
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add': Finished task run for task with final state: 'Mapped'
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power': Finished task run for task with final state: 'Mapped'
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[1]': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[0]': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[2]': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[1]': Finished task run for task with final state: 'Success'
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[2]': Finished task run for task with final state: 'Success'
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[1]': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[2]': Starting task run...
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[1]': Finished task run for task with final state: 'Success'
    [2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[2]': Finished task run for task with final state: 'Success'
    [2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_add[0]': Finished task run for task with final state: 'Success'
    [2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_power[0]': Starting task run...
    [2021-07-20 15:39:17+0000] ERROR - prefect.TaskRunner | Task 'map_power[0]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/srv/conda/envs/saturn/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 861, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/srv/conda/envs/saturn/lib/python3.8/site-packages/prefect/utilities/executors.py", line 327, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "caching.py", line 31, in map_power
        assert x ** 2 != 4
    AssertionError
    [2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_power[0]': Finished task run for task with final state: 'Failed'
    [2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'reduce_sum': Starting task run...
    [2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'reduce_sum': Finished task run for task with final state: 'TriggerFailed'
    [2021-07-20 15:39:17+0000] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

    Kevin Kho

    1 year ago
    Hey @Daniel Burkhardt, first, yes it is guaranteed that the output of
    first_map_fn[0]
    will go to
    second_map_fn[0]
    . Could you tell me more about these inputs? Are they small or are they DataFrames?

    Daniel Burkhardt

    1 year ago
    The way the script is set up, one
    map_power
    task will fail. The flow of information that leads to this is:
    numbers[0] -> 1 -> map_add -> 2 -> map_power -> AssertionError
    . I want to trace this using the logs. From experimentation, it seems that it's guaranteed that
    map_add.map(numbers)
    will spawn tasks:
    map_add[0], map_add[1], ..., map_add[len(numbers) - 1]
    and map_add[0] will always get numbers[0] as input. From what you're saying, this is guaranteed. Is that documented somewhere?
    The inputs for our actual flows are large Python objects containing collections of arrays.

    Kevin Kho

    1 year ago
    It is mentioned here. Specifically,
    However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
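    For illustration, here is a minimal sketch of the pairing that quote describes (the flow and task names below are made up, not from the thread): child i of the downstream mapped task always receives the output of child i of the upstream mapped task, with no reduction in between.
    from prefect import Flow, task
    import prefect
    
    @task
    def upstream(x):
        return x * 10
    
    @task
    def downstream(y):
        # Log which child this is and what it received, to make the pairing visible
        logger = prefect.context.get("logger")
        map_index = prefect.context.get("map_index")
        logger.info(f"downstream[{map_index}] received {y}")
        return y
    
    with Flow("pairwise-mapping") as flow:
        ys = upstream.map([1, 2, 3])   # upstream[0..2] return 10, 20, 30
        downstream.map(ys)             # downstream[i] gets the output of upstream[i]
    
    flow.run()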

    Daniel Burkhardt

    1 year ago
    This seems unreliable to me. Is there any other way to know what the input of the failed task was when you're using checkpointing like this? Ideally it would say
    map_fn[0] failed on input /path/to/checkpointed_result.pickle
    or similar when it prints the stack trace

    Kevin Kho

    1 year ago
    Yep I know what you are saying. Let me think of a way to get this logged.
    I am still figuring out how to get the Result of an upstream task in the state handler (not 100% sure it’s possible without querying the database through the client). The closest thing I have now is to use the result manually and pass in the location to show the mapping relationship:
    from prefect import task, Flow
    import prefect
    from prefect.engine.results import LocalResult
    from prefect.engine.signals import FAIL
    
    @task(checkpoint=True)
    def abc(x):
        map_index = prefect.context.get("map_index")
        result=LocalResult(dir="./output/",location=f"{map_index}.txt")
        result.write(x)
        return result.location
    
    @task
    def subtract(loc):
        logger = prefect.context.get("logger")
        result=LocalResult(dir="./output/")
        x = result.read(location=loc).value
        if x == 2:
            raise FAIL(f"The {loc} had a value of {x}")
    
        <http://logger.info|logger.info>(loc)
        <http://logger.info|logger.info>(f"Got the value {x}")
        return x
    
    with Flow("map-test") as flow:
        a = abc.map([0,1,2,3,4])
        b = subtract.map(a)
    
    flow.run()
    Still working on this.

    Daniel Burkhardt

    1 year ago
    Thank you, this is informative. I wonder if there's room for a feature here. I don't know how the input caching works, but it seems like it should have information about where the input is if you're using output caching?

    Kevin Kho

    1 year ago
    Maybe I misunderstood earlier. I thought we were trying to trace where an upstream task was saved so that a downstream task could find it in future flow runs. The
    target
    is just checking if a file exists, and then the task itself loads it from that location (but this is at the task level, without upstream dependencies). If you want to see that in action, you can turn on DEBUG-level logs and it will say where it's loading data from. So I think what you might have been asking is how the map-index would work with targets? I think the map-index might not be good, since the guarantees that the order stays the same are not strong and you may load a different file (I think this is what you were alluding to). In this case I would use something else, like a task input if you have any smaller ones available, to create unique file names.
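    Roughly like this (a sketch only, not tested; it assumes your Prefect 1.x version supports templating result locations with task inputs such as {x} below; if it does not, build the file name inside the task instead):
    from prefect import Flow, task
    from prefect.engine.results import LocalResult
    
    # Key the checkpoint file by the task input instead of {map_index}, so the
    # file name itself tells you which input a failed child ran on.
    input_keyed_result = LocalResult(
        dir="~/.prefect/cache_test/",
        location="{task_name}-input-{x}.pickle",
    )
    
    @task(checkpoint=True, result=input_keyed_result)
    def map_power(x):
        assert x ** 2 != 4
        return x ** 2
    
    with Flow("input-keyed-checkpoints") as flow:
        map_power.map([2, 3, 4])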
    Syncing tomorrow morning with an engineer on this and I’ll get back to you.
    Chatted with the team. The issue here is that the location of an upstream task is tracked by orchestration and is decoupled from the code logic, so the downstream task is not aware of the upstream result location and cannot retrieve it. If the location is an important concept in your flow, you would need to have it as an input or return it directly from the upstream task. I guess that's nothing new, and we have working variations of this in this thread. Sorry about that, but do let me know if there's anything I can help with.
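    As one last sketch of that idea, building on the earlier snippet (names are illustrative, and this is not a built-in feature): the upstream task returns both its value and the location it wrote to, so a downstream failure message can point straight at the pickled input.
    from prefect import Flow, task
    import prefect
    from prefect.engine.results import LocalResult
    from prefect.engine.signals import FAIL
    
    @task
    def map_add(x):
        map_index = prefect.context.get("map_index")
        loc = f"map_add-{map_index}.pickle"
        # Write the output manually so we know exactly where it lives
        LocalResult(dir="./output/", location=loc).write(x + 1)
        # Return the value together with where it was written
        return {"value": x + 1, "location": loc}
    
    @task
    def map_power(upstream):
        x, loc = upstream["value"], upstream["location"]
        if x ** 2 == 4:
            raise FAIL(f"map_power failed on input stored at ./output/{loc}")
        return x ** 2
    
    with Flow("trace-failed-input") as flow:
        added = map_add.map([1, 2, 3])
        map_power.map(added)
    
    flow.run()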