# prefect-community
a
Hi, I'm looking into running Prefect on Kubernetes clusters in different regions, and I had a question about handling GCSResults. If we use the `@task(result=GCSResult(bucket=<bucket>))` method of configuring a task result, is the bucket path fixed at flow registration time? If so, is there a way to set it dynamically at flow run time? What I'm hoping to do is register flows that can run on different clusters (using agent labels) and have their GCSResult bucket path configured via an env var on each cluster. That way we can reuse the same flow code across clusters but write to different results buckets depending on the cluster.
I think creating a `GCSResult` inside a task and calling `GCSResult.write` directly would get the behavior I'm looking for. I'm curious whether there's a way to do it with the `@task` decorator, though.
a
I wonder, does it matter, though? Results are primarily used for restarts and recovery from failure. You could use the same bucket but prefix the path with dev/prod using Result location templating to separate them.
To write your data to its end destination, it would be easier to use your own custom logic to ensure the data is written the right way and in the right format, rather than relying on `GCSResult.write`.
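To make the templating idea concrete: Prefect 1.x fills placeholders like `{flow_name}` and `{task_name}` in the result `location` from runtime context. A rough sketch of that rendering with plain `str.format` (the context values here are made up for illustration):

```python
# Emulate how a templated result location would be rendered at run time.
# The context dict is illustrative; Prefect supplies these values itself.
template = "{env}/{flow_name}/{task_name}.txt"
context = {"env": "dev", "flow_name": "etl", "task_name": "extract"}
location = template.format(**context)
print(location)  # dev/etl/extract.txt
```

The `env` prefix is what separates dev and prod results inside the same bucket.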
a
What we're concerned about is that some data cannot leave its region, so we want the result buckets located in a specific region for each cluster.
a
I understand the problem, but I don't have a better idea than writing a different flow file for dev vs. prod: the dev branch could use a GCSResult with the "dev" bucket, and the main branch's flow code could use the prod bucket. Alternatively, you could have a function that returns the bucket name based on your environment.
```python
env = "dev"
gcs_bucket = get_bucket_name(env)
gcs_result = GCSResult(bucket=gcs_bucket, location="{task_name}.txt")
```
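`get_bucket_name` isn't a Prefect built-in; a minimal version, keyed off an environment variable set on each cluster, might look like this (the bucket names and the `PREFECT_ENV` variable are placeholders):

```python
import os

# Placeholder mapping: real values would be region-local bucket names.
BUCKETS = {"dev": "my-results-dev", "prod": "my-results-prod"}

def get_bucket_name(env=None):
    # Fall back to an env var set on the cluster running the flow.
    env = env or os.environ.get("PREFECT_ENV", "dev")
    return BUCKETS[env]
```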
k
Yeah, I’m with Anna here. Only `location` can be configured at runtime, not the bucket.
a
This will be easier in 2.0!
a
Got it, thanks! We do have custom logic for saving a flow's final output that can get the bucket path dynamically, so that works for us for now. I was mostly curious whether it could work with results configured directly in `@task`.
k
Yeah, it’s likely better to decouple yourself from the task/result interface and persist the data yourself inside the task. You could use `prefect.context.parameters` to get the current environment. You can also use the Result interface inside the task if it helps:
```python
@task
def abc():
    res = GCSResult(bucket=prefect.context.parameters[...], location=...)
    res.write(something)
    return res.location
```
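Tying this back to the original env-var idea: assuming each cluster sets a variable like `RESULTS_BUCKET` in the flow run's environment (the variable name is just an example), the bucket lookup inside the task reduces to a plain `os.environ` read:

```python
import os

# Assumed convention: each cluster's run environment sets RESULTS_BUCKET
# to a region-local bucket, so the same flow code works on every cluster.
def results_bucket(default="prefect-results-dev"):
    return os.environ.get("RESULTS_BUCKET", default)
```

The returned name would then be passed as the `bucket` argument when constructing the `GCSResult` inside the task.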