Daniel Burkhardt
07/20/2021, 3:45 PM
{map_index} in the Result location, and it seems like the output of first_map_fn[0] always gets passed to second_map_fn[0] even if first_map_fn[1] finishes first. Is this guaranteed? Is there a better way to identify which Result is used as input to a specific task run?
Please find example flow in thread.
Daniel Burkhardt
07/20/2021, 3:45 PM
from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.executors import LocalDaskExecutor
import time

'''
For this script to work, you must first enable checkpointing. The easiest way to do this is
$ export PREFECT__FLOWS__CHECKPOINTING=true
'''

numbers = [1, 2, 3]

local_result = LocalResult(
    dir='~/.prefect/cache_test/',
    location="{task_name}.{task_run_id}.{map_index}.pickle"
)

@task(checkpoint=True, result=local_result)
def map_add(x):
    if x == 1:
        # Add delay so map_add[0] is the last to finish
        time.sleep(10)
    return x + 1

@task(checkpoint=True, result=local_result)
def map_power(x):
    # Deliberately fails for x == 2, i.e. the output of map_add[0]
    assert x ** 2 != 4
    return x ** 2

@task(checkpoint=True, result=local_result)
def reduce_sum(x):
    return sum(x)

with Flow('Map Reduce') as flow:
    add_result = map_add.map(numbers)
    pow_result = map_power.map(add_result)
    reduced_result = reduce_sum(pow_result)

print(reduced_result)
flow.visualize()
state = flow.run(executor=LocalDaskExecutor())
Daniel Burkhardt
07/20/2021, 3:47 PM
[2021-07-20 15:39:07+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Map Reduce'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add': Finished task run for task with final state: 'Mapped'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power': Finished task run for task with final state: 'Mapped'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[1]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[0]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[2]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[1]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_add[2]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[1]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[2]': Starting task run...
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[1]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:07+0000] INFO - prefect.TaskRunner | Task 'map_power[2]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_add[0]': Finished task run for task with final state: 'Success'
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_power[0]': Starting task run...
[2021-07-20 15:39:17+0000] ERROR - prefect.TaskRunner | Task 'map_power[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/srv/conda/envs/saturn/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 861, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/srv/conda/envs/saturn/lib/python3.8/site-packages/prefect/utilities/executors.py", line 327, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "caching.py", line 31, in map_power
    assert x ** 2 != 4
AssertionError
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'map_power[0]': Finished task run for task with final state: 'Failed'
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'reduce_sum': Starting task run...
[2021-07-20 15:39:17+0000] INFO - prefect.TaskRunner | Task 'reduce_sum': Finished task run for task with final state: 'TriggerFailed'
[2021-07-20 15:39:17+0000] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Kevin Kho
first_map_fn[0] will go to second_map_fn[0]. Could you tell me more about these inputs? Are they small or are they DataFrames?
Daniel Burkhardt
07/20/2021, 3:52 PM
map_power task will fail. The flow of information that leads to this is: numbers[0] -> 1 -> map_add -> 2 -> map_power -> AssertionError. I want to trace this using the logs. From experimentation, it seems that it's guaranteed that map_add.map(numbers) will spawn tasks: map_add[0], map_add[1], ..., map_add[len(numbers) - 1] and map_add[0] will always get numbers[0] as input.
From what you're saying, this is guaranteed. Is that documented somewhere?
Daniel Burkhardt
07/20/2021, 3:53 PM
Kevin Kho
However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
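The index-wise pairing described in that passage can be illustrated with a plain-Python analogy. This is only a sketch using the standard library's thread pool, not Prefect's scheduler: the `pipeline` helper and the `finished` list are hypothetical names added for illustration, and the delay is shortened from the flow above. It shows that element i of the input stays tied to element i of the output even when child 0 finishes last.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []  # records the order in which the map_add calls complete


def map_add(x):
    # Delay the first element so that child 0 finishes last, as in the flow above.
    if x == 1:
        time.sleep(0.2)
    finished.append(x)
    return x + 1


def map_power(x):
    return x ** 2


def pipeline(x):
    # One independent pipeline per element: map_add[i] feeds map_power[i].
    return map_power(map_add(x))


numbers = [1, 2, 3]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Executor.map yields results in input order, not completion order.
    results = list(pool.map(pipeline, numbers))

for i, (x, y) in enumerate(zip(numbers, results)):
    print(f"numbers[{i}] = {x} -> map_add[{i}] -> map_power[{i}] = {y}")
```

The completion order in `finished` differs from the input order, but `results` is still aligned with `numbers` by index.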
Daniel Burkhardt
07/20/2021, 3:57 PM
map_fn[0] failed on input /path/to/checkpointed_result.pickle or similar when it prints the stack trace
Kevin Kho
Kevin Kho
from prefect import task, Flow
import prefect
from prefect.engine.results import LocalResult
from prefect.engine.signals import FAIL

@task(checkpoint=True)
def abc(x):
    # Write the value to a file named after the map index and return its location
    map_index = prefect.context.get("map_index")
    result = LocalResult(dir="./output/", location=f"{map_index}.txt")
    result.write(x)
    return result.location

@task
def subtract(loc):
    logger = prefect.context.get("logger")
    result = LocalResult(dir="./output/")
    x = result.read(location=loc).value
    if x == 2:
        # Include the file location in the failure message
        raise FAIL(f"The {loc} had a value of {x}")
    logger.info(loc)
    logger.info(f"Got the value {x}")
    return x

with Flow("map-test") as flow:
    a = abc.map([0, 1, 2, 3, 4])
    b = subtract.map(a)

flow.run()
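The same idea, reporting which input a mapped child failed on, can be sketched without Prefect. This is a minimal illustration under stated assumptions: `report_input_on_failure` is a hypothetical helper, not part of any library, and the loop stands in for the mapped children. In a flow like the one above, the input could just as well be the file location returned by the upstream task.

```python
import functools


def report_input_on_failure(fn):
    """Hypothetical helper: re-raise any exception with the failing input attached."""

    @functools.wraps(fn)
    def wrapper(x):
        try:
            return fn(x)
        except Exception as exc:
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(f"{fn.__name__} failed on input {x!r}") from exc

    return wrapper


@report_input_on_failure
def map_power(x):
    assert x ** 2 != 4
    return x ** 2


messages = []
for index, value in enumerate([2, 3, 4]):
    try:
        map_power(value)
    except RuntimeError as err:
        messages.append(f"map_power[{index}]: {err}")

print(messages)
```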
Kevin Kho
Daniel Burkhardt
07/20/2021, 5:29 PM
Kevin Kho
target is just checking if a file exists, and then the task itself would just load it from that location (but this is on the task level without upstream dependencies). If you want to see that in action, you can turn on the DEBUG level logs and it will say where it’s loading data from.
So I think what you might have been asking is how the map-index would work with targets? I think the map-index might not be a good choice, since there is no strong guarantee that the order stays the same, so you may load a different file (I think this is what you were alluding to). In this case I would use something else to create unique file names, such as a task input if you have any smaller ones available.
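One way to derive file names from a task input rather than from the map index is sketched below. It assumes the inputs are small and have a stable `repr`. `location_for_input` is a hypothetical helper written for this illustration, not a Prefect API. The point is that the name depends only on the value, so reordering the inputs does not change which file a given value maps to.

```python
import hashlib


def location_for_input(task_name, value):
    # Hypothetical helper: hash the input's repr so equal inputs get the same file name.
    digest = hashlib.sha256(repr(value).encode("utf-8")).hexdigest()[:12]
    return f"{task_name}.{digest}.pickle"


numbers = [1, 2, 3]
locations = [location_for_input("map_add", n) for n in numbers]

# Reordering the inputs changes the map index of each value but not its file name.
reordered = [location_for_input("map_add", n) for n in reversed(numbers)]

print(locations)
```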