Hello. From the Prefect Cloud GUI I can change memory/cpu request for a KubernetesRun ? How I can do...
g
Hello. From the Prefect Cloud GUI I can change memory/cpu request for a KubernetesRun ? How I can do that when I start a flow from Python? And is there a way to change the executor too? I’d like to change memory/cpu request for Dask Workers pod depending on the effort of the computation requested.
a
@Giovanni Giacco you can change the memory/cpu request for a
KubernetesRun
from both Python and UI. To set this in the UI, you can use the “Run” tab and the Run Configuration section. From the flow code you can do that as follows:
Copy code
with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["k8s"],
            cpu_request=0.5,
            memory_request="2Gi",
        ),
) as flow:
And if you want to set a specific executor, you can do:
Copy code
from prefect.executors import DaskExecutor
flow.executor = DaskExecutor()
to change memory/cpu request for Dask Workers:
Copy code
dask-worker <tcp://scheduler>:port --memory-limit="4 GiB"
And depending on the Dask Distributed class, you may also set this on the cluster class, e.g. with 
coiled.Cluster
, you can set: • scheduler_memory - defaults to 4 GiB • worker_memory - defaults to 8 GiB To set that on your flow:
Copy code
import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)
g
Thank you for the answer @Anna Geller. But how I can do that at runtime? I’d like to change those parameters at runtime depending on the input parameters. Let me explain better what I’d like to accomplish: • I have a parent flow. It defines the amount of cpu/memory a child flow needs to execute • The parent flow starts the child flow with cpu/memory values computed at the first step.
a
You can do it this way:
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent") as parent_flow:
    child_run_id = create_flow_run(flow_name="child", run_config=KubernetesRun(
            labels=["k8s"],
            cpu_request=0.5,
            memory_request="2Gi",
        ))