Dexter Antonio

    7 months ago
    Hi, I’m trying to store the Results from each Task in a Flow on S3, but I am having some trouble with it. When I set the result object to be an S3Result, nothing ends up being stored in S3. I am able to directly write files with the S3Result object, but the Results from a task are not automatically stored there. I have tried to set checkpointing to True, so I don’t think that is the issue. Here is some example code.
    MY_RESULTS = S3Result(bucket='my_bucket_without_s3_prefix',location='my_output_folder')
    prefect.config.flows.checkpointing = True
    !export PREFECT__FLOWS__CHECKPOINTING=true
    with Flow("please work", result=MY_RESULTS) as f:
        t1 = my_task()
    state = f.run()
    !aws s3 ls s3://my_bucket_name_without_s3_prefix/my_output_folder  # nothing is here
    Is there something obvious that I am missing?
    Kevin Kho

    7 months ago
    It won’t be persisted for flow.run() unless PREFECT__FLOWS__CHECKPOINTING=true, but exporting this env var like that will not work: the env var is loaded into the Prefect context when you import prefect, so it has to be set before you import Prefect.
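    For example, a minimal sketch of the required ordering:
    import os
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # must happen before the import

    import prefect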
    Dexter Antonio

    7 months ago
    Setting the environment variable
    PREFECT__FLOWS__CHECKPOINTING=true
    before importing prefect did resolve the issue when using a LocalResult object. Unfortunately, nothing is getting written to my S3 bucket, although the folder specified by the S3Result object is created.
    Kevin Kho

    7 months ago
    Can you show me the Flow?
    Dexter Antonio

    7 months ago
    MY_RESULTS = S3Result(bucket='ml.funcompany',location='dexter/output/')
    with Flow("please work", result=MY_RESULTS) as f:
        t2 = sf(slide_names)
        row = get_first_row(t2)
        dapi, protein = load_scene(*get_slide_scene_protein(row))
        dapi_contour = cm(dapi)
        scaled_contour = scale_down_contour(dapi_contour)
        bm = pbm(cropper_task(protein, dapi_contour), dapi_contour)
        score_protein_binary_mask(img=bm, outer_contour=scaled_contour)
    Here is an example task
    class Cropper(Task):
        def __init__(self, default: int, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.default = default
            self.checkpoint = True

        def run(self, img, contour):
            return self.get_organoid_cropper(contour)(img)

        @staticmethod
        def get_bbox_from_coords(coords, use_original_dims=True):
            bl_corner = coords.min(axis=0)  # we will use these later
            ur_corner = coords.max(axis=0)
            if use_original_dims:
                return np.concatenate([bl_corner, ur_corner]), bl_corner
            else:
                return np.concatenate([[0, 0], ur_corner - bl_corner]), bl_corner

        @classmethod
        def get_organoid_cropper(cls, contour):
            bbox = cls.get_bbox_from_coords(contour)[0]
            bbox_slice = slice(bbox[0, 1], bbox[1, 1]), slice(bbox[0, 0], bbox[1, 0])
            return lambda img: img[bbox_slice]
    These are defined above the flow
    slide_names = ['SL010334', 'SL010340', 'SL010334']
    sf = SceneFetcher(default=1)
    cm = DAPIContourMaker(default=1)
    cropper_task = Cropper(default=1)
    pbm = ProteinBinaryMask()
    Kevin Kho

    7 months ago
    I don’t see anything wrong if this is executing successfully. One sec, I will try.
    It’s working fine for me:
    import os
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # set before importing prefect
    from prefect import Flow, task, Task
    from prefect.engine.results import LocalResult

    class Cropper(Task):
        def __init__(self, default: int, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.default = default
            self.checkpoint = True

        def something(self):
            return 1

        def run(self):
            return self.something()

    res = LocalResult("/Users/kevinkho/Work/fugue-analytics/fugue_analytics", location="4.txt")
    cropper = Cropper(default=1)
    with Flow("test", result=res) as flow:
        cropper()

    flow.run()
    maybe you can give an explicit filename for the location?
    Dexter Antonio

    7 months ago
    Thanks for trying it out. When I run it locally, it works without issue and creates a bunch of prefect-result* files in the output folder that I have specified. When I try to use AWS S3 with the following result object
    MY_RESULTS = S3Result(bucket='company',location='dexter/folder/output4')
    the “folder” output4 in S3 is created, but it contains no files after running the flow.
    Do you have an example of explicitly using the S3Result object that stores multiple files at that output location?
    Kevin Kho

    7 months ago
    I think your location needs to be explicit like:
    MY_RESULTS = S3Result(bucket='company',location='dexter/folder/output4.txt')
    otherwise it will treat it as a folder
    Dexter Antonio

    7 months ago
    I figured it out. Thanks for your help. The issue was that
    MY_RESULTS = S3Result(bucket='ml.system1bio',location='dexter_ihc_slide_analysis/prefect5/output4')
    was creating a single file titled output4 that every task wrote to, each one overwriting the previous task’s output. The solution is to create a different result object for each task whose output you want to write to S3. I think having a dedicated bucket and not specifying the location might work, but I haven’t tried that.
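    A minimal sketch of the per-task approach (bucket name and keys here are hypothetical; in Prefect 1.x a result can be attached to an individual task rather than the whole flow):
    from prefect import task, Flow
    from prefect.engine.results import S3Result

    # one result object per task, each with its own explicit key
    @task(result=S3Result(bucket='my-bucket', location='outputs/first.prefect'), checkpoint=True)
    def first():
        return 1

    @task(result=S3Result(bucket='my-bucket', location='outputs/second.prefect'), checkpoint=True)
    def second(x):
        return x + 1

    with Flow("per-task-results") as flow:
        second(first())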
    Kevin Kho

    7 months ago
    You can template the location, but this will only work for runs with an agent.
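    Something like this (a sketch; the placeholders are rendered from the Prefect context at runtime, and per the above only for agent-backed runs):
    # location is a template, filled in per task run from the Prefect context
    MY_RESULTS = S3Result(bucket='my-bucket', location='{flow_name}/{task_name}.prefect')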
    Dexter Antonio

    7 months ago
    Thanks, I’m just going to write a Task that stores the results and returns an S3 path, then put the S3 paths in a database after running the Flow.
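    A rough sketch of that approach (the bucket name and key scheme are hypothetical, and this assumes boto3 credentials are configured):
    import pickle
    import boto3
    from prefect import task

    @task
    def store_result(obj, name):
        """Pickle obj to S3 and return its path for later bookkeeping."""
        key = f"outputs/{name}.pkl"
        boto3.client("s3").put_object(
            Bucket="my-bucket", Key=key, Body=pickle.dumps(obj)
        )
        return f"s3://my-bucket/{key}"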