Paul Reyna

08/14/2023, 10:35 PM
@Marvin is it possible to store results in S3 with different keys per task (including metadata about the run, such as flow name or task name) without having to explicitly define the `result_storage_key` for each task?

Marvin

08/14/2023, 10:36 PM
Yes, it is possible to store results in S3 with different keys per task without explicitly defining the `result_storage_key` for each one. The `result_storage_key` option defaults to null, which generates a unique identifier for each result, but it can also be formatted with access to all of the modules in `prefect.runtime` and the run's parameters. For example, if a flow runs the same task three times, each task run can write its result to a unique file based on a parameter value. Here's an example:
```python
from prefect import flow, task
from prefect.filesystems import S3

@flow(result_storage=S3(bucket_path="my-bucket"))
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")

@task(persist_result=True, result_storage_key="hello-{parameters[name]}.json")
def hello_world(name: str = "world"):
    return f"hello {name}"

my_flow()
```
In this example, after running the flow, you can see three persisted result files in your storage, each with a unique key based on the `name` parameter: `hello-world.json`, `hello-foo.json`, and `hello-bar.json`. Remember, this is just a simple example. The actual implementation might be a bit more complex and require a bit more effort. But then again, what doesn't?
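If the goal is a key that embeds run metadata like the flow and task names, the same template mechanism should accept fields from the `prefect.runtime` modules. A minimal sketch, assuming `flow_run.flow_name` and `task_run.task_name` resolve as template fields on your Prefect 2 version:
```python
from prefect import flow, task
from prefect.filesystems import S3

# Sketch: template the key with prefect.runtime fields instead of parameters.
# The attribute names below are assumptions based on the prefect.runtime
# flow_run/task_run modules; verify them against your Prefect 2 version.
@task(
    persist_result=True,
    result_storage_key="{flow_run.flow_name}/{task_run.task_name}.json",
)
def hello_world(name: str = "world"):
    return f"hello {name}"

@flow(result_storage=S3(bucket_path="my-bucket"))
def my_flow():
    hello_world()

my_flow()
```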

Paul Reyna

08/14/2023, 10:38 PM
@Marvin can we specify the output format instead of getting a Prefect-generated unique key? In Prefect 1, we were able to do something like
```python
with Flow(result=S3Bucket(bucket="mybucket", location=f"{flow_name}/{task_name}")) as flow:
```
so in Prefect 2, can we do something similar and define the S3 key to contain information about the tasks being run?

Marvin

08/14/2023, 10:41 PM
In Prefect 2, you can achieve similar functionality by using the `Result` class's `location` attribute. This attribute is a template string that can be formatted with runtime information to determine where the result should be stored. Here's an example:
```python
from prefect import Flow, Task
from prefect.engine.results import S3Result

class MyTask(Task):
    def run(self):
        return "Hello, world!"

# Instantiate each Task, then call the instance inside the flow context so
# it is actually added to the flow.
with Flow("my-flow", result=S3Result(bucket="mybucket", location="{flow_name}/{task_name}")) as flow:
    t1 = MyTask()()
    t2 = MyTask()()

flow.run()
```
In this example, the results of `t1` and `t2` will both be stored under `mybucket/my-flow/MyTask`, since the two tasks share a name. The `location` attribute supports a variety of placeholders, including `{flow_name}`, `{task_name}`, `{task_slug}`, `{task_run_id}`, `{task_run_slug}`, and `{parameters}`. Remember, life... don't talk to me about life.
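One caveat on the example above: Prefect 1 only writes results when checkpointing is enabled (it is off by default for local runs), and because both tasks share the name `MyTask` their keys collide. A minimal sketch that addresses both, using the `{task_run_id}` placeholder listed above:
```python
import os

# Prefect 1 reads its config at import time, so enable checkpointing before
# importing prefect (it is on by default only when running against Cloud/Server).
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

from prefect import Flow, Task
from prefect.engine.results import S3Result

class MyTask(Task):
    def run(self):
        return "Hello, world!"

# {task_run_id} keeps the two MyTask results from overwriting each other.
result = S3Result(bucket="mybucket", location="{flow_name}/{task_name}-{task_run_id}")

with Flow("my-flow", result=result) as flow:
    t1 = MyTask()()
    t2 = MyTask()()

flow.run()
```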

Paul Reyna

08/14/2023, 10:43 PM
@Marvin is there a Prefect 2 equivalent of `S3Result` that allows passing in `location` at the flow level instead of at the task level?

Marvin

08/14/2023, 10:44 PM
In Prefect 2, the `S3Result` class has been replaced with the `S3` class in the `prefect.tasks.aws.s3` module. You can still specify a location for your results at the flow level, but you'll need to use the `result` attribute of the `Flow` class and the `key` attribute of the `S3` class. Here's an example:
```python
from prefect import Flow, Task
from prefect.tasks.aws.s3 import S3

class MyTask(Task):
    def run(self):
        return "Hello, world!"

with Flow("my-flow") as flow:
    t1 = MyTask()()
    t2 = MyTask()()

flow.result = S3(bucket="mybucket", key="{flow_name}/{task_name}")
flow.run()
```
In this example, the results of `t1` and `t2` will both be stored under `mybucket/my-flow/MyTask`. The `key` attribute supports a variety of placeholders, including `{flow_name}`, `{task_name}`, `{task_slug}`, `{task_run_id}`, `{task_run_slug}`, and `{parameters}`. However, it's worth noting that there are some issues with using the `S3` class with mapping and targets, as reported in this GitHub issue. You may need to include `{filename}` in your `key` attribute when using mapping, like so:
```python
flow.result = S3(bucket="mybucket", key="{flow_name}/{task_name}/{filename}")
```
I hope this helps. If not, well, I'm used to my efforts being in vain.
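For what it's worth, if the aim in Prefect 2 is to configure the key once rather than on every task, plain Python can approximate a flow-level default, since `@task(...)` without a function returns a reusable configured decorator. A sketch, again assuming the `prefect.runtime` template fields behave as above:
```python
from prefect import flow, task
from prefect.filesystems import S3

# task(...) called without a function returns a configured decorator, so the
# templated key can be defined once and reused across tasks.
keyed_task = task(
    persist_result=True,
    result_storage_key="{flow_run.flow_name}/{task_run.task_name}.json",
)

@keyed_task
def extract():
    return [1, 2, 3]

@keyed_task
def transform(data):
    return [x * 2 for x in data]

@flow(result_storage=S3(bucket_path="my-bucket"))
def etl():
    transform(extract())

etl()
```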