Thread
#prefect-community
    Erik Schomburg

    Erik Schomburg

    9 months ago
    Hello! I’m trying to create a convert an existing script into a prefect flow, and the
    task.map
    functionality doesn’t quite work as I had expected. I’m aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in prefect, I have a task that splits the keys into subsets, and then maps a task over those subsets. The problem is that there’s a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but still want to make sure that successful sub-tasks have their results checkpointed, and unsuccessful ones are not. But the
    results = task.map(subset=subsets)
    code instead just stores the
    results
    in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding
    {map_index}
    to the task target filename pattern, but this did not work (update: it does work, I just had extra
    {
    brackets, i.e.,
    {{map_index}}
    🤦). Here’s the basic flow:
    all_keys = get_keys_task()
    key_subsets = partition_keys_task(all_keys, n_subsets)
    data_subsets = get_data_task.map(keys=key_subsets)
    all_data = concatenate_subsets_task(data_subsets=data_subsets)
    I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of
    .map
    ought to be to do this sort of results management for you... Any tips? Maybe there’s just a parameter in the
    prefect.task
    decorator or the
    task.map
    function I don’t know about?
    Kevin Kho

    Kevin Kho

    9 months ago
    Hi @Erik Schomburg, have you seen the docs on templating result names? This will separate out the files for you.
    Erik Schomburg

    Erik Schomburg

    9 months ago
    Yes, though maybe there’s something additional I need to do when using
    task.map
    ? For example, when I add target and location parameters to the
    get_data_task
    task:
    @prefect.task(target="{task_name}_{map_index}", checkpoint=True, result=LocalResult(location="{task_name}_{map_index}")
    def get_data_task(keys):
        ...
    the results across the mapped sub-tasks are stored in a single file called
    get_data_task_{map_index}.pkl
    , rather than one file per mapped subset called
    get_data_task_0.pkl
    , etc.
    Kevin Kho

    Kevin Kho

    9 months ago
    One sec let me try this
    Erik Schomburg

    Erik Schomburg

    9 months ago
    actually, maybe I need a sec…
    might’ve had a mistake in the
    {
    brackets in my template and it didn’t recognize to use the
    map_index
    variable properly
    Kevin Kho

    Kevin Kho

    9 months ago
    This works on local for me:
    from prefect import task, Flow
    import prefect
    from prefect.engine.results import LocalResult
    
    @task(checkpoint=True, result = LocalResult(dir="/Users/kevinkho/Work/scratch/"), target="test-{map_index}.pkl")
    def get_values(x):
        return x+1
    
    with Flow("dynamic_tasks") as flow:
        get_values.map([1,2,3,4])
     
    flow.run()
    as long as the env variable
    PREFECT__FLOWS__CHECKPOINTING=true
    Erik Schomburg

    Erik Schomburg

    9 months ago
    right, yeah, like I said, problem was that in my construction of the target template I accidentally had extra
    {...}
    because of a bug in a special pattern constructor tool
    sorry, thanks for trying this out, my bad
    Kevin Kho

    Kevin Kho

    9 months ago
    No worries at all. Just wanted to make sure my advice was right a well