Cosimo Alfarano
09/18/2025, 3:41 PM

Marvin
09/18/2025, 3:42 PM

Nate
09/18/2025, 3:45 PM

Nate
09/18/2025, 3:46 PM
job_variables are how you do this for deployments, i.e. override job variables at runtime (e.g. cpu or mem) when using a k8s work pool
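A minimal sketch of that kind of override via run_deployment. The deployment name "my-flow/k8s" and the cpu/memory keys are assumptions; the available keys depend on the variables your work pool's base job template exposes.

from prefect.deployments import run_deployment

# Hypothetical deployment name and job-variable keys; they must match your
# deployment and the variables defined in the work pool's base job template.
run_deployment(
    name="my-flow/k8s",
    job_variables={"cpu_request": "2", "memory_request": "4Gi"},
)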
Marvin
09/18/2025, 3:47 PM
Option A: Ray (per-task resource requests via remote_options)

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def heavy_compute(x):
    # do CPU/memory-heavy work here
    return x * x

@flow(task_runner=RayTaskRunner(init_kwargs={"address": "auto"}))
def my_flow(resource_config: dict):
    # resource_config could be fetched dynamically outside this flow too
    cpus = resource_config["heavy_compute"]["cpus"]
    mem_bytes = resource_config["heavy_compute"]["mem_bytes"]

    # Apply Ray resource options for just this call
    with remote_options(num_cpus=cpus, memory=mem_bytes):
        fut = heavy_compute.submit(42)
    return fut.result()

# Example runtime input
# my_flow({"heavy_compute": {"cpus": 2, "mem_bytes": 4 * 1024**3}})
Notes:
- Works best when you have a Ray cluster (local or remote) with enough capacity; Ray enforces per-task resource requests.
- You can also specify GPUs with num_gpus and custom resources.
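For example (a sketch; "large_memory" is a made-up custom resource name, and your Ray cluster must actually advertise a GPU and that resource for the task to be scheduled):

from prefect_ray.context import remote_options

# Request one GPU plus a custom resource token for this call only.
with remote_options(num_gpus=1, resources={"large_memory": 1}):
    fut = heavy_compute.submit(42)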
Option B: Dask (per-task constraints via resource tokens)
- Dask supports “resources” as scheduling constraints. You annotate tasks with resource requirements; workers must advertise matching resource tokens.
- This is powerful for custom resource keys (like GPU=1, or MEM tokens), but it’s a constraint model rather than hard container limits.
Docs:
- prefect-dask integration
- Dask resources guide
Example:
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
import dask

@task
def transform(x):
    return x + 1

@flow(task_runner=DaskTaskRunner())  # configure your Dask cluster as needed
def my_flow(resource_config: dict):
    # e.g., {"transform": {"CPU": 2, "MEM": 8}}  # tokens/units you define
    resources = resource_config["transform"]

    # Annotate just this call with resource constraints
    with dask.annotate(resources=resources):
        fut = transform.submit(10)
    return fut.result()
Important: You must start Dask workers with those resource tokens (e.g., --resources "CPU=4,MEM=16") or configure them via your Dask cluster (like dask-kubernetes worker templates).
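With the default LocalCluster, a sketch might look like this (assuming LocalCluster's pass-through of extra keyword arguments to its workers; for other cluster classes, e.g. dask-kubernetes, set the equivalent option in the worker spec):

from prefect_dask.task_runners import DaskTaskRunner

# Each worker advertises the CPU/MEM tokens used in the example above.
# These are scheduling tokens, not hard limits.
runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 2,
        "resources": {"CPU": 4, "MEM": 16},
    }
)

Then pass runner (or an equivalently configured DaskTaskRunner) as the flow's task_runner.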
Alternative (if you need container-level isolation per “task”):
- Split the heavy work into subflows or separate deployments and route them to different work pools with different infrastructure (e.g., different KubernetesJob templates with CPU/memory requests/limits). Trigger them from a parent flow via run_deployment. This gives you per-execution container resources, but at a flow/subflow level rather than a single task function (see the sketch below).
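A sketch of that parent-flow pattern, building on the job_variables note above. The deployment name "simulate-scenario/k8s", the "scenario" parameter, and the cpu/memory keys are all assumptions; they must match a real deployment and the variables exposed by the work pool's base job template.

from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrate(scenarios: list[dict]):
    runs = []
    for scenario in scenarios:
        # Each scenario becomes its own flow run with its own container resources.
        fr = run_deployment(
            name="simulate-scenario/k8s",
            parameters={"scenario": scenario},
            job_variables={
                "cpu_request": scenario["cpu"],
                "memory_request": scenario["mem"],
            },
            timeout=0,  # return immediately instead of blocking until the run finishes
        )
        runs.append(fr)
    return runs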
What would help me recommend the best fit:
- Where are you running this (Kubernetes, Docker, local, Ray/Dask cluster)?
- Do you need strict per-task isolation and limits at the container level, or just scheduler-level allocation?
- Are resources CPU-only, or also memory/GPU? And how are they determined (params, service, DB)?
If you can share that, I can tailor an example to your stack.

Cosimo Alfarano
09/18/2025, 4:49 PM
@task
def simulate_scenario(...): ...

where, depending on what you simulate, it requires different resources.
the combinations of resources and scenarios are too big to have a specialized task for each combination

Nate
09/18/2025, 4:54 PM
could simulate_scenario just as easily be a flow?
https://docs.prefect.io/v3/advanced/submit-flows-directly-to-dynamic-infrastructure#submitting-workflows-to-specific-infrastructure
it sounds like this (beta) approach might be useful

Nate
09/18/2025, 4:55 PM

Nate
09/18/2025, 4:57 PM

Cosimo Alfarano
09/18/2025, 5:20 PM
@flow
def simulate_scenarios(...): ...

@flow
def orchestrate(...):
    cluster_definition = create_cluster_definition(...based on resources required...)
    resourced_simulation = simulate_scenarios.with_options(task_runner=cluster_definition)
    ...

or something like that.
But I was trying to understand if this is a common enough use case to have better (e.g. more idiomatic) solutions.