Hello!
I was wondering how to get the flow result location inside a task? Is this possible to do?
a
Anna Geller
11/18/2021, 9:51 AM
@Ramtin you can get the result location after you ran the task, e.g.:
Copy code
from prefect import task, Flow
from prefect.engine.results import LocalResult
@task(result=LocalResult(location="sample_result.prefect"))
def hello_world():
return "Hello world"
with Flow("local-results") as flow:
hello_world()
state = flow.run()
hello_world = flow.get_tasks()[0]
print(hello_world.result.location)
r
Ramtin
11/18/2021, 9:54 AM
The question was unclear. So what I want to do is to inside a task upload some data to a S3 bucket, but i want this bucket to be the same as the flows result bucket.
Ramtin
11/18/2021, 10:03 AM
Copy code
@task()
def hello_world():
infered_bucket = here i want the flow bucket
return "Hello world"
with Flow("local-results") as flow:
hello_world()
flow.result = S3Result(bucket="some_bucket")
a
Anna Geller
11/18/2021, 10:07 AM
@Ramtin I think the easiest way would be to reuse a global variable name and use boto3 inside of your task:
Copy code
import boto3
from prefect.engine.results import S3Result
bucket = "some_bucket"
@task
def upload_object(file_name):
s3_client = boto3.client('s3')
s3_client.upload_file(file_name, bucket, f"path/on/s3/{file_name}")
with Flow("local-results") as flow:
upload_object()
flow.result = S3Result(bucket=bucket)
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.