# ask-community
j
How can mapped tasks be checkpointed/cached? A default Flow-level result with `location="{flow_run_id}/{task_run_id}.prefect_result"` works for the unmapped tasks, but I don't see saved results for the mapped tasks (even though each one has a unique `task_run_id`).
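To see whether a path collision could explain missing mapped results, here is a minimal, Prefect-free sketch of how a templated location expands for each mapped child. It assumes the placeholders are filled roughly like `str.format` from run-context values. The `render_locations` helper, the fake run ids, and the context dict are hypothetical stand-ins, not Prefect's API.

```python
"""Conceptual sketch only: how a templated result location expands per mapped child.

Assumption: the template is filled with str.format(**context); the context
values below (run ids, task name, map_index) are made up for illustration.
"""
from typing import List


def render_locations(template: str, flow_run_id: str, n_children: int) -> List[str]:
    """Return the location each of n_children mapped task runs would write to."""
    locations = []
    for map_index in range(n_children):
        context = {
            "flow_run_id": flow_run_id,
            "task_run_id": f"tr-{map_index}",  # hypothetical: unique id per child run
            "task_name": "mytask",             # shared by every child
            "map_index": map_index,            # position of the child in the map
        }
        # str.format ignores context keys the template does not use.
        locations.append(template.format(**context))
    return locations


# Keyed only on the task name: every child renders to the same path.
shared = render_locations("{flow_run_id}/{task_name}.prefect_result", "fr-1", 3)

# The template from the question: task_run_id makes each child's path distinct.
per_child = render_locations("{flow_run_id}/{task_run_id}.prefect_result", "fr-1", 3)

print(shared)
print(per_child)
```

Under this model the question's `{task_run_id}` template already gives each child its own path, so a collision alone would not explain the missing mapped results; a `{map_index}` placeholder is another way to get a per-child key.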
k
Hi @Jacob Hayes! Is the goal here to persist the result of the task somewhere? Or are you looking at this in the context of a retry?
j
Preventing a retry: the task does an insert, so a retry adds duplicates. I'll eventually refactor it for idempotency, but can't quite yet.
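The retry concern can be modeled with a small, framework-free sketch of output caching keyed per mapped child: before doing the insert, check whether a stored result already exists for that child, and skip the work if it does. Everything here (`run_mapped`, the in-memory `store` dict standing in for a results bucket, the key format, the simulated failure) is hypothetical; it illustrates the idea rather than Prefect's implementation.

```python
"""Conceptual sketch: skip already-completed mapped children on a retry.

The dict `store` stands in for persisted results (e.g. objects in a bucket);
`run_mapped` and the key format are hypothetical, for illustration only.
"""
from typing import Callable, Dict, List


def run_mapped(
    files: List[str],
    insert: Callable[[str], int],
    store: Dict[str, int],
    flow_run_id: str,
) -> List[int]:
    """Run `insert` once per file, reusing any result already stored for that child."""
    results = []
    for map_index, path in enumerate(files):
        key = f"{flow_run_id}/mytask-{map_index}"  # one key per mapped child
        if key in store:
            # A result exists from an earlier attempt: treat the child as cached.
            results.append(store[key])
            continue
        value = insert(path)  # the non-idempotent side effect
        store[key] = value    # persist only after the insert succeeds
        results.append(value)
    return results


# Simulated table plus an insert that fails once on the third file.
table: List[str] = []
fail_once = {"c.csv"}


def insert(path: str) -> int:
    if path in fail_once:
        fail_once.discard(path)
        raise RuntimeError(f"transient failure on {path}")
    table.append(path)
    return len(table)


store: Dict[str, int] = {}
files = ["a.csv", "b.csv", "c.csv"]
try:
    run_mapped(files, insert, store, "fr-1")  # first attempt fails on c.csv
except RuntimeError:
    pass
run_mapped(files, insert, store, "fr-1")      # retry only inserts c.csv
print(table)
```

Because the key includes the map index, children that already succeeded are skipped while the failed one reruns. If the insert commits but persisting its result fails, a duplicate is still possible, which is why making the insert idempotent remains the more robust fix.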
k
Can I see how you configured the results for the mapped task?
j
sure! it's more or less:
```python
from functools import partial

from prefect import Flow, Task, task, unmapped
from prefect.engine.results import GCSResult

PREFECT_RESULTS_BUCKET = "..."

RetryFlow = partial(
    Flow,
    result=GCSResult(
        bucket=PREFECT_RESULTS_BUCKET,
        location="{flow_run_id}/{task_full_name}_{task_run_id}.prefect_result",
    ),
)


@task
def mytask(
    files, table, schema, buffer_table, db,
):
    ...


with RetryFlow(name=...) as flow:
    ...
    out = mytask.map(
        files=list_files(path),  # Returns list of files
        table=unmapped(base_table),
        schema=unmapped(export_schema),
        buffer_table=unmapped(base_buffer_table),
        db=unmapped(db),
    )
    ...
```
I don't set a result on the mapped task directly; I just have a default `Flow.result`.
k
Have you tried something like this?
j
Ah, thanks for the link; I didn't see anything about mapped tasks on the caching page. I am using `task_run_id` in the default Result, but I'll try setting it explicitly on the mapped task.