My understanding of the KubernetesRun is that the ...
# prefect-server
s
My understanding of the KubernetesRun is that the env passed only applies to the Prefect job pod that spins up. Is there a straightforward way to pass env vars to the Dask worker pods (using Dask Gateway)? I am thinking of using parameters based on the environment during registration, but I'm not sure that is the best approach.
k
If you know the env before the Flow Run starts, then you would set it at cluster creation
GatewayCluster(…, env={})
. But if you want it to be a Parameter, maybe you have to create it through a task and use a ResourceManager because you can’t set it on the executor. If you are attaching it on the Executor, it’s not the env var of the registration machine that will get used. It will be the env var of the machine that executes it. The Executor is not serialized along with the Flow. It is instantiated during runtime from the Flow in Storage.
s
Hrmm… OK, so basically I want to define the environment variables in our CI (for dev/staging/prod) and then have them configured on the Dask workers. I will try the first option, as we are using a custom Flow class anyway to make it easier to handle things like environments, storage, etc.
I haven’t had an issue making it a parameter, but then we can’t rely on environment variables if we are creating flows in any other repo. I suppose another alternative could just be having a task that sets an environment var, right?
k
I think if the agent is spun up with the
--env
var, you can set the env variable on the dev/staging/prod agent. It then gets passed to the Flow, and the executor can pull it from there. If you want it in the Flow, you would put it in the RunConfig as part of the
env
. This value would then be pulled when spinning up the executor. And yes, a task that sets an env var works if you use the Resource Manager. That won’t work for
env
being passed to FargateCluster as an Executor.
s
got it - we are using a single agent deployed from the helm chart, so I don’t think that would work. At the risk of repeating something here, can you clarify what the
env
being passed to
KubernetesRun
actually does? I am a little lost on the interactions between the agent and the prefect job pod that gets spun up, and how they connect to the dask scheduler/workers.
k
An env variable on the agent gets passed to the Flow. This can be overridden with the RunConfig, which allows you to trigger a Flow Run with a different RunConfig from the UI. So env can be passed in both places, but the RunConfig takes precedence. When the FlowRun starts, it creates the process with a merge of the agent and RunConfig settings.
Environment variables can be set in both places. The Prefect pod gets spun up, it gets the Flow from storage, and runs it. This involves spinning up the executor. So the job pod is the one that creates the Dask cluster.
Am I clearing things up or just making it muddier?
env
passed to the RunConfig adds those env variables to the job. Doc string is here
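To make the precedence concrete, here is a minimal plain-Python sketch of the merge described above. It is not Prefect's actual implementation; `merge_env` and the variable names are hypothetical and only illustrate "agent env first, RunConfig env wins on conflicts".

```python
def merge_env(agent_env, run_config_env):
    """Combine two env mappings; RunConfig values override agent values."""
    merged = dict(agent_env)       # start from what the agent provides
    merged.update(run_config_env)  # RunConfig entries take precedence
    return merged


# Hypothetical values, for illustration only.
agent_env = {"STAGE": "dev", "LOG_LEVEL": "INFO"}
run_config_env = {"STAGE": "staging"}
job_env = merge_env(agent_env, run_config_env)
```

In this sketch `job_env["STAGE"]` comes from the RunConfig, while `LOG_LEVEL` is inherited from the agent. None of this reaches the Dask workers by itself; it only describes the job pod.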
s
Got it (I think). I can see in the UI that my run_config variables are being set (the environment variables section is populated). This ONLY applies to the job (so I guess flow logic, but not actual tasks?). What I need to see is whether we can pass those to the Dask workers, i.e. probably as part of the cluster_kwargs here.
class prefect.executors.dask.DaskExecutor(address=None, cluster_class=None, cluster_kwargs=None, adapt_kwargs=None, client_kwargs=None, debug=None, performance_report_path=None)
and the env variables on the job can be used for things like
dask_gateway_url
,
case
or other branching logic, etc
k
So if you do:
import os
from prefect import task

@task
def some_task():
    x = os.environ["x"]  # read on whichever machine runs the task
    return x
with a DaskExecutor, then this may execute on a worker that doesn't have the environment variable, and the lookup fails with a KeyError. So yes, that’s what I would try. I would use the
KubernetesRun
to get my env variable on the k8s job, and then pass it in the
DaskExecutor
DaskExecutor(..., cluster_kwargs={"env": {"x": os.environ["x"]}})
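If several variables need forwarding, the dictionary can be built by a small helper that fails early when something is missing on the job. This is only a sketch: `forward_env` is a hypothetical helper, not part of Prefect or Dask, and whether the cluster class accepts an `env` keyword depends on the cluster class you use.

```python
import os


def forward_env(names, environ=None):
    """Pick the named variables out of an environment mapping.

    Raises KeyError listing every missing name, so the problem shows up
    on the job pod rather than later inside a task on a worker.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in names if name not in environ]
    if missing:
        raise KeyError(f"missing environment variables: {missing}")
    return {name: environ[name] for name in names}


# Hypothetical usage, mirroring the line above:
# DaskExecutor(..., cluster_kwargs={"env": forward_env(["x"])})
```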
If you want to use the env var for
dask_gateway_url
, I suggest using the callable like:
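import os

from dask_gateway import GatewayCluster
from prefect import Flow, task
from prefect.executors import DaskExecutor


def gen_cluster():
    # Called at runtime on the flow-run job, so this reads the job's env.
    gateway_url = os.environ["x"]
    # GatewayCluster takes the gateway URL as `address`. Extra keyword
    # arguments are forwarded as cluster options; whether an option named
    # `env` exists depends on how your gateway is configured.
    return GatewayCluster(address=gateway_url, env={"x": "2"})

@task
def my_task(x):
    pass

with Flow("example", executor=DaskExecutor(cluster_class=gen_cluster)) as flow:
    my_task(1)

flow.run()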
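The reason a callable helps can be shown in plain Python, without Prefect or Dask: a value read from `os.environ` is fixed at the moment the line runs, while a function body is only evaluated when it is called. The variable name `DEMO_GATEWAY_URL` and both values below are made up for the illustration.

```python
import os

# Pretend this is the environment at the time the module is first evaluated.
os.environ["DEMO_GATEWAY_URL"] = "http://early.example"

# Eager: the lookup happens right here and the result is frozen.
eager_url = os.environ["DEMO_GATEWAY_URL"]


def lazy_url():
    # Lazy: the lookup happens only when the function is called.
    return os.environ["DEMO_GATEWAY_URL"]


# Pretend this is the environment the code later runs in.
os.environ["DEMO_GATEWAY_URL"] = "http://late.example"

resolved_eager = eager_url
resolved_lazy = lazy_url()
```

Here `resolved_eager` still holds the early value, while `resolved_lazy` reflects the environment at call time, which is the behavior you get by passing a function such as `gen_cluster` as `cluster_class`.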
s
Thanks!! Always appreciate the help. I’ll test this stuff out.