Jacob Hayes
location="{flow_run_id}/{task_run_id}.prefect_result"
task_run_id
Kevin Kho
from functools import partial from prefect import Flow, Task, task, unmapped from prefect.engine.results import GCSResult PREFECT_RESULTS_BUCKET = "..." RetryFlow = partial( Flow, result=GCSResult( bucket=PREFECT_RESULTS_BUCKET, location="{flow_run_id}/{task_full_name}_{task_run_id}.prefect_result", ), ) @task def mytask( files, table, schema, buffer_table, db, ): ... with RetryFlow(name=...) as flow: ... out = mytask.map( files=list_files(path), # Returns list of files table=unmapped(base_table), schema=unmapped(export_schema), buffer_table=unmapped(base_buffer_table), db=unmapped(db), ) ...
Flow.result
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.