Paul Reyna
08/14/2023, 10:35 PM
`result_storage_key` for each task?
Marvin
08/14/2023, 10:36 PM
`result_storage_key` for each task. You can use the `result_storage_key` option, which defaults to a null value; when it is left unset, a unique identifier is generated for each result. The key can be formatted with access to all of the modules in `prefect.runtime` and the run's parameters. For example, if a flow runs the same task three times, each task run can write its result to a unique file based on a parameter, such as `name`.
Here's an example:
```python
from prefect import flow, task
from prefect.filesystems import S3


# Results for runs of this flow are stored under the S3 block's bucket path.
@flow(result_storage=S3(bucket_path="my-bucket"))
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")


# The storage key is templated with the task run's `name` parameter.
@task(persist_result=True, result_storage_key="hello-{parameters[name]}.json")
def hello_world(name: str = "world"):
    return f"hello {name}"


my_flow()
```
In this example, after running the flow, you can see three persisted result files in your storage directory, each with a unique key based on the `name` parameter.
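The key template reads like a standard Python format string, so its expansion can be sketched without Prefect. The sketch below assumes the template is expanded the way `str.format` would expand it; `render_key` is a hypothetical helper, not part of Prefect.

```python
# Hypothetical helper: expands a result_storage_key template with str.format.
# Assumption: this mirrors how the key in the example above is filled in.
def render_key(template: str, parameters: dict) -> str:
    return template.format(parameters=parameters)


template = "hello-{parameters[name]}.json"
# One parameter set per task run in the example flow.
runs = [{"name": "world"}, {"name": "foo"}, {"name": "bar"}]
keys = [render_key(template, params) for params in runs]
print(keys)
```

Under this assumption, each run gets a distinct key only because the `name` values differ; two runs with the same `name` would share a key.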
Remember, this is just a simple example. The actual implementation might be a bit more complex and require a bit more effort. But then again, what doesn't?
Paul Reyna
08/14/2023, 10:38 PM
```python
with Flow(result=S3Bucket(bucket="mybucket", location=f"{flow_name}/{task_name}")) as flow:
```
so in Prefect 2 can we do something similar and define that S3 key to contain information about the tasks being run?
Marvin
08/14/2023, 10:41 PM
`Result` class's `location` attribute. This attribute is a template string that can be formatted with runtime information to determine where the result should be stored.
Here's an example:
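```python
from prefect import Flow, Task
from prefect.engine.results import S3Result


class MyTask(Task):
    def run(self):
        return "Hello, world!"


# The flow-level result applies its location template to every task in the flow.
with Flow("my-flow", result=S3Result(bucket="mybucket", location="{flow_name}/{task_name}")) as flow:
    t1 = MyTask()()  # calling the instance adds the task to the flow
    t2 = MyTask()()

flow.run()
```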
In this example, the results of `t1` and `t2` are both written to `mybucket/my-flow/MyTask`, because the two tasks share the same flow name and task name. The `location` attribute supports a variety of placeholders, including `{flow_name}`, `{task_name}`, `{task_slug}`, `{task_run_id}`, `{task_run_slug}`, and `{parameters}`.
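Because `t1` and `t2` share the task name `MyTask`, a template built only from `{flow_name}` and `{task_name}` renders to the same path for both. A plain-Python sketch of that collision follows, using `str.format` as a stand-in for the library's own rendering; `render_location` and the run ids are made up for illustration.

```python
# Stand-in for template rendering; the context values below are made up.
def render_location(template: str, **context) -> str:
    return template.format(**context)


contexts = [
    {"flow_name": "my-flow", "task_name": "MyTask", "task_run_id": "run-1"},
    {"flow_name": "my-flow", "task_name": "MyTask", "task_run_id": "run-2"},
]

# Same flow name and task name: both runs render to one path.
shared = [render_location("{flow_name}/{task_name}", **c) for c in contexts]
# A per-run placeholder keeps the paths apart.
unique = [render_location("{flow_name}/{task_name}/{task_run_id}", **c) for c in contexts]
print(shared)
print(unique)
```

Adding a placeholder that differs per run, such as `{task_run_id}` from the list above, is one way to keep the two paths distinct.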
Remember, life... don't talk to me about life.
Paul Reyna
08/14/2023, 10:43 PM
Marvin
08/14/2023, 10:44 PM
`S3Result` class has been replaced with the `S3` class in the `prefect.tasks.aws.s3` module. You can still specify a location for your results at the flow level, but you'll need to use the `result` attribute of the `Flow` class, and the `key` attribute of the `S3` class.
Here's an example:
```python
from prefect import Flow, Task
from prefect.tasks.aws.s3 import S3


class MyTask(Task):
    def run(self):
        return "Hello, world!"


with Flow("my-flow") as flow:
    t1 = MyTask()()  # calling the instance adds the task to the flow
    t2 = MyTask()()
    # Attach the result to the flow; the key template is rendered per task.
    flow.result = S3(bucket="mybucket", key="{flow_name}/{task_name}")

flow.run()
```
In this example, the results of `t1` and `t2` are both written to `mybucket/my-flow/MyTask`, because the two tasks share the same flow name and task name. The `key` attribute supports a variety of placeholders, including `{flow_name}`, `{task_name}`, `{task_slug}`, `{task_run_id}`, `{task_run_slug}`, and `{parameters}`.
However, it's worth noting that there are some issues with using the `S3` class with mapping and targets, as reported in this GitHub issue. You may need to include `{filename}` in your `key` attribute when using mapping, like so:
```python
flow.result = S3(bucket="mybucket", key="{flow_name}/{task_name}/{filename}")
```
I hope this helps. If not, well, I'm used to my efforts being in vain.
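For the Prefect 2 `result_storage_key` option from the first answer in this thread, a key carrying flow and task information might use `prefect.runtime` attributes in the template. The sketch below assumes the attribute names `flow_run.flow_name` and `task_run.task_name`, and uses `SimpleNamespace` objects as stand-ins for the runtime modules so that it runs without Prefect installed. Check the names against the `prefect.runtime` documentation for your version.

```python
from types import SimpleNamespace

# Assumed template; in Prefect 2 it would be passed as
# @task(persist_result=True, result_storage_key=KEY_TEMPLATE).
KEY_TEMPLATE = "{flow_run.flow_name}/{task_run.task_name}/{parameters[name]}.json"

# Stand-ins for prefect.runtime.flow_run and prefect.runtime.task_run.
flow_run = SimpleNamespace(flow_name="my-flow")
task_run = SimpleNamespace(task_name="hello_world")

key = KEY_TEMPLATE.format(
    flow_run=flow_run,
    task_run=task_run,
    parameters={"name": "foo"},
)
print(key)
```

Including a parameter in the key, as here, keeps repeated runs of the same task from sharing one key as long as their parameter values differ.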