Hey I have been having some trouble with Results i...
# prefect-community
j
Hey I have been having some trouble with Results in Prefect 0.12.3 and would love input as to where I may be going wrong. For example I can't even get this working:
Copy code
from distributed import LocalCluster
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.engine.results import LocalResult

@task(result=LocalResult(dir="single-results/"), target="hello.bytes")
def single_task():
    return list(range(100))


def pipeline(debug: bool = False):
    with Flow("example-local-results") as flow:
        items = single_task()

    cluster = LocalCluster()
    state = flow.run(executor=DaskExecutor(cluster.scheduler_address))
I have also tried adding the
checkpoint=True
but same deal. Nothing shows up.
c
Hey Jackson - if you’re running this locally you additionally need to set the environment variable
Copy code
PREFECT__FLOWS__CHECKPOINTING=true
to “turn on” the feature (it is enabled by default against a backend but toggleable for local runs for testing purposes)
upvote 1
j
Ahhhhh! I don't see that anywhere on the docs for this: https://docs.prefect.io/core/concepts/results.html#persisting-user-created-results I also think it may be a bit out of date because I think
checkpointing=True
as a parameter turned into
checkpoint=True
I will make a PR to update those I get all this working
upvote 1
Now that it's working related question: One of the reasons I was interested in this was to potentially push a dataset to s3 with a nice manifest (single CSV / parquet or similar) as a result of a
map
operation. I now see that stored results of
map
are all individual. Is the go-to recommendation in this situation to basically have a
map
->
gather_and_form_dataset
? Or is there some secret kwarg to let me store all results of a
map
together as one?
c
That’s a good question; I hadn’t considered that before but my first hunch is that this would work:
Copy code
@task(checkpoint=False)
def mapped_task(x):
    ...

@task(checkpoint=True, result=MyResult)
def gather_and_form_dataset(results):
    ...
j
Yep! I expect I will be making a custom result handler as well to do this all nicely but hoping it will be real neat
Cool thanks for all the help
c
for sure!