Lukas N.
09/03/2020, 2:22 PM
I have a task copy_data_to_s3 which takes data (after an expensive computation) as input, copies it to S3, and outputs the S3 path (not the data!) where it was stored. Other downstream tasks work with that S3 path. Right now, if any of the downstream tasks fail, I need to re-run the entire flow, whereas with S3Result I could rerun just the part after the S3 copy. The tricky part is that I want to output the path on S3 and not the actual data, which is what S3Result returns by default. It seems like it should be an easy modification, but I can't come up with a solution. Anyone able to help?
I tried overriding the read method of S3Result to return the location, but that doesn't work when you run the flow for the first time.
Mark McDonald
09/03/2020, 2:48 PM
Dylan
09/03/2020, 3:34 PM
Lukas N.
09/03/2020, 3:59 PM

    import random

    import prefect
    from prefect import Flow, task
    from prefect.engine.results import S3Result

    # DATA_BUCKET is the name of the target S3 bucket, defined elsewhere.
    s3_result = S3Result(
        bucket=DATA_BUCKET,
        location='test/{task_name}_{scheduled_start_time:%Y-%d}.txt',
    )

    @task
    def expensive_task():
        return [1, 2, 3]

    @task(result=s3_result)
    def copy_to_s3_task(x):
        return x

    @task
    def display(x):
        prefect.context.logger.info(f'Display: {x}')
        # Fail most of the time to simulate a flaky downstream task.
        if random.random() < 0.7:
            raise Exception('Random failure')

    with Flow('flow') as flow:
        a = expensive_task()
        b = copy_to_s3_task(a)
        display(b)

Basically, I don't want the log to show Display: [1, 2, 3] (the data) but rather Display: test/copy_to_s3_task_2020-03.txt (the S3 path).
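To see where a path like test/copy_to_s3_task_2020-03.txt comes from, here is a small sketch that fills the location template with plain str.format. It assumes the template is rendered with Python format fields taken from the run context, and that the run was scheduled on September 3, 2020; render_location is a hypothetical helper, not part of Prefect. Note that %d is the day of the month, so the "03" in the path is the day, not the month.

```python
from datetime import datetime

# Template copied from the S3Result definition in the message above.
LOCATION_TEMPLATE = 'test/{task_name}_{scheduled_start_time:%Y-%d}.txt'


def render_location(template: str, **context) -> str:
    """Hypothetical helper: fill the template the way str.format does."""
    return template.format(**context)


# Assumed context values for illustration only.
path = render_location(
    LOCATION_TEMPLATE,
    task_name='copy_to_s3_task',
    scheduled_start_time=datetime(2020, 9, 3, 15, 59),
)
print(path)  # test/copy_to_s3_task_2020-03.txt
```

If a year-month path was the intent, %Y-%m would give 2020-09 for the same date.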
At first I created a custom implementation of S3Result that overrides the read(...) method to return the location instead of the data. However, on the first run the S3 target doesn't exist yet, so read is never called; write is called instead, and it returns the data, not the location. That's where it hit me. Here is the Result that does the trick:
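
    from typing import Any

    from prefect.engine.result import Result
    from prefect.engine.results import S3Result

    class CustomS3Result(S3Result):
        def write(self, value: Any, **kwargs: Any) -> Result:
            # Upload as usual, then pass the location (not the data) downstream.
            result = super().write(value, **kwargs)
            result.value = result.location
            return result

        def read(self, location: str) -> Result:
            # Nothing is downloaded: the location itself becomes the value.
            self.logger.debug(
                "Returning result location {} without downloading...".format(location)
            )
            new = self.copy()
            new.location = location
            new.value = location
            return new
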
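The reason both methods need overriding can be shown without Prefect or S3 at all. The sketch below is a dependency-free toy: FakeResult, PathResult, and run_task are hypothetical names, the dict stands in for the bucket, and the runner's "read if it exists, otherwise compute and write" rule is only an assumption that mirrors the first-run versus rerun behaviour described above. It is not Prefect's actual implementation.

```python
import copy
from typing import Any, Callable, Dict


class FakeResult:
    """Hypothetical in-memory stand-in for a result class (not Prefect's API)."""

    def __init__(self, store: Dict[str, Any]):
        self.store = store  # plays the role of the S3 bucket
        self.location = None
        self.value = None

    def exists(self, location: str) -> bool:
        return location in self.store

    def write(self, value: Any, location: str) -> "FakeResult":
        self.store[location] = value
        new = copy.copy(self)
        new.location, new.value = location, value
        return new

    def read(self, location: str) -> "FakeResult":
        new = copy.copy(self)
        new.location, new.value = location, self.store[location]
        return new


class PathResult(FakeResult):
    """Passes the location downstream on both the write and the read path."""

    def write(self, value: Any, location: str) -> "FakeResult":
        result = super().write(value, location)
        result.value = result.location
        return result

    def read(self, location: str) -> "FakeResult":
        new = copy.copy(self)
        new.location = new.value = location
        return new


def run_task(result: FakeResult, location: str, compute: Callable[[], Any]) -> Any:
    """Toy runner: reuse the stored result if present, else compute and write."""
    if result.exists(location):
        return result.read(location).value
    return result.write(compute(), location).value


calls = []


def expensive() -> list:
    calls.append(1)  # count how often the expensive step really runs
    return [1, 2, 3]


store: Dict[str, Any] = {}
result = PathResult(store)
first = run_task(result, "test/data.txt", expensive)   # first run goes through write
second = run_task(result, "test/data.txt", expensive)  # rerun goes through read
print(first, second, len(calls))
```

With only read overridden, the first run would hand [1, 2, 3] to downstream tasks and only reruns would see the path, which is the inconsistency described above.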
Dylan
09/03/2020, 4:00 PM
Marvin
09/03/2020, 4:02 PM