Erik Schomburg
12/07/2021, 6:31 PM
The `task.map` functionality doesn’t quite work as I had expected. I’m aggregating a bunch of data sources and rows into a DataFrame; previously this was sped up by running it over subsets of keys, with each subset in a different process. So in Prefect, I have a task that splits the keys into subsets and then maps a task over those subsets. The problem is that there’s a small probability of failure on each subset due to connection timeouts. I have added some retry logic, but I still want to make sure that successful sub-tasks have their results checkpointed and unsuccessful ones do not. Instead, the `results = task.map(subset=subsets)` code just stores the results in a single file and then does not re-run the unsuccessful sub-tasks. I tried adding `{map_index}` to the task target filename pattern, … `{` brackets, i.e., `{{map_index}}` 🤦).
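The doubled braces are enough to explain the single shared file. A minimal sketch, assuming Prefect 1.x renders `target` (and a result's `location`) with Python's `str.format`, filling names such as `task_name` and `map_index` from `prefect.context`; the `context` dictionary below is a hand-built stand-in for that context, not Prefect's own object:

```python
# Assumption: Prefect 1.x fills target/location templates via str.format,
# using values such as task_name and map_index from prefect.context.
context = {"task_name": "get_data_task", "map_index": 0}

good = "{task_name}_{map_index}.pkl".format(**context)
print(good)  # get_data_task_0.pkl

# In str.format, "{{" and "}}" are escapes for literal braces, so every mapped
# child renders the same name and all of them share one file.
bad = "{task_name}_{{map_index}}.pkl".format(**context)
print(bad)  # get_data_task_{map_index}.pkl
shared = {"{task_name}_{{map_index}}.pkl".format(task_name="get_data_task", map_index=i)
          for i in range(4)}
print(len(shared))  # 1

# Doubled braces are only right while *building* the template with an f-string:
# they collapse to single braces and leave {map_index} for Prefect to fill later.
task_name = "get_data_task"
template = f"{task_name}_{{map_index}}.pkl"
print(template)                    # get_data_task_{map_index}.pkl
print(template.format(**context))  # get_data_task_0.pkl
```

So a helper that constructs target patterns should emit single braces around `map_index` in its final output; doubled braces are only safe if the string goes through one more formatting step (an f-string or an extra `.format`) before Prefect sees it.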
Here’s the basic flow:
```python
all_keys = get_keys_task()
key_subsets = partition_keys_task(all_keys, n_subsets)
data_subsets = get_data_task.map(keys=key_subsets)
all_data = concatenate_subsets_task(data_subsets=data_subsets)
```
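The behavior Erik expects from `target` plus `.map` can be simulated without Prefect. The sketch below is plain Python, not Prefect's implementation: each map index gets its own file from a `{map_index}` template, a subset is retried on `ConnectionError`, only successful subsets are written, and an existing file is loaded instead of recomputed, so a rerun only touches the subsets that failed. `partition_keys`, `run_mapped_with_targets` and `get_data` are hypothetical names, and lists of numbers stand in for DataFrames.

```python
import os
import pickle
import tempfile


def partition_keys(keys, n_subsets):
    """Hypothetical stand-in for partition_keys_task: n contiguous, near-equal chunks."""
    size, extra = divmod(len(keys), n_subsets)
    subsets, start = [], 0
    for i in range(n_subsets):
        end = start + size + (1 if i < extra else 0)
        subsets.append(keys[start:end])
        start = end
    return subsets


def run_mapped_with_targets(fn, subsets, target_dir,
                            template="get_data_task_{map_index}.pkl", max_retries=2):
    """Simulate per-child targets: one file per map_index, written only on success."""
    results, failed = {}, []
    for map_index, subset in enumerate(subsets):
        path = os.path.join(target_dir, template.format(map_index=map_index))
        if os.path.exists(path):
            # Target already exists: load the checkpoint instead of re-running.
            with open(path, "rb") as f:
                results[map_index] = pickle.load(f)
            continue
        for _attempt in range(max_retries + 1):
            try:
                value = fn(subset)
            except ConnectionError:
                continue  # transient failure: try again
            with open(path, "wb") as f:
                pickle.dump(value, f)  # checkpoint only successful subsets
            results[map_index] = value
            break
        else:
            failed.append(map_index)  # no file written, so the next run retries it
    return results, failed


# Usage: the subset containing key 4 times out on every attempt of the first run.
with tempfile.TemporaryDirectory() as target_dir:
    outage = {"active": True}
    calls = []

    def get_data(keys):  # hypothetical stand-in for get_data_task
        calls.append(list(keys))
        if outage["active"] and 4 in keys:
            raise ConnectionError("timeout")
        return [k * 10 for k in keys]

    subsets = partition_keys(list(range(10)), 3)
    first, failed = run_mapped_with_targets(get_data, subsets, target_dir)
    print(sorted(os.listdir(target_dir)), failed)
    # ['get_data_task_0.pkl', 'get_data_task_2.pkl'] [1]

    outage["active"] = False  # the connection recovers
    calls.clear()
    second, failed = run_mapped_with_targets(get_data, subsets, target_dir)
    print(calls, failed)  # [[4, 5, 6]] [] : only the failed subset is fetched again

all_data = [row for i in sorted(second) for row in second[i]]
```

In Prefect 1.x terms, the retry loop corresponds to task-level retries (`max_retries` and `retry_delay` on the task), and the exists-then-load branch is what a correctly formatted `target` is meant to give each mapped child.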
I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of `.map` ought to be to do this sort of results management for you... Any tips? Maybe there’s just a parameter in the `prefect.task` decorator or the `task.map` function I don’t know about?
Kevin Kho
Erik Schomburg
12/07/2021, 7:52 PM
… `task.map`? For example, when I add `target` and `location` parameters to the `get_data_task` task:
```python
import prefect
from prefect.engine.results import LocalResult

@prefect.task(target="{task_name}_{map_index}", checkpoint=True, result=LocalResult(location="{task_name}_{map_index}"))
def get_data_task(keys):
    ...
```
the results across the mapped sub-tasks are stored in a single file called `get_data_task_{map_index}.pkl`, rather than one file per mapped subset called `get_data_task_0.pkl`, etc.
Kevin Kho
Erik Schomburg
12/07/2021, 7:57 PM
Erik Schomburg
12/07/2021, 7:58 PM
… `{` brackets in my template, and it didn’t pick up the `map_index` variable properly.
Kevin Kho
```python
from prefect import task, Flow
import prefect
from prefect.engine.results import LocalResult

# Each mapped child formats its own target: test-0.pkl, test-1.pkl, test-2.pkl, test-3.pkl
@task(checkpoint=True, result=LocalResult(dir="/Users/kevinkho/Work/scratch/"), target="test-{map_index}.pkl")
def get_values(x):
    return x + 1

with Flow("dynamic_tasks") as flow:
    get_values.map([1, 2, 3, 4])

flow.run()
```
This works as long as the environment variable `PREFECT__FLOWS__CHECKPOINTING=true` is set; checkpointing is off by default for local `flow.run()` calls.
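For a local `flow.run()`, the variable can also be set from Python. A minimal sketch, assuming Prefect 1.x reads `PREFECT__*` variables into `prefect.config` when the package is first imported, so the assignment has to happen before `import prefect`. `config_path` is a hypothetical helper that only illustrates the naming convention (`PREFECT__FLOWS__CHECKPOINTING` corresponds to `flows.checkpointing`); it is not Prefect's own parser.

```python
import os

# Assumption: Prefect 1.x loads PREFECT__* environment variables into
# prefect.config when the package is first imported, so set this first.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"


def config_path(env_name: str) -> str:
    """Hypothetical helper: map PREFECT__SECTION__KEY to a dotted config key."""
    prefix = "PREFECT__"
    if not env_name.startswith(prefix):
        raise ValueError(f"not a Prefect setting: {env_name}")
    # Double underscores separate nesting levels; keys are lower-cased.
    return env_name[len(prefix):].lower().replace("__", ".")


print(config_path("PREFECT__FLOWS__CHECKPOINTING"))  # flows.checkpointing

# import prefect  # after this, prefect.config.flows.checkpointing should be True
```

Exporting the variable in the shell before launching the script has the same effect and sidesteps the import-order pitfall.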
Erik Schomburg
12/07/2021, 8:09 PM
… `{...}` because of a bug in a special pattern constructor tool.
Erik Schomburg
12/07/2021, 8:09 PM
Kevin Kho