Dexter Antonio
02/18/2022, 9:36 PM
MY_RESULTS = S3Result(bucket='my_bucket_without_s3_prefix', location='my_output_folder')
prefect.config.flows.checkpointing = True
!export PREFECT__FLOWS__CHECKPOINTING=true
with Flow("please work", result=MY_RESULTS) as f:
t1 = my_task()
state = f.run()
!aws s3 ls s3://my_bucket_without_s3_prefix/my_output_folder  # nothing is here
Is there something obvious which I am missing?
Kevin Kho
flow.run() will not checkpoint results unless PREFECT__FLOWS__CHECKPOINTING=true is set. But exporting this env var like this will not work, because it is loaded into the Prefect context when you import prefect, so it has to be set before you import Prefect.
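A minimal sketch of the ordering Kevin describes (a !export in a notebook runs in a throwaway subshell, so it never reaches the Python process at all); the fix is to set the variable in the process environment before the prefect import runs, e.g. at the top of the script:

import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # must happen before importing prefect

import prefect  # the setting is now picked up into prefect.config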
Dexter Antonio
02/18/2022, 10:21 PM
Setting PREFECT__FLOWS__CHECKPOINTING=true before importing prefect did resolve the issue when using a LocalResult object. Unfortunately, nothing is getting written to my S3 bucket, although the folder specified by the S3Result object is created.
Kevin Kho
Dexter Antonio
02/18/2022, 10:23 PM
MY_RESULTS = S3Result(bucket='ml.funcompany', location='dexter/output/')
with Flow("please work", result=MY_RESULTS) as f:
t2 = sf(slide_names)
row = get_first_row(t2)
dapi, protein = load_scene(*get_slide_scene_protein(row))
dapi_contour = cm(dapi)
scaled_contour = scale_down_contour(dapi_contour)
bm = pbm(cropper_task(protein, dapi_contour), dapi_contour)
score_protein_binary_mask(img=bm, outer_contour=scaled_contour)
Here is an example task
import numpy as np
from prefect import Task

class Cropper(Task):
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default
        self.checkpoint = True

    def run(self, img, contour):
        return self.get_organoid_cropper(contour)(img)

    @staticmethod
    def get_bbox_from_coords(coords, use_original_dims=True):
        # coords appears to be an OpenCV-style (N, 1, 2) contour, so min/max
        # along axis 0 give the two corners of the bounding box
        bl_corner = coords.min(axis=0)  # we will use these later
        ur_corner = coords.max(axis=0)
        if use_original_dims:
            return np.concatenate([bl_corner, ur_corner]), bl_corner
        else:
            return np.concatenate([[0, 0], ur_corner - bl_corner]), bl_corner

    @classmethod
    def get_organoid_cropper(cls, contour):
        # build a function that crops an image down to the contour's bounding box
        bbox = cls.get_bbox_from_coords(contour)[0]
        bbox_slice = slice(bbox[0, 1], bbox[1, 1]), slice(bbox[0, 0], bbox[1, 0])
        return lambda img: img[bbox_slice]

slide_names = ['SL010334', 'SL010340', 'SL010334']
sf = SceneFetcher(default=1)
cm = DAPIContourMaker(default=1)
cropper_task = Cropper(default=1)
pbm = ProteinBinaryMask()
Kevin Kho
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # set before importing prefect

from prefect import Flow, task, Task
from prefect.engine.results import LocalResult

class Cropper(Task):
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default
        self.checkpoint = True

    def something(self):
        return 1

    def run(self):
        return self.something()

res = LocalResult("/Users/kevinkho/Work/fugue-analytics/fugue_analytics", location="4.txt")
cropper = Cropper(default=1)

with Flow("test", result=res) as flow:
    cropper()

flow.run()
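A hedged S3 analogue of the same working example (the bucket name is a placeholder; note the location is a file-like key rather than a folder):

from prefect.engine.results import S3Result
res = S3Result(bucket='my-bucket', location='4.txt')  # swap in for the LocalResult above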
Dexter Antonio
02/18/2022, 10:43 PM
MY_RESULTS = S3Result(bucket='company', location='dexter/folder/output4')
The "folder" output4 in S3 is created, but it contains no files after running the flow.
Kevin Kho
Try adding a file extension:
MY_RESULTS = S3Result(bucket='company', location='dexter/folder/output4.txt')
otherwise it will treat it as a folder.
Dexter Antonio
02/18/2022, 10:54 PM
MY_RESULTS = S3Result(bucket='ml.system1bio', location='dexter_ihc_slide_analysis/prefect5/output4')
was creating a file titled output4 for each task and writing to it. Every single task wrote to exactly the same file, overwriting the previous task's output. The solution is to create a different result object for each output that you want to write to S3. I think having a dedicated bucket and not specifying the location might work, but I haven't tried that.
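For reference, a hedged sketch of a per-task alternative using location templating (bucket and prefix are placeholders): Prefect 1.x formats the location string with values from prefect.context, so one shared Result can still write a distinct key per task instead of every task landing in a single output4 file:

from prefect.engine.results import S3Result

MY_RESULTS = S3Result(
    bucket='my-bucket',
    location='my_prefix/{flow_run_id}/{task_name}.prefect',  # filled in at runtime per task
)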