Jackson Maxfield Brown
07/10/2020, 9:36 PM
```python
from distributed import LocalCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.engine.results import LocalResult


# With checkpointing on, the target "hello.bytes" is expected under single-results/
@task(result=LocalResult(dir="single-results/"), target="hello.bytes")
def single_task():
    return list(range(100))


def pipeline(debug: bool = False):
    with Flow("example-local-results") as flow:
        items = single_task()

    cluster = LocalCluster()
    state = flow.run(executor=DaskExecutor(cluster.scheduler_address))
    return state


if __name__ == "__main__":
    pipeline()
```
I have also tried adding `checkpoint=True`, but same deal. Nothing shows up.
Chris White
07/10/2020, 9:40 PM
Set the environment variable `PREFECT__FLOWS__CHECKPOINTING=true` to “turn on” the feature (it is enabled by default when running against a backend, but toggleable for local runs for testing purposes).
Jackson Maxfield Brown
07/10/2020, 9:43 PM
`checkpointing=True` as a parameter turned into `checkpoint=True`.
`map` operation. I now see that the stored results of `map` are all individual.
Is the go-to recommendation in this situation basically to have a `map` -> `gather_and_form_dataset`? Or is there some secret kwarg that lets me store all results of a `map` together as one?
Chris White
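07/10/2020, 10:01 PM
```python
from prefect import task
from prefect.engine.results import LocalResult

MyResult = LocalResult(dir="dataset-results/")  # placeholder: any Result instance works

@task(checkpoint=False)  # don't persist each mapped child separately
def mapped_task(x):
    ...

@task(checkpoint=True, result=MyResult)  # persist only the combined dataset
def gather_and_form_dataset(results):
    ...
```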
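To make the map -> gather idea concrete without depending on a particular Prefect version, here is a plain-Python stand-in (no Prefect APIs involved; the helper names, the `dataset.pkl` filename and the per-item work are all hypothetical). In Prefect 0.x, passing the output of a `.map(...)` call to an unmapped task hands that task the list of all mapped results; the list comprehension below mimics that. Only the gathered object is written, so a single file ends up on disk instead of one per mapped item, which is the effect `checkpoint=False` on the mapped task plus `checkpoint=True` on the gather task is meant to achieve.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def mapped_step(x: int) -> int:
    # Hypothetical per-item work. Like a checkpoint=False mapped task, its
    # output is only passed along in memory, never written to disk.
    return x * 2


def gather_and_form_dataset(results: List[int]) -> Dict[str, Any]:
    # Receives every mapped output at once and combines them into one object.
    return {"values": results, "count": len(results)}


def persist(obj: Any, directory: Path, filename: str = "dataset.pkl") -> Path:
    # Stand-in for the single checkpointed result: one file for the whole dataset.
    path = directory / filename
    path.write_bytes(pickle.dumps(obj))
    return path


def run_pipeline(items: List[int], directory: Path) -> Path:
    mapped = [mapped_step(x) for x in items]   # "map": one call per item
    dataset = gather_and_form_dataset(mapped)  # "gather": reduce to one object
    return persist(dataset, directory)         # written once, not per item


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = run_pipeline(list(range(100)), Path(tmp))
        print(sorted(p.name for p in Path(tmp).iterdir()))  # ['dataset.pkl']
        print(pickle.loads(out.read_bytes())["count"])      # 100
```

The design trade-off is that the mapped intermediates are not individually recoverable; if a single item fails you rerun the map rather than reading cached children.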
Jackson Maxfield Brown
07/10/2020, 10:02 PM
Chris White
07/10/2020, 10:07 PM
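For local runs like the one in the first message, checkpointing can also be switched on from inside the script rather than the shell. A minimal sketch, assuming Prefect 0.x, where `PREFECT__FLOWS__CHECKPOINTING` corresponds to the `flows.checkpointing` config setting; the helper name `enable_local_checkpointing` is hypothetical. The important detail is ordering: Prefect reads `PREFECT__*` environment variables into its config when it is imported, so the variable has to be set first.

```python
import os


def enable_local_checkpointing() -> None:
    # Hypothetical helper. PREFECT__FLOWS__CHECKPOINTING maps to the
    # flows.checkpointing setting in Prefect 0.x config. Prefect reads
    # PREFECT__* variables at import time, so call this BEFORE `import prefect`.
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"


enable_local_checkpointing()

# Only now import Prefect and build/run the flow, e.g.:
# from prefect import Flow, task
# ...
# flow.run(executor=DaskExecutor(cluster.scheduler_address))
```

The shell equivalent is `PREFECT__FLOWS__CHECKPOINTING=true python pipeline.py` (script name hypothetical). Setting the variable early also means processes started afterwards, such as `LocalCluster` workers, inherit it.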