Dmitry Golubets
09/11/2024, 4:21 PMChris White
PREFECT_RESULTS_PERSIST_BY_DEFAULT
?Chris White
Dmitry Golubets
09/11/2024, 4:47 PMPREFECT_RESULTS_PERSIST_BY_DEFAULT
set, but I use attributes on the flow definition:Dmitry Golubets
09/11/2024, 4:47 PM@flow(
log_prints=True,
persist_result=True,
result_storage="s3-bucket/s3"
)
Chris White
Dmitry Golubets
09/11/2024, 4:49 PM
from dataclasses import dataclass
from datetime import date
from time import sleep
from typing import List
from prefect import flow, task
from prefect_aws.s3 import S3Bucket
# Loads the S3Bucket block named "s3" when the module is executed.
# NOTE(review): `s3_bucket_block` is not referenced anywhere else in this
# script; the flow selects its storage via result_storage="s3-bucket/s3".
s3_bucket_block = S3Bucket.load("s3")
@dataclass
class ModelData:
    """Input data for a single virtual district, as returned by ``load_data``."""

    # Identifier of the virtual district this data belongs to (e.g. "D0").
    virtual_district: str
@dataclass
class ModelResults:
    """Model output for a single virtual district, as returned by ``compute_model``."""

    # Identifier of the virtual district these results belong to (e.g. "D0").
    virtual_district: str
@task(log_prints=True)
def build_virtual_districts():
    """Yield the virtual district identifiers "D0", "D1" and "D2" one at a time.

    This is a generator task: each identifier is yielded, then the task
    pauses for one second before producing the next one.
    """
    for i in range(3):
        yield f"D{i}"
        # NOTE(review): the pasted source lost its indentation; the pause is
        # assumed to sit inside the loop (one second after each yield).
        sleep(1)
@task(log_prints=True, persist_result=True)
def load_data(virtual_district: str) -> ModelData:
    """Return the ``ModelData`` for ``virtual_district``.

    The task is declared with ``persist_result=True``.

    Args:
        virtual_district: Identifier of the district to load data for.

    Returns:
        A ``ModelData`` carrying the same district identifier.
    """
    # Four-second pause; presumably a stand-in for real loading work.
    sleep(4)
    return ModelData(virtual_district=virtual_district)
@task(log_prints=True)
def compute_model(data: ModelData) -> ModelResults:
    """Produce the ``ModelResults`` for the district described by ``data``.

    Args:
        data: Loaded input for one virtual district.

    Returns:
        A ``ModelResults`` carrying the same district identifier as ``data``.
    """
    print(f"Computing model for {data.virtual_district}")
    # Two-second pause; presumably a stand-in for real computation.
    sleep(2)
    return ModelResults(virtual_district=data.virtual_district)
@task(log_prints=True)
def save_results(results: ModelResults):
    """Pause for two seconds and return ``None``.

    Args:
        results: Model output for one virtual district. The visible body
            does not read it.
    """
    # Two-second pause; presumably a stand-in for a real write.
    sleep(2)
@task(log_prints=True)
def build_model_for_virtual_district(virtual_district: str):
    """Run load -> compute -> save for one virtual district.

    Calls the ``load_data``, ``compute_model`` and ``save_results`` tasks in
    sequence, feeding each one the previous step's return value.

    Args:
        virtual_district: Identifier of the district to process.
    """
    data = load_data(virtual_district)
    results = compute_model(data)
    save_results(results)
@flow(
    log_prints=True,
    persist_result=True,
    result_storage="s3-bucket/s3",
)
def build_model(run_date: date, start_date: date = date(2006, 1, 1)):
    """Build the model for every virtual district, one district at a time.

    Iterates over the identifiers yielded by ``build_virtual_districts`` and
    calls ``build_model_for_virtual_district`` for each.

    Args:
        run_date: Flow parameter; not read by the visible body.
        start_date: Flow parameter, defaults to 2006-01-01; not read by the
            visible body.

    Returns:
        The string ``"ok"`` once every district has been processed.
    """
    # Plain literal: the original f-string had no placeholders.
    print("Building model..")
    virtual_districts = build_virtual_districts()
    for vd in virtual_districts:
        build_model_for_virtual_district(vd)
    return "ok"
if __name__ == "__main__":
    # Run the flow directly when the file is executed as a script.
    # NOTE(review): the year 2224 looks like a possible typo for 2024 - confirm.
    build_model(date.fromisoformat("2224-01-01"))
Dmitry Golubets
09/11/2024, 4:49 PMDmitry Golubets
09/11/2024, 4:50 PMChris White
Chris White
Dmitry Golubets
09/11/2024, 4:51 PM
s3_bucket_block = S3Bucket.load("s3")
then the flow run doesn't start, it hangs
Dmitry Golubets
09/11/2024, 4:52 PMChris White