Jacob Hayes
04/06/2021, 7:09 PM
location="{flow_run_id}/{task_run_id}.prefect_result" works for the unmapped tasks, but I don't see saved results for the mapped tasks (even though each one has a unique task_run_id).
Kevin Kho
Jacob Hayes
04/06/2021, 7:21 PM
Kevin Kho
Jacob Hayes
04/06/2021, 8:08 PM
from functools import partial

from prefect import Flow, Task, task, unmapped
from prefect.engine.results import GCSResult

PREFECT_RESULTS_BUCKET = "..."

RetryFlow = partial(
    Flow,
    result=GCSResult(
        bucket=PREFECT_RESULTS_BUCKET,
        location="{flow_run_id}/{task_full_name}_{task_run_id}.prefect_result",
    ),
)


@task
def mytask(
    files, table, schema, buffer_table, db,
):
    ...


with RetryFlow(name=...) as flow:
    ...
    out = mytask.map(
        files=list_files(path),  # Returns list of files
        table=unmapped(base_table),
        schema=unmapped(export_schema),
        buffer_table=unmapped(base_buffer_table),
        db=unmapped(db),
    )
    ...
Jacob Hayes
04/06/2021, 8:08 PM
Flow.result
Kevin Kho
Jacob Hayes
04/06/2021, 9:42 PM
task_run_id in the default Result, but I'll try setting it explicitly on the mapped task
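
As a rough illustration of the location templates discussed in this thread, the sketch below renders a template once per mapped child. It assumes Prefect fills the location string with str.format-style substitution of run-time context values. The render_location helper and all id values are hypothetical stand-ins, not Prefect code. It shows only that a template containing task_run_id produces one path per mapped child, while a template without any per-child key makes every child share a single path.

```python
# Hypothetical stand-in for template rendering; this is not Prefect's own code.
def render_location(template: str, **context: str) -> str:
    """Fill a location template with run-time context values."""
    return template.format(**context)


# Made-up context values for three children of one mapped task.
children = [
    {
        "flow_run_id": "flow-run-1",
        "task_full_name": f"mytask[{i}]",
        "task_run_id": f"task-run-{i}",
    }
    for i in range(3)
]

# Template from the thread: includes the per-child task_run_id.
per_child = "{flow_run_id}/{task_full_name}_{task_run_id}.prefect_result"
# Template with no per-child key: every child renders the same path.
shared = "{flow_run_id}/mytask.prefect_result"

per_child_paths = [render_location(per_child, **ctx) for ctx in children]
shared_paths = [render_location(shared, **ctx) for ctx in children]

print(per_child_paths)  # three distinct paths
print(shared_paths)     # the same path three times
```

If the rendered paths are distinct but results still do not appear, the template itself is probably not the cause, which fits the plan of passing the result to the mapped task directly instead of relying on the flow-level default.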