Aric Huang
04/14/2022, 6:19 PM
With the @task(result=GCSResult(bucket=<bucket>)) method of configuring a task result, is the bucket path fixed at flow registration time? If so, is there a way it can be dynamically set at flow run time? What I'm hoping to do is have flows that can be registered to run on different clusters (using agent labels), and have their GCSResult bucket path be configured via an env var on the cluster. That way we can reuse the same flow code across different clusters but have different results buckets depending on the cluster. I think using GCSResult inside a task and calling GCSResult.write directly would get the behavior I'm looking for. Curious whether there's a way to do it with the @task decorator though.
Anna Geller
04/14/2022, 6:25 PM
GCSResult.write
Aric Huang
04/14/2022, 6:27 PM
Anna Geller
04/14/2022, 6:33 PM
env = "dev"
gcs_bucket = get_bucket_name(env)
gcs_result = GCSResult(bucket=gcs_bucket, location='{task_name}.txt')
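The snippet above calls get_bucket_name, which is not defined anywhere in the thread. A minimal sketch of what such a helper might look like, assuming a plain mapping from environment name to bucket plus an optional override read from the cluster's environment (the bucket names and the RESULTS_BUCKET variable are hypothetical, chosen only for illustration):

```python
import os

# Hypothetical mapping; the real bucket names are not given in the thread.
_BUCKETS = {"dev": "my-results-dev", "prod": "my-results-prod"}


def get_bucket_name(env: str) -> str:
    """Return the results bucket for the given environment name.

    A RESULTS_BUCKET env var (hypothetical name) set on the cluster takes
    precedence over the built-in mapping.
    """
    override = os.environ.get("RESULTS_BUCKET")
    if override:
        return override
    try:
        return _BUCKETS[env]
    except KeyError:
        # Fail loudly rather than silently writing to a default bucket.
        raise ValueError(f"unknown environment: {env!r}") from None
```

Where this function is called matters: called at module level it is evaluated when the flow file is executed, while called inside a task body it is evaluated on every task run.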
Kevin Kho
04/14/2022, 6:33 PM
Anna Geller
04/14/2022, 6:34 PM
Aric Huang
04/14/2022, 6:37 PM
@task.
Kevin Kho
04/14/2022, 6:39 PM
You can use prefect.context.parameters to get the current environment. You can also use the Result interface inside the task if it helps
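import prefect
from prefect import task
from prefect.engine.results import GCSResult

@task
def abc():
    # Build the result inside the task so the bucket is looked up at run time.
    res = GCSResult(bucket=prefect.context.parameters[..], location=...)
    res.write(something)
    return res.location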
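The difference between passing a result to the decorator and building it inside the task comes down to when Python evaluates each expression. Below is a Prefect-free sketch of just that point. fake_task is a hypothetical stand-in decorator, not Prefect's @task, and RESULTS_BUCKET is a hypothetical variable name:

```python
import os


def fake_task(result_bucket):
    """Stand-in decorator (NOT Prefect's @task): stores its argument on the function."""
    def wrap(fn):
        fn.result_bucket = result_bucket
        return fn
    return wrap


os.environ["RESULTS_BUCKET"] = "bucket-at-import"


# Decorator argument: evaluated once, when this definition is executed.
@fake_task(result_bucket=os.environ["RESULTS_BUCKET"])
def fixed():
    return fixed.result_bucket


def dynamic():
    # Function body: evaluated on every call, so it sees the current value.
    return os.environ["RESULTS_BUCKET"]


# Simulate the value being different by the time the function runs.
os.environ["RESULTS_BUCKET"] = "bucket-at-run"

print(fixed())    # bucket-at-import
print(dynamic())  # bucket-at-run
```

This only covers the plain-Python half of the question. Whether the decorator argument is evaluated on the registering machine or again on the cluster depends on how the flow is stored and loaded, which the thread does not settle.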