
Dexter Antonio

02/18/2022, 9:36 PM
Hi, I’m trying to store the results from each task in a flow on S3, but I am having some trouble with it. When I set the flow’s result to an S3Result, nothing ends up being stored in S3. I am able to write files directly with the S3Result object, but the results from a task are not automatically stored there. I have tried to set checkpointing to True, so I don’t think that is the issue. Here is some example code.
Copy code
MY_RESULTS = S3Result(bucket='my_bucket_without_s3_prefix',location='my_output_folder')
prefect.config.flows.checkpointing = True
!export PREFECT__FLOWS__CHECKPOINTING=true
with Flow("please work", result=MY_RESULTS) as f:
    t1 = my_task()
state = f.run()
!aws s3 ls s3://my_bucket_without_s3_prefix/my_output_folder  # nothing is here
Is there something obvious that I am missing?

Kevin Kho

02/18/2022, 10:06 PM
It won’t be persisted for flow.run() unless PREFECT__FLOWS__CHECKPOINTING=true, but exporting this env var like that will not work, because the env var is loaded into the Prefect context when you import prefect, so it has to be set before you import Prefect.
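For reference, a minimal sketch of that ordering, setting the variable from Python before the import rather than via a shell export:
Copy code
import os

# Must be set before prefect is imported, because the value is read
# into the Prefect context at import time.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

import prefect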

Dexter Antonio

02/18/2022, 10:21 PM
Setting the environment variable PREFECT__FLOWS__CHECKPOINTING=true before importing prefect did resolve the issue when using a LocalResult object. Unfortunately, nothing is getting written to my S3 bucket, although the folder specified by the S3Result object is created.

Kevin Kho

02/18/2022, 10:21 PM
Can you show me the Flow?

Dexter Antonio

02/18/2022, 10:23 PM
Copy code
MY_RESULTS = S3Result(bucket='ml.funcompany',location='dexter/output/')
with Flow("please work", result=MY_RESULTS) as f:
    t2 = sf(slide_names)
    row = get_first_row(t2)
    dapi, protein = load_scene(*get_slide_scene_protein(row))
    dapi_contour = cm(dapi)
    scaled_contour = scale_down_contour(dapi_contour)
    bm = pbm(cropper_task(protein, dapi_contour), dapi_contour)
    score_protein_binary_mask(img=bm, outer_contour=scaled_contour)
Here is an example task
Copy code
import numpy as np
from prefect import Task

class Cropper(Task):
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default
        self.checkpoint = True

    def run(self, img, contour):
        # Build a cropper from the contour's bounding box and apply it.
        return self.get_organoid_cropper(contour)(img)

    @staticmethod
    def get_bbox_from_coords(coords, use_original_dims=True):
        bl_corner = coords.min(axis=0)  # we will use these later
        ur_corner = coords.max(axis=0)
        if use_original_dims:
            return np.concatenate([bl_corner, ur_corner]), bl_corner
        else:
            return np.concatenate([[0, 0], ur_corner - bl_corner]), bl_corner

    @classmethod
    def get_organoid_cropper(cls, contour):
        # Returns a function that crops an image to the contour's bounding box.
        bbox = cls.get_bbox_from_coords(contour)[0]
        bbox_slice = slice(bbox[0, 1], bbox[1, 1]), slice(bbox[0, 0], bbox[1, 0])
        return lambda img: img[bbox_slice]
These are defined above the flow
Copy code
slide_names = ['SL010334', 'SL010340', 'SL010334']
sf = SceneFetcher(default=1)
cm = DAPIContourMaker(default=1)
cropper_task = Cropper(default=1)
pbm = ProteinBinaryMask()

Kevin Kho

02/18/2022, 10:31 PM
I don’t see anything wrong if this is executing successfully. One sec will try
It’s working fine for me:
Copy code
import os

# Checkpointing must be enabled before prefect is imported.
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

from prefect import Flow, Task
from prefect.engine.results import LocalResult

class Cropper(Task):
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default
        self.checkpoint = True

    def something(self):
        return 1

    def run(self):
        return self.something()

res = LocalResult("/Users/kevinkho/Work/fugue-analytics/fugue_analytics", location="4.txt")
cropper = Cropper(default=1)
with Flow("test", result=res) as flow:
    cropper()

flow.run()
maybe you can give an explicit filename for the location?

Dexter Antonio

02/18/2022, 10:43 PM
Thanks for trying it out. When I run it locally, it works without issue and creates a bunch of prefect-result* files in the output folder that I have specified. When I try to use AWS S3 with the following result object
MY_RESULTS = S3Result(bucket='company',location='dexter/folder/output4')
the “folder” output4 in S3 is created, but it contains no files after running the flow.
Do you have an example of explicitly using the S3Result object that stores multiple files at that output location?

Kevin Kho

02/18/2022, 10:53 PM
I think your location needs to be explicit like:
MY_RESULTS = S3Result(bucket='company',location='dexter/folder/output4.txt')
otherwise it will treat it as a folder

Dexter Antonio

02/18/2022, 10:54 PM
I figured it out. Thanks for your help. The issue was that
MY_RESULTS = S3Result(bucket='ml.system1bio',location='dexter_ihc_slide_analysis/prefect5/output4')
was creating a single file titled output4, and every task wrote to exactly the same file, overwriting the previous task’s output. The solution is to create a different result object for each task whose output you want to write to S3. I think having a dedicated bucket and not specifying the location might also work, but I haven’t tried that.
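A minimal sketch of that per-task fix, reusing the task classes from above (the bucket and key names are placeholders):
Copy code
from prefect.engine.results import S3Result

# Give each task its own result object with a distinct key, so task
# outputs no longer overwrite the same S3 object.
cropper_task = Cropper(default=1, result=S3Result(bucket='company', location='dexter/folder/crop.prefect'))
pbm = ProteinBinaryMask(result=S3Result(bucket='company', location='dexter/folder/binary_mask.prefect'))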

Kevin Kho

02/18/2022, 10:56 PM
You can template the location so that each task writes to its own file, but this will only work for runs with an agent.
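One way that templating can look (the placeholders are filled in from prefect.context at runtime, which is why this only works for agent-submitted runs; bucket and prefix are placeholders):
Copy code
from prefect.engine.results import S3Result

# Templated location: {flow_name} and {task_name} are filled in from
# prefect.context at runtime, so each task writes to its own key.
MY_RESULTS = S3Result(
    bucket='company',
    location='dexter/folder/{flow_name}/{task_name}.prefect',
)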

Dexter Antonio

02/18/2022, 11:03 PM
Thanks. I’m just going to write a Task that stores the results and returns an S3 path, then put the S3 paths in a database after running the Flow.
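A rough sketch of what such a task could look like, assuming boto3 and cloudpickle are available (the bucket name, key, and store_result helper are illustrative, not from the thread):
Copy code
import boto3
import cloudpickle
from prefect import task

@task
def store_result(obj, key, bucket="company"):
    # Serialize the object, upload it to S3, and return the s3:// path
    # so it can be recorded in a database after the flow runs.
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=cloudpickle.dumps(obj))
    return f"s3://{bucket}/{key}"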