Thread
#prefect-community
    j

    Jacob Hayes

    1 year ago
    How can mapped tasks be checkpointed/cached? A default Flow level result with
    location="{flow_run_id}/{task_run_id}.prefect_result"
    works for the unmapped tasks, but I don't see saved results for the mapped tasks (even though each one has a unique
    task_run_id
    ).
    Kevin Kho

    Kevin Kho

    1 year ago
    Hi @Jacob Hayes! Is the goal here to persist the result of the task somewhere? Or are you looking in the context of a retry?
    j

    Jacob Hayes

    1 year ago
    Preventing a retry (task does an insert, so retry adds duplicates - will eventually refactor for idempotency, but can't quite yet)
    Kevin Kho

    Kevin Kho

    1 year ago
    Can I see how you configured the results for the mapped task?
    j

    Jacob Hayes

    1 year ago
    sure! it's more or less:
    from functools import partial
    
    from prefect import Flow, Task, task, unmapped
    from prefect.engine.results import GCSResult
    
    PREFECT_RESULTS_BUCKET = "..."
    
    RetryFlow = partial(
        Flow,
        result=GCSResult(
            bucket=PREFECT_RESULTS_BUCKET,
            location="{flow_run_id}/{task_full_name}_{task_run_id}.prefect_result",
        ),
    )
    
    
    @task
    def mytask(
        files, table, schema, buffer_table, db,
    ):
        ...
    
    
    with RetryFlow(name=...) as flow:
        ...
        out = mytask.map(
            files=list_files(path),  # Returns list of files
            table=unmapped(base_table),
            schema=unmapped(export_schema),
            buffer_table=unmapped(base_buffer_table),
            db=unmapped(db),
        )
        ...
    I don't set a result on the mapped task directly, just have a default
    Flow.result
    Kevin Kho

    Kevin Kho

    1 year ago
    Have you tried something like this?
    j

    Jacob Hayes

    1 year ago
    Ah, thanks for the link, didn't see anything about mapped tasks on caching page. I am using
    task_run_id
    in the default Result, but I'll try setting it explicitly on the mapped task