Lukas N.

09/03/2020, 2:22 PM
Hi everyone 👋, I've recently discovered Results, which seem like a great fit for one of my tasks. Basically, I've got a task copy_data_to_s3 which takes data (after an expensive computation) as input, copies it to S3, and outputs the S3 path (not the data!) where it was stored. Other downstream tasks work with that S3 path. Right now, if any downstream task fails, I need to re-run the entire flow, whereas with S3Result I could rerun just the part after the S3 copy. The tricky part is that I want the task to output the S3 path and not the actual data, which is what S3Result does by default. It seems like it should be an easy modification, but I can't come up with a solution. Is anyone able to help?
👀 1
I tried overriding the read method of S3Result to return the location, but that doesn't work when you run the flow for the first time.

Mark McDonald

09/03/2020, 2:48 PM
Could you use a location template in the result object? https://docs.prefect.io/core/advanced_tutorials/using-results.html#templating-a-task-s-location That way the S3 key is deterministic, so you can reference it elsewhere in the flow. There is probably a better way, though.
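For reference, a minimal sketch of how such a location template resolves to a deterministic key. It assumes the template is filled in with Python's str.format from values in the run context; render_location is a hypothetical helper, and task_name and scheduled_start_time are supplied by hand here instead of coming from Prefect.

```python
from datetime import datetime

# Template in the style of the linked docs; the placeholders are ordinary
# str.format fields, so a datetime value accepts a strftime-style format spec.
template = "test/{task_name}_{scheduled_start_time:%Y-%m-%d}.txt"


def render_location(template: str, **context) -> str:
    """Hypothetical helper: fill a location template from context values."""
    return template.format(**context)


# Hand-supplied stand-ins for what the run context would provide.
location = render_location(
    template,
    task_name="copy_data_to_s3",
    scheduled_start_time=datetime(2020, 9, 3, 14, 0),
)
print(location)
```

Because the same context values always render to the same key, a downstream task can rebuild the path without receiving it from the upstream task.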

Dylan

09/03/2020, 3:34 PM
@Lukas N. I think that should work
I think your location should be instantiated on the first run
Can you show me the error you’re seeing?

Lukas N.

09/03/2020, 3:59 PM
Oh, I think I've got it now. I wasn't getting any errors on the first run, I just wasn't getting the right value. I've got this toy example:
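import random

import prefect
from prefect import Flow, task
from prefect.engine.results import S3Result

# DATA_BUCKET is assumed to be defined elsewhere (name of the target S3 bucket)
s3_result = S3Result(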
    bucket=DATA_BUCKET,
    location='test/{task_name}_{scheduled_start_time:%Y-%d}.txt',
)

@task
def expensive_task():
    return [1, 2, 3]

@task(result=s3_result)
def copy_to_s3_task(x):
    return x

@task
def display(x):
    prefect.context.logger.info(f'Display: {x}')

    if random.random() < 0.7:
        raise Exception('Random failure')


with Flow('flow') as flow:
    a = expensive_task()
    b = copy_to_s3_task(a)
    display(b)
And basically I don't want to see the log Display: [1, 2, 3] (the data) but rather Display: test/copy_to_s3_task_2020-03.txt (the S3 path). So at first I created a custom implementation of S3Result that overrides the read(...) method to return the location instead of the data. However, on the first run the S3 target doesn't exist yet, so read is never called; write is called instead, and it returns the data, not the location... That's when it hit me. Here is the Result that does the trick:
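from typing import Any

from prefect.engine.result import Result
from prefect.engine.results import S3Result


class CustomS3Result(S3Result):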

    def write(self, value: Any, **kwargs: Any) -> Result:
        result = super().write(value, **kwargs)
        result.value = result.location
        return result

    def read(self, location: str) -> Result:
        self.logger.debug(
            "Starting to download result from {}...".format(location)
        )

        new = self.copy()
        new.location = location
        new.value = location
        return new
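To see why both methods need overriding, here is a simplified model of the behaviour described above. It is not Prefect's task runner: InMemoryResult, LocationOnlyResult and run_task are hypothetical stand-ins that keep data in a dict instead of S3, and the "skip the computation when the target exists" logic is an assumption based on this thread.

```python
from typing import Any, Callable, Dict, Optional


class InMemoryResult:
    """Hypothetical stand-in for S3Result: stores values in a dict, not S3."""

    store: Dict[str, Any] = {}  # shared fake "bucket"

    def __init__(self, location: Optional[str] = None, value: Any = None):
        self.location = location
        self.value = value

    def exists(self, location: str) -> bool:
        return location in self.store

    def write(self, value: Any) -> "InMemoryResult":
        self.store[self.location] = value
        return InMemoryResult(location=self.location, value=value)

    def read(self, location: str) -> "InMemoryResult":
        return InMemoryResult(location=location, value=self.store[location])


class LocationOnlyResult(InMemoryResult):
    """Same idea as CustomS3Result: hand back the location, never the data."""

    def write(self, value: Any) -> InMemoryResult:
        result = super().write(value)   # data is still persisted
        result.value = result.location  # but downstream sees the path
        return result

    def read(self, location: str) -> InMemoryResult:
        # No download at all: the location itself is the value.
        return LocationOnlyResult(location=location, value=location)


def run_task(result: InMemoryResult, compute: Callable[[], Any]) -> Any:
    """Simplified runner: first run writes, later runs read the existing target."""
    if result.exists(result.location):
        return result.read(result.location).value
    return result.write(compute()).value


def must_not_run() -> Any:
    raise RuntimeError("expensive computation should be skipped on rerun")


result = LocationOnlyResult(location="test/copy_to_s3_task.txt")
first = run_task(result, lambda: [1, 2, 3])  # first run goes through write
second = run_task(result, must_not_run)      # rerun goes through read
print(first, second)
```

In this model, overriding only read would leave the first run returning the data, which matches the problem described earlier in the thread.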

Dylan

09/03/2020, 4:00 PM
Awesome!
@Marvin archive “S3 Result that Returns URI instead of the Data”
🎉 2