Horatiu Bota  12/06/2021, 6:34 PM
s3result.write(overwrite=True)
)?
Horatiu Bota  12/06/2021, 6:51 PM
def long_running_task(df, params):
    # do some work with df and params

with Flow("test_flow", result=S3Result(bucket="bucket_name")) as flow:
    task = task(
        target='./cache/result.csv',
        checkpoint=True,
        max_retries=1,
        retry_delay=pd.Timedelta(seconds=3),
    )
    s3_result = task(long_running_task)(df=df, params=params)
    # is this possible?
    s3_result.write(overwrite=True)
Kevin Kho
Prefect already writes the target to a file for you. targets are a form of caching in Prefect: if the file already exists, the task won't run. What you want to do instead is use

task(…, result=S3Result(…, location=…))

The location will achieve the same thing, but it is not a caching mechanism, so the task will still re-run.

Kevin Kho
If the file already exists at the target, the result will just be loaded from there instead of running the task.
Kevin Kho
def long_running_task(df, params):
    # do some work with df and params
    return modified_df  # instead of the S3Result
Horatiu Bota  12/06/2021, 7:02 PM
(the s3_result above) in the with block, it's an S3Result

Kevin Kho
with Flow(...) as flow:
    df = first_task(df)
    df = second_task(df)

Prefect handles resolving those results and passing them to the next task, even if the type might be S3Result.
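The unwrapping step Kevin mentions can be sketched without Prefect. This is a toy model of the idea only: FakeS3Result and resolve are invented names standing in for the framework's result wrapper and its internal resolution step, not Prefect's actual classes:

```python
class FakeS3Result:
    """Toy stand-in for a persisted-result wrapper: it holds a value
    the way a real result object points at stored data."""
    def __init__(self, value):
        self.value = value

    def read(self):
        return self.value

def resolve(maybe_result):
    """The orchestrator unwraps result objects before calling the next
    task, so task bodies only ever see plain values."""
    if isinstance(maybe_result, FakeS3Result):
        return maybe_result.read()
    return maybe_result

def first_task(df):
    # pretend this task's output was checkpointed to storage
    return FakeS3Result([x + 1 for x in df])

def second_task(df):
    # receives a plain list, never the wrapper
    return [x * 10 for x in df]

out = second_task(resolve(first_task([1, 2, 3])))
print(out)  # -> [20, 30, 40]
```

The point of the sketch: even though first_task hands back a wrapper, second_task is written against plain values, because resolution happens between tasks rather than inside them.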