# ask-community
Dmitry:
I'm trying to understand how persistence of results works. I have configured my flow to store results in S3 and I can see them saved there, but only once the flow run is done. I would expect them to be saved task by task as the run progresses. If I terminate the run before it finishes, nothing is saved at all. What's the point? Am I misunderstanding the feature?
Chris White:
Hey Dmitry! Could you share what version of Prefect you're using and whether you have `PREFECT_RESULTS_PERSIST_BY_DEFAULT` configured?
For reference, we simplified result persistence and caching significantly in 3.0, and the documentation for that version is here: https://docs.prefect.io/3.0/develop/results. Would love any feedback if anything is missing from the docs for that version.
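(For readers following along: one way to flip that setting globally is via the Prefect CLI. This is a sketch of the CLI form, assuming a Prefect 3.x install.)

```shell
# Enable result persistence by default for all tasks and flows (Prefect 3.x).
# This writes the setting to the active Prefect profile.
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

# Verify the active profile's settings.
prefect config view
```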
Dmitry:
Hey @Chris White, I use 3.0. I don't have `PREFECT_RESULTS_PERSIST_BY_DEFAULT` set, but I use attributes on the flow definition:
```python
@flow(
    log_prints=True,
    persist_result=True,
    result_storage="s3-bucket/s3"
)
```
Chris White:
And could you show me an example configuration on your task? The results should get persisted as the tasks complete, so I'd like to understand what's going on here.
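(For reference, a minimal task-level configuration sketch, assuming an `S3Bucket` block has already been saved under the name "s3"; the task name and return type here are illustrative, not from the thread.)

```python
from prefect import task
from prefect_aws.s3 import S3Bucket

# Sketch: persist_result=True asks Prefect to write this task's return
# value to the configured storage when the task finishes, rather than
# waiting for the end of the flow run. result_storage can be a loaded
# block instance (as here) or a "block-type/block-name" slug string.
@task(persist_result=True, result_storage=S3Bucket.load("s3"))
def load_data(virtual_district: str) -> str:
    return virtual_district
```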
Dmitry:
```python
from dataclasses import dataclass
from datetime import date
from time import sleep
from typing import List
from prefect import flow, task

from prefect_aws.s3 import S3Bucket

s3_bucket_block = S3Bucket.load("s3")


@dataclass
class ModelData:
    virtual_district: str


@dataclass
class ModelResults:
    virtual_district: str


@task(log_prints=True)
def build_virtual_districts():
    vds = [f"D{i}" for i in range(3)]
    for vd in vds:
        yield vd
        sleep(1)


@task(log_prints=True, persist_result=True)
def load_data(virtual_district: str) -> ModelData:
    sleep(4)
    return ModelData(virtual_district=virtual_district)


@task(log_prints=True)
def compute_model(data: ModelData) -> ModelResults:
    print(f"Computing model for {data.virtual_district}")
    sleep(2)
    return ModelResults(virtual_district=data.virtual_district)


@task(log_prints=True)
def save_results(results: ModelResults):
    sleep(2)


@task(log_prints=True)
def build_model_for_virtual_district(virtual_district: str):
    data = load_data(virtual_district)
    results = compute_model(data)
    save_results(results)


@flow(
    log_prints=True,
    persist_result=True,
    result_storage="s3-bucket/s3",
)
def build_model(run_date: date, start_date: date = date(2006, 1, 1)):
    print(f"Building model..")
    virtual_districts = build_virtual_districts()
    for vd in virtual_districts:
        build_model_for_virtual_district(vd)
    return "ok"


if __name__ == "__main__":
    build_model(date.fromisoformat("2224-01-01"))
```
This is my toy example. The docs say that tasks should inherit persistence settings, but I tried the explicit setting on one of the tasks too.
Chris White:
Yeah, that's right; they should inherit the settings. Is there any chance you'd mind converting this into a GitHub issue, maybe with a more minimal reproducible example that I could run on my machine?
Dmitry:
Another weird problem I have: if I omit

```python
s3_bucket_block = S3Bucket.load("s3")
```

then the flow run doesn't start; it just hangs.
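(One diagnostic worth trying, sketched here under the assumption that the "s3" block exists: pass the loaded block object to `result_storage` instead of the `"s3-bucket/s3"` slug string, so the block is resolved eagerly at definition time rather than looked up when the run starts.)

```python
from prefect import flow
from prefect_aws.s3 import S3Bucket

# Sketch: resolving the storage block eagerly at module load. If the
# hang comes from slug resolution at run start, this variant would
# surface the failure earlier and more visibly.
@flow(
    log_prints=True,
    persist_result=True,
    result_storage=S3Bucket.load("s3"),
)
def build_model():
    ...
```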
Sure, I can create an issue on GitHub.
Chris White:
Thank you, I'll keep a lookout for it and we'll get to the bottom of this.