Hello! I’m trying to convert an existing ...
# ask-community
e
Hello! I’m trying to convert an existing script into a prefect flow, and the `task.map` functionality doesn’t quite work as I had expected. I’m aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in prefect, I have a task that splits the keys into subsets, and then maps a task over those subsets. The problem is that there’s a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but still want to make sure that successful sub-tasks have their results checkpointed, and unsuccessful ones are not. But the `results = task.map(subset=subsets)` code instead just stores the `results` in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding `{map_index}` to the task target filename pattern, but this did not work (_update_: it does work, I just had extra `{` brackets, i.e., `{{map_index}}` 🤦). Here’s the basic flow:
all_keys = get_keys_task()
key_subsets = partition_keys_task(all_keys, n_subsets)
data_subsets = get_data_task.map(keys=key_subsets)
all_data = concatenate_subsets_task(data_subsets=data_subsets)
I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of `.map` ought to be to do this sort of results management for you... Any tips? Maybe there’s just a parameter in the `prefect.task` decorator or the `task.map` function I don’t know about?
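For reference, the behaviour being asked for (one result file per subset, with only the missing subsets re-run) can be sketched in plain Python. This is not Prefect code and not how Prefect implements targets; `partition_keys`, `run_subset`, and `fetch` are hypothetical names used only for illustration, and the file naming simply mirrors the `get_data_task_0.pkl` pattern from the thread.

```python
import pickle
import tempfile
from pathlib import Path


def partition_keys(keys, n_subsets):
    """Split keys into n_subsets interleaved subsets of roughly equal size."""
    return [keys[i::n_subsets] for i in range(n_subsets)]


def run_subset(map_index, keys, fetch, out_dir, name="get_data_task"):
    """Reuse the stored result for this subset if present, else compute and store it."""
    target = Path(out_dir) / f"{name}_{map_index}.pkl"
    if target.exists():
        # An earlier run succeeded for this subset, so skip the fetch.
        return pickle.loads(target.read_bytes())
    result = fetch(keys)  # if this raises, no file is written for this subset
    target.write_bytes(pickle.dumps(result))
    return result


calls = []


def fetch(keys):
    # Hypothetical stand-in for the real data fetch; records each call.
    calls.append(list(keys))
    return [k * 10 for k in keys]


with tempfile.TemporaryDirectory() as out_dir:
    subsets = partition_keys([1, 2, 3, 4, 5], 2)
    first = [run_subset(i, s, fetch, out_dir) for i, s in enumerate(subsets)]
    # The second pass finds both files and never calls fetch again.
    second = [run_subset(i, s, fetch, out_dir) for i, s in enumerate(subsets)]
```

Because each subset has its own file, a subset whose fetch failed leaves no file behind and is the only one recomputed on the next pass.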
k
Hi @Erik Schomburg, have you seen the docs on templating result names? This will separate out the files for you.
e
Yes, though maybe there’s something additional I need to do when using `task.map`? For example, when I add target and location parameters to the `get_data_task` task:
@prefect.task(target="{task_name}_{map_index}", checkpoint=True, result=LocalResult(location="{task_name}_{map_index}"))
def get_data_task(keys):
    ...
the results across the mapped sub-tasks are stored in a single file called `get_data_task_{map_index}.pkl`, rather than one file per mapped subset called `get_data_task_0.pkl`, etc.
k
One sec let me try this
e
actually, maybe I need a sec…
might’ve had a mistake in the `{` brackets in my template, so it didn’t recognize the `map_index` variable properly
k
This works on local for me:
from prefect import task, Flow
from prefect.engine.results import LocalResult

# {map_index} in the target is filled in per mapped child task,
# so each mapped run is written to its own file.
@task(
    checkpoint=True,
    result=LocalResult(dir="/Users/kevinkho/Work/scratch/"),
    target="test-{map_index}.pkl",
)
def get_values(x):
    return x + 1

with Flow("dynamic_tasks") as flow:
    get_values.map([1, 2, 3, 4])

flow.run()
as long as the env variable `PREFECT__FLOWS__CHECKPOINTING=true` is set
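If exporting the variable in the shell is inconvenient, one option is to set it from Python. This is a minimal sketch that assumes Prefect reads its configuration from the environment when it is first imported, so the assignment would need to come before `import prefect`; the `checkpointing_enabled` variable is only an illustrative check.

```python
import os

# Assumption: this must run before `import prefect`, since the
# configuration is expected to be read from the environment at import time.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

# Illustrative sanity check that the variable is visible to this process.
checkpointing_enabled = (
    os.environ.get("PREFECT__FLOWS__CHECKPOINTING", "").lower() == "true"
)
```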
e
right, yeah, like I said, problem was that in my construction of the target template I accidentally had extra `{...}` because of a bug in a special pattern constructor tool
sorry, thanks for trying this out, my bad
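The doubled-brace mistake is easy to reproduce with plain `str.format`, assuming the target template is rendered that way (which matches the literal `get_data_task_{map_index}.pkl` filename reported above). The `context` dictionary here is a hypothetical stand-in for the values supplied at runtime.

```python
# Hypothetical stand-in for the values available when the template is rendered.
context = {"task_name": "get_data_task", "map_index": 0}

template_ok = "{task_name}_{map_index}.pkl"
template_bad = "{task_name}_{{map_index}}.pkl"  # doubled braces escape the field

rendered_ok = template_ok.format(**context)
rendered_bad = template_bad.format(**context)

print(rendered_ok)   # get_data_task_0.pkl
print(rendered_bad)  # get_data_task_{map_index}.pkl
```

With the doubled braces, every mapped run renders the same filename, which is why all sub-task results ended up in a single file.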
k
No worries at all. Just wanted to make sure my advice was right as well