# ask-community
d
Hi, I'm interested in better understanding how to use persistent caching to help debug failed flows comprising mapped tasks. Because it doesn't seem there's any way to cache input to disk, I want to know the preferred way to identify the input to a failed task. I'm currently using `{map_index}` in the Result location, and it seems like the output of `first_map_fn[0]` always gets passed to `second_map_fn[0]` even if `first_map_fn[1]` finishes first. Is this guaranteed? Is there a better way to identify which Result is used as input to a specific task run? Please find an example flow in thread.
Example Flow:
```python
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.executors import LocalDaskExecutor
import time

'''
For this script to work, you must first enable checkpointing. The easiest way to do this is
$ export PREFECT__FLOWS__CHECKPOINTING=true

'''

numbers = [1, 2, 3]


local_result = LocalResult(
    dir='~/.prefect/cache_test/',
    location="{task_name}.{task_run_id}.{map_index}.pickle"
)


@task(checkpoint=True, result=local_result)
def map_add(x):
    if x == 1:
        # Add delay so map_add[0] is the last to finish
        time.sleep(10)
    return x + 1


@task(checkpoint=True, result=local_result)
def map_power(x):
    assert x ** 2 != 4
    return x ** 2


@task(checkpoint=True, result=local_result)
def reduce_sum(x):
    return sum(x)


with Flow('Map Reduce') as flow:
    add_result = map_add.map(numbers)
    pow_result = map_power.map(add_result)
    reduced_result = reduce_sum(pow_result)
    print(reduced_result)  # note: prints the Task object; the flow hasn't run yet

flow.visualize()

state = flow.run(executor=LocalDaskExecutor())
```
When I run this flow, I get the following output:
```
[2021-07-20 15:39:07+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Map Reduce'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add': Finished task run for task with final state: 'Mapped'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power': Finished task run for task with final state: 'Mapped'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[1]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[0]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[2]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[1]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[2]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[1]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[2]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[1]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[2]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_add[0]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_power[0]': Starting task run...
[2021-07-20 15:39:17+0000] ERROR - prefect.TaskRunner | Task 'map_power[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/srv/conda/envs/saturn/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 861, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/srv/conda/envs/saturn/lib/python3.8/site-packages/prefect/utilities/executors.py", line 327, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "caching.py", line 31, in map_power
    assert x ** 2 != 4
AssertionError
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_power[0]': Finished task run for task with final state: 'Failed'
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'reduce_sum': Starting task run...
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'reduce_sum': Finished task run for task with final state: 'TriggerFailed'
[2021-07-20 15:39:17+0000] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
```
k
Hey @Daniel Burkhardt, first, yes, it is guaranteed that the output of `first_map_fn[0]` will go to `second_map_fn[0]`. Could you tell me more about these inputs? Are they small, or are they DataFrames?
d
The way the script is set up, one `map_power` task will fail. The flow of information that leads to this is `numbers[0] -> 1 -> map_add -> 2 -> map_power -> AssertionError`. I want to trace this using the logs. From experimentation, it seems it's guaranteed that `map_add.map(numbers)` will spawn tasks `map_add[0], map_add[1], ..., map_add[len(numbers) - 1]`, and that `map_add[0]` will always get `numbers[0]` as input. From what you're saying, this is guaranteed. Is that documented somewhere?
The inputs for our actual flows are large Python objects containing collections of arrays.
k
It is mentioned here. Specifically:
> However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
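As an illustration (my own minimal sketch, not from the docs): each downstream child can log its `map_index` next to the input it received, which makes the nth-to-nth pairing visible in the logs:

```python
from prefect import Flow, task
import prefect


@task
def map_add(x):
    return x + 1


@task
def log_pairing(x):
    # map_index identifies this child; because Prefect connects the nth
    # upstream child to the nth downstream child, it also tells you
    # which map_add child produced x.
    logger = prefect.context.get("logger")
    idx = prefect.context.get("map_index")
    logger.info(f"log_pairing[{idx}] received {x} (from map_add[{idx}])")
    return x


with Flow("pairing-demo") as flow:
    added = map_add.map([1, 2, 3])
    log_pairing.map(added)

flow.run()
```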
d
This seems unreliable to me. Is there any other way to know what the input of the failed task was when you're using checkpointing like this? Ideally it would say `map_fn[0] failed on input /path/to/checkpointed_result.pickle` or similar when it prints the stack trace.
k
Yep I know what you are saying. Let me think of a way to get this logged.
I am still figuring out how to get the Result of an upstream task in the state handler (not 100% sure it's possible without querying the database through the client). The closest thing I have now is to write the result manually and pass the location downstream to show the mapping relationship:
```python
from prefect import task, Flow
import prefect
from prefect.engine.results import LocalResult
from prefect.engine.signals import FAIL


@task(checkpoint=True)
def abc(x):
    # Write the result manually so this task can return its own location
    map_index = prefect.context.get("map_index")
    result = LocalResult(dir="./output/", location=f"{map_index}.txt")
    result.write(x)
    return result.location


@task
def subtract(loc):
    # The upstream location arrives as a task input, so it can be
    # included in the failure message
    logger = prefect.context.get("logger")
    result = LocalResult(dir="./output/")
    x = result.read(location=loc).value
    if x == 2:
        raise FAIL(f"The {loc} had a value of {x}")

    logger.info(loc)
    logger.info(f"Got the value {x}")
    return x


with Flow("map-test") as flow:
    a = abc.map([0, 1, 2, 3, 4])
    b = subtract.map(a)

flow.run()
```
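A related sketch (not from the thread, and with caveats): if you drop run-specific fields like `{task_run_id}` from the upstream location template so the path is deterministic, a state handler on the downstream task can reconstruct a best-guess input path from its own `map_index` (child n always reads upstream child n) and log it on failure:

```python
import prefect
from prefect import task
from prefect.engine.state import Failed

# Hypothetical template: assumes the upstream task checkpointed to
# "map_add.{map_index}.pickle" with no run-specific fields.
UPSTREAM_LOCATION = "~/.prefect/cache_test/map_add.{map_index}.pickle"


def log_input_on_failure(task_obj, old_state, new_state):
    # State handlers receive (task, old_state, new_state) and must
    # return the (possibly modified) new state.
    if isinstance(new_state, Failed):
        idx = prefect.context.get("map_index")
        logger = prefect.context.get("logger")
        logger.error(
            f"{task_obj.name}[{idx}] failed on input "
            + UPSTREAM_LOCATION.format(map_index=idx)
        )
    return new_state


@task(state_handlers=[log_input_on_failure])
def map_power(x):
    assert x ** 2 != 4
    return x ** 2
```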
Still working on this.
d
Thank you, this is informative. I wonder if there's room for a feature here. I don't know how the input caching works, but it seems like it should have information about where the input is if you're using output caching?
k
Maybe I misunderstood earlier. I thought we were trying to trace where an upstream task was saved so that a downstream task could find it in future flow runs. The `target` just checks whether a file exists; if it does, the task loads it from that location (but this happens at the task level, without upstream dependencies). If you want to see that in action, turn on DEBUG-level logs and they will say where data is being loaded from. So I think what you might have been asking is how the map index would work with targets? I think the map index might not be good here, since the guarantees that the order stays the same across runs are not strong, and you may load a different file (I think this is what you were alluding to). In this case I would use something else, like a task input (if you have any small ones available), to create unique file names.
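For example, a sketch assuming your Prefect 1.x release supports templating result locations with task inputs (here `x` is the task's input, so the file name is keyed by the data itself rather than by map index):

```python
from prefect import Flow, task
from prefect.engine.results import LocalResult

# Assumption: "{x}" is filled from the task input named x at runtime,
# so each child writes to a file named after its actual input value.
# Requires PREFECT__FLOWS__CHECKPOINTING=true, as in the example above.
input_keyed_result = LocalResult(
    dir="~/.prefect/cache_test/",
    location="{task_name}.input-{x}.pickle",
)


@task(checkpoint=True, result=input_keyed_result)
def map_power(x):
    return x ** 2


with Flow("input-keyed") as flow:
    map_power.map([2, 3, 4])
```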
Syncing tomorrow morning with an engineer on this and I’ll get back to you.
Chatted with the team. The issue here is that the location of an upstream task is tracked by the orchestration layer and decoupled from the code logic, so the downstream task isn't aware of the upstream result location and can't look it up. If the location is an important concept in your flow, you need to pass it as an input or return it directly from the upstream task. I guess that's nothing new, and we have working variations of this in this thread. Sorry about that, but do let me know if there's anything else I can help with.