Sam Werbalowsky
01/04/2022, 9:02 PM

Kevin Kho
01/04/2022, 9:07 PM
`GatewayCluster(…, env={})`. But if you want it to be a Parameter, you may have to create it inside a task and use a ResourceManager, because you can’t set it on the executor.
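A rough stdlib analogy for the ResourceManager idea: set the variable in a "setup" step and restore it in a "cleanup" step. This is only a sketch; `temporary_env` is a hypothetical helper, not Prefect's API. Note that it only changes the environment of the process it runs in, which with a DaskExecutor is whichever worker picks up the task.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def temporary_env(name: str, value: str) -> Iterator[None]:
    """Hypothetical helper: set an env var, then restore the previous state.

    Mirrors the setup/cleanup split of a resource manager. It only affects
    the environment of the current process.
    """
    previous: Optional[str] = os.environ.get(name)
    os.environ[name] = value  # "setup"
    try:
        yield
    finally:
        # "cleanup": put back whatever was there before (or remove it).
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous
```

Inside `with temporary_env("x", "value"):` any code in the same process sees `x`; once the block exits, the old value (or its absence) comes back.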
If you attach it to the Executor, the value used is not the env var on the registration machine; it is the env var on the machine that executes the flow. The Executor is not serialized along with the Flow; it is instantiated at runtime from the Flow in Storage.

Sam Werbalowsky
01/04/2022, 9:18 PM

Kevin Kho
01/04/2022, 9:23 PM
`--env` var, you can set the env variable on the dev/staging/prod agent. It then gets passed to the flow run, and the executor can read it from there. If you want it defined in the Flow instead, put it in the RunConfig as part of its `env`. That value is then read when the executor is spun up.
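A toy model of that path, using plain dictionaries rather than Prefect itself: the agent's `--env` values and the RunConfig's `env` end up in the job container's environment (Kubernetes lists container env vars as `name`/`value` pairs), and the executor settings are only read from that environment when the executor is built inside the job. The precedence (RunConfig values overriding agent values), the function names, and the `DASK_GATEWAY_URL` variable are all assumptions for illustration.

```python
from typing import Dict, List


def build_job_env(
    agent_env: Dict[str, str], run_config_env: Dict[str, str]
) -> List[Dict[str, str]]:
    """Hypothetical: merge agent and RunConfig env into a k8s container `env` list.

    Assumes RunConfig values win when both define the same key.
    """
    merged = {**agent_env, **run_config_env}
    return [{"name": k, "value": v} for k, v in sorted(merged.items())]


def executor_settings(job_env: List[Dict[str, str]]) -> Dict[str, str]:
    """Hypothetical: what the executor would read at spin-up, inside the job."""
    env = {item["name"]: item["value"] for item in job_env}
    return {"address": env["DASK_GATEWAY_URL"]}
```

For example, a staging agent started with `DASK_GATEWAY_URL` set produces a job whose executor points at the staging gateway, without that URL ever being baked into the registered Flow.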
Yes, that works for a task that sets an env var if you use the Resource Manager. It won’t work for `env` being passed to `FargateCluster` as an Executor.

Sam Werbalowsky
01/04/2022, 9:33 PM
`env` being passed to `KubernetesRun` actually does? I’m a little lost on the interactions between the agent and the Prefect job pod that gets spun up, and how they connect to the Dask scheduler/workers.

Kevin Kho
01/04/2022, 9:36 PM
`env` passed to the RunConfig adds those environment variables to the Kubernetes job. The docstring is here.

Sam Werbalowsky
01/04/2022, 9:42 PM
`class prefect.executors.dask.DaskExecutor(address=None, cluster_class=None, cluster_kwargs=None, adapt_kwargs=None, client_kwargs=None, debug=None, performance_report_path=None)`
`dask_gateway_url`, `case` or other branching logic, etc.

Kevin Kho
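01/04/2022, 9:58 PM
```python
import os
from prefect import task

@task
def some_task():
    x = os.environ["x"]  # read from the env of whichever process runs the task
    return x
```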
If this task runs with a DaskExecutor, it may execute on a Dask worker that doesn’t have the environment variable, which causes the error.
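To see that failure mode without a cluster, the sketch below runs the same lookup in a child Python process whose environment lacks `x`, standing in for a Dask worker that never received the variable. The subprocess is only an analogy for a separate worker process; `run_in_worker` is a hypothetical name and nothing here is Prefect- or Dask-specific.

```python
import os
import subprocess
import sys
from typing import Dict, Optional

# The body of some_task above, as a standalone snippet.
TASK_BODY = "import os; print(os.environ['x'])"


def run_in_worker(
    extra_env: Optional[Dict[str, str]] = None,
) -> subprocess.CompletedProcess:
    """Run TASK_BODY in a separate process with its own environment.

    The child gets this process's env minus `x`, plus anything in extra_env.
    """
    env = {k: v for k, v in os.environ.items() if k != "x"}
    env.update(extra_env or {})
    return subprocess.run(
        [sys.executable, "-c", TASK_BODY],
        env=env,
        capture_output=True,
        text=True,
    )


missing = run_in_worker()
print(missing.returncode != 0, "KeyError" in missing.stderr)  # True True

forwarded = run_in_worker({"x": "hello"})
print(forwarded.stdout.strip())  # hello
```

The second call is the fix in miniature: the variable has to be handed to the worker's environment explicitly.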
So yes, that’s what I would try. I would use the `KubernetesRun` to get my env variable onto the k8s job, and then pass it into the DaskExecutor:
```python
DaskExecutor(..., cluster_kwargs={"env": {"x": os.environ["x"]}})
```
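A slightly more defensive sketch of that forwarding step: collect a chosen set of variables from the job’s environment and fail early, with a clear message, if any are missing. `forward_env` is a hypothetical helper. The `"env"` key mirrors the line above, but the keyword a given cluster class actually accepts for worker environment variables varies by class, so it is a parameter here and an assumption to check against that class’s docs.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def forward_env(
    names: Iterable[str],
    key: str = "env",
    source: Optional[Mapping[str, str]] = None,
) -> Dict[str, Dict[str, str]]:
    """Hypothetical helper: build cluster kwargs that pass env vars to workers.

    Reads from `source` (defaults to os.environ, i.e. the flow-run job) and
    raises a descriptive error here instead of a bare KeyError on a worker.
    """
    env = os.environ if source is None else source
    names = list(names)  # allow generators; we iterate twice below
    missing = [name for name in names if name not in env]
    if missing:
        raise RuntimeError(f"Not set in the flow-run environment: {missing}")
    return {key: {name: env[name] for name in names}}
```

Usage would look like `DaskExecutor(..., cluster_kwargs=forward_env(["x"]))`, evaluated where the job’s environment is available.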
If you want to use the env var for `dask_gateway_url`, I suggest using a callable like:
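```python
import os
from dask_gateway import GatewayCluster
from prefect import Flow, task
from prefect.executors import DaskExecutor

def gen_cluster():
    # Called when the executor starts, so this reads the flow-run job's env.
    env_var = os.environ["x"]
    # Assumes the gateway server defines an "environment" cluster option.
    return GatewayCluster(address=env_var, environment={"x": env_var})

@task
def my_task(x):
    pass

with Flow("example", executor=DaskExecutor(cluster_class=gen_cluster)) as flow:
    my_task(1)
flow.run()
```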
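Why the callable helps: as I understand Prefect 1.x, the DaskExecutor only calls `cluster_class(**cluster_kwargs)` when it starts inside the flow run, so the `os.environ` lookup in `gen_cluster` happens in the job’s environment rather than at registration. The sketch below models just that deferred call; `FakeCluster` and `MiniExecutor` are hypothetical stand-ins, not Prefect or dask-gateway classes, and the address is a made-up placeholder.

```python
import os
from typing import Any, Callable, Dict, Optional


class FakeCluster:
    """Hypothetical stand-in for GatewayCluster: just records its settings."""

    def __init__(self, address: str, **options: Any) -> None:
        self.address = address
        self.options = options


def gen_cluster(**kwargs: Any) -> FakeCluster:
    # Reads the env var only when called, not when the flow is defined.
    return FakeCluster(address=os.environ["x"], **kwargs)


class MiniExecutor:
    """Hypothetical executor: stores the factory, builds the cluster on start()."""

    def __init__(
        self,
        cluster_class: Callable[..., Any],
        cluster_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.cluster_class = cluster_class
        self.cluster_kwargs = cluster_kwargs or {}

    def start(self) -> Any:
        # The factory runs here, at flow-run time.
        return self.cluster_class(**self.cluster_kwargs)


executor = MiniExecutor(cluster_class=gen_cluster)  # nothing read yet
os.environ["x"] = "tls://gateway.example:8786"      # set later, as in the job pod
cluster = executor.start()
print(cluster.address)  # tls://gateway.example:8786
```

Had `gen_cluster` been replaced by a value computed when the flow file was written, it would have captured whatever `x` was at that moment instead.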
Sam Werbalowsky
01/04/2022, 10:09 PM