Paul Reyna
08/14/2023, 10:35 PM
result_storage_key for each task?
Marvin
08/14/2023, 10:36 PM
result_storage_key for each task. You can use the result_storage_key option, which defaults to None; in that case a unique identifier is generated for each result.
This key can be formatted with access to all of the modules in prefect.runtime and the run's parameters. For example, if a flow calls the same task three times, each task run can write its result to a unique file based on a parameter, such as name.
Here's an example:
python
from prefect import flow, task
from prefect.filesystems import S3

@flow(result_storage=S3(bucket_path="my-bucket"))
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")

@task(persist_result=True, result_storage_key="hello-{parameters[name]}.json")
def hello_world(name: str = "world"):
    return f"hello {name}"

my_flow()
In this example, after running the flow, you can see three persisted result files in your storage directory, each with a unique key based on the name parameter.
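To make the templating concrete, here is a minimal sketch of how such a key could expand for the three calls. It uses plain str.format rather than Prefect itself, and the flow_run object and the flow name "my-flow" are hypothetical stand-ins for the values prefect.runtime would supply.

```python
from types import SimpleNamespace

# Hypothetical stand-in for prefect.runtime.flow_run (assumption, not Prefect code).
flow_run = SimpleNamespace(flow_name="my-flow")

# Template mixing a runtime attribute with a task parameter.
template = "{flow_run.flow_name}/hello-{parameters[name]}.json"


def render_key(template: str, parameters: dict) -> str:
    """Expand a storage-key template with str.format."""
    return template.format(flow_run=flow_run, parameters=parameters)


# One key per call of hello_world in the example flow.
keys = [render_key(template, {"name": n}) for n in ("world", "foo", "bar")]
print(keys)
```

Each call gets a distinct key because the name parameter differs; two calls with the same name would render the same key.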
Remember, this is just a simple example. The actual implementation might be a bit more complex and require a bit more effort. But then again, what doesn't?
Paul Reyna
08/14/2023, 10:38 PM
with Flow(result=S3Bucket(bucket="mybucket", location=f"{flow_name}/{task_name}")) as flow:
So in Prefect 2, can we do something similar and define the S3 key so that it contains information about the tasks being run?
Marvin
08/14/2023, 10:41 PM
Result class's location attribute. This attribute is a template string that is formatted with runtime information to determine where the result is stored.
Here's an example:
python
from prefect import Flow, Task
from prefect.engine.results import S3Result

class MyTask(Task):
    def run(self):
        return "Hello, world!"

with Flow("my-flow", result=S3Result(bucket="mybucket", location="{flow_name}/{task_name}")) as flow:
    # Calling the task instance is what registers it with the flow
    t1 = MyTask()()
    t2 = MyTask()()

flow.run()
In this example, the results of t1 and t2 will both be stored under mybucket/my-flow/MyTask, since the two tasks share the same name. The location attribute supports a variety of placeholders, including {flow_name}, {task_name}, {task_slug}, {task_run_id}, {task_run_slug}, and {parameters}.
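Because both tasks are named MyTask, a location built only from {flow_name} and {task_name} renders to the same string for each of them. The sketch below shows this with plain str.format as a stand-in for the templating; it is not Prefect code, and the run ids "run-1" and "run-2" are hypothetical.

```python
def render_location(template: str, **context: str) -> str:
    """Expand a location template; unused context keys are ignored."""
    return template.format(**context)


# Hypothetical context for the two task runs (the ids are made up).
runs = [
    {"flow_name": "my-flow", "task_name": "MyTask", "task_run_id": "run-1"},
    {"flow_name": "my-flow", "task_name": "MyTask", "task_run_id": "run-2"},
]

shared = [render_location("{flow_name}/{task_name}", **r) for r in runs]
unique = [render_location("{flow_name}/{task_name}/{task_run_id}", **r) for r in runs]

print(shared)  # both entries are identical
print(unique)  # entries differ by run id
```

Adding a placeholder that differs per run, such as {task_run_id}, is one way to keep the two results from landing on the same key.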
Remember, life... don't talk to me about life.
Paul Reyna
08/14/2023, 10:43 PM
Marvin
08/14/2023, 10:44 PM
S3Result class has been replaced with the S3 class in the prefect.tasks.aws.s3 module. You can still specify a location for your results at the flow level, but you'll need to use the result attribute of the Flow class, and the location attribute of the S3 class.
Here's an example:
python
from prefect import Flow, Task
from prefect.tasks.aws.s3 import S3

class MyTask(Task):
    def run(self):
        return "Hello, world!"

with Flow("my-flow") as flow:
    t1 = MyTask()
    t2 = MyTask()

flow.result = S3(bucket="mybucket", key="{flow_name}/{task_name}")
flow.run()
In this example, the results of t1 and t2 will both be stored under mybucket/my-flow/MyTask, since the two tasks share the same name. The key attribute supports a variety of placeholders, including {flow_name}, {task_name}, {task_slug}, {task_run_id}, {task_run_slug}, and {parameters}.
However, it's worth noting that there are some issues with using the S3 class with mapping and targets, as reported in this GitHub issue. You may need to include {filename} in your key attribute when using mapping, like so:
python
flow.result = S3(bucket="mybucket", key="{flow_name}/{task_name}/{filename}")
I hope this helps. If not, well, I'm used to my efforts being in vain.