Cody Webb
08/15/2023, 12:29 AM
Nate
08/15/2023, 12:41 AM
if you call run_deployment(... , timeout=0)
then it will return immediately instead of waiting for the flow run to finish, but if you need the result of the flow run, you'd need to fetch that manually later
Cody Webb
08/15/2023, 12:44 AM
Nate
08/15/2023, 12:46 AM
Cody Webb
08/15/2023, 12:58 AM
Nate
08/15/2023, 4:55 AM
slug-name/block-name
e.g. gcs/my-result-storage-bucket
from prefect import flow, task
from prefect.blocks.core import Block
from prefect.variables import get
from prefect_dask import DaskTaskRunner

storage = Block.load(get("default_result_storage"))


@task(result_storage_key="{foo}.pkl")
def task_that_writes_inputs_to_a_filesystem(foo: str) -> str:
    # just return the input for demonstration purposes
    return foo


@flow(
    task_runner=DaskTaskRunner(),
    result_storage=storage,
)
def subflow_that_writes_to_a_filesystem(some_params: list):
    return task_that_writes_inputs_to_a_filesystem.map(foo=some_params)
and prove it's working by retrieving the results
if __name__ == "__main__":
    from prefect.results import PersistedResult
    from prefect.settings import PREFECT_RESULTS_PERSIST_BY_DEFAULT, temporary_settings

    with temporary_settings({PREFECT_RESULTS_PERSIST_BY_DEFAULT: True}):
        inputs = ["a", "b", "c"]
        subflow_that_writes_to_a_filesystem(inputs)

        persisted_results = [
            PersistedResult(
                storage_key=f"{foo}.pkl",
                storage_block_id=storage._block_document_id,
                serializer_type="pickle",
            ).get()
            for foo in inputs
        ]

        assert persisted_results == inputs
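for reference, a rough plain-Python sketch of why the keys in the retrieval step line up with the task's result_storage_key template. It assumes the template is filled in like an ordinary str.format call on the task's parameters, which is an assumption and not confirmed Prefect behavior; render_storage_key is a made-up helper, not a Prefect function.

```python
# Hypothetical helper, NOT part of Prefect: mimics filling a
# result_storage_key template from a task's keyword parameters,
# assuming plain str.format-style substitution.
def render_storage_key(template: str, parameters: dict) -> str:
    return template.format(**parameters)


inputs = ["a", "b", "c"]

# Keys the task would write under, one per mapped input (per the assumption above).
written_keys = [render_storage_key("{foo}.pkl", {"foo": foo}) for foo in inputs]

# Keys the retrieval step rebuilds with an f-string.
lookup_keys = [f"{foo}.pkl" for foo in inputs]

print(written_keys == lookup_keys)
```

so each input maps to exactly one key, and two mapped runs with the same foo would point at the same key.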
in prod you could set PREFECT_RESULTS_PERSIST_BY_DEFAULT
as an env var
Cody Webb
08/16/2023, 1:02 AM
Nate
08/16/2023, 1:04 AM
Cody Webb
08/16/2023, 1:06 AM
Nate
08/16/2023, 1:16 AM
Cody Webb
08/16/2023, 1:37 AM
Nate
08/16/2023, 5:35 PM