Giovanni Giacco
11/26/2021, 1:41 PM
Anna Geller
KubernetesRun from both Python and the UI. To set this in the UI, use the “Run” tab and its Run Configuration section.
From the flow code you can do that as follows:
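from prefect import Flow
from prefect.run_configs import KubernetesRun

# FLOW_NAME and STORAGE are placeholders defined elsewhere in your flow file
with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        cpu_request=0.5,
        memory_request="2Gi",
    ),
) as flow:
    pass  # add your tasks here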
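As a side note on the units above: cpu_request and memory_request use Kubernetes resource quantities, so 0.5 CPU is the same as "500m" (millicores) and "2Gi" means 2 * 1024**3 bytes. The sketch below only illustrates that arithmetic. The helper names memory_to_bytes and cpu_to_millicores are hypothetical, not part of Prefect or Kubernetes, and only the common suffixes are handled.

```python
from decimal import Decimal

# Binary (power-of-two) and decimal suffixes used by Kubernetes quantities.
# Binary suffixes come first so "Gi" is matched before "G".
_SUFFIXES = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def memory_to_bytes(quantity: str) -> int:
    """Convert a memory quantity such as "2Gi" to a number of bytes."""
    for suffix, factor in _SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    return int(Decimal(quantity))  # plain number of bytes


def cpu_to_millicores(cpu) -> int:
    """Convert a CPU quantity such as 0.5 or "500m" to millicores."""
    text = str(cpu)
    if text.endswith("m"):
        return int(text[:-1])
    return int(Decimal(text) * 1000)


print(memory_to_bytes("2Gi"))   # bytes requested by memory_request="2Gi"
print(cpu_to_millicores(0.5))   # millicores requested by cpu_request=0.5
```

Note that Dask uses its own spelling for sizes (for example "4 GiB"), so the two formats are not interchangeable.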
And if you want to set a specific executor, you can do:
from prefect.executors import DaskExecutor
flow.executor = DaskExecutor()
To change the memory limit for Dask workers, pass it when starting each worker:
dask-worker tcp://scheduler:port --memory-limit="4 GiB"
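If you let the executor create a temporary local cluster instead of starting workers by hand, roughly the same limit can be passed from Python. This is a minimal sketch, assuming Prefect 1.x, where DaskExecutor without a cluster_class creates a distributed.LocalCluster and forwards cluster_kwargs to it. The worker counts are illustrative values and build_local_dask_executor is a hypothetical helper name.

```python
# Keyword arguments forwarded to distributed.LocalCluster (illustrative values).
LOCAL_CLUSTER_KWARGS = {
    "n_workers": 2,
    "threads_per_worker": 1,
    "memory_limit": "4 GiB",  # per-worker limit, like --memory-limit on the CLI
}


def build_local_dask_executor():
    """Return a DaskExecutor that starts a temporary local cluster."""
    # Imported lazily so this module can be loaded without Prefect installed.
    from prefect.executors import DaskExecutor

    return DaskExecutor(cluster_kwargs=LOCAL_CLUSTER_KWARGS)
```

You would then attach it with flow.executor = build_local_dask_executor().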
Depending on the Dask cluster class, you may also be able to set this on the cluster itself. For example, with coiled.Cluster you can set:
• scheduler_memory - defaults to 4 GiB
• worker_memory - defaults to 8 GiB
To set that on your flow:
import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)
Giovanni Giacco
11/26/2021, 1:59 PM
Anna Geller
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.tasks.prefect import create_flow_run

with Flow("parent") as parent_flow:
    # Override the child flow's run config for this specific flow run
    child_run_id = create_flow_run(
        flow_name="child",
        run_config=KubernetesRun(
            labels=["k8s"],
            cpu_request=0.5,
            memory_request="2Gi",
        ),
    )