<@ULVA73B9P> I have a set of tasks that require sp...
# ask-marvin
c (original poster):
@Marvin I have a set of tasks that require specific resource allocation to run, for example a certain number of CPUs or a certain amount of memory. The amount of resources a given task should use is decided outside the task or the flow: the task and the flow themselves do not know the resource requirements beforehand. Also, a flow might execute some tasks with one set of resources and other tasks with another. Is there a way to run a task with a runner that can manage dynamic resources this way?
m (Marvin):
thought for 314.6 seconds
n (Nate):
when you say "task", do you mean this generically or a Prefect Task?
job_variables are how you do this for deployments, i.e. override job variables at runtime, e.g. CPU or memory when using a Kubernetes work pool
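As a minimal sketch of that idea (assuming an existing deployment on a Kubernetes work pool whose base job template exposes CPU/memory variables; the deployment name, parameters, and variable keys below are hypothetical):

from prefect.deployments import run_deployment

# override the work pool's job variables for this one run;
# which keys are available depends on the work pool's base job template
flow_run = run_deployment(
    name="my-flow/my-deployment",                    # hypothetical deployment
    parameters={"scenario": "baseline"},             # hypothetical parameters
    job_variables={"cpu": "2000m", "memory": "8Gi"},
)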
m (Marvin):
Great question. There are a couple of ways to do this in Prefect 3.x depending on what you mean by "resources" and where you're running.

High-level:
- Prefect workers (Process/Docker/Kubernetes) set resources per flow run, not per task.
- For dynamic, per-task CPU/memory control, use a task runner that integrates with a compute scheduler that supports per-task resources, like Ray or Dask.

Option A: Ray (per-task CPU/memory/GPU with strong controls)
- Prefect + Ray lets you set resource options per task invocation using a context manager.
- You can source the resource values at runtime (parameters, config service, database, etc.) and apply them right where you call the task.

Docs: prefect-ray integration

Example:
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def heavy_compute(x):
    # do CPU/memory-heavy work here
    return x * x

@flow(task_runner=RayTaskRunner(init_kwargs={"address": "auto"}))
def my_flow(resource_config: dict):
    # resource_config could be fetched dynamically outside this flow too
    cpus = resource_config["heavy_compute"]["cpus"]
    mem_bytes = resource_config["heavy_compute"]["mem_bytes"]

    # Apply Ray resource options for just this call
    with remote_options(num_cpus=cpus, memory=mem_bytes):
        fut = heavy_compute.submit(42)

    return fut.result()

# Example runtime input
# my_flow({"heavy_compute": {"cpus": 2, "mem_bytes": 4 * 1024**3}})
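The same context manager should also accept other Ray remote options; a small hedged sketch (num_gpus is a standard Ray option, and "high_mem_node" is a hypothetical custom resource your Ray cluster would have to advertise):

from prefect_ray.context import remote_options

# request a GPU plus a custom resource for just this call
with remote_options(num_gpus=1, resources={"high_mem_node": 1}):
    fut = heavy_compute.submit(42)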
Notes:
- Works best when you have a Ray cluster (local or remote) with enough capacity; Ray enforces per-task resource requests.
- You can also specify GPUs with num_gpus and custom resources.

Option B: Dask (per-task constraints via resource tokens)
- Dask supports "resources" as scheduling constraints. You annotate tasks with resource requirements; workers must advertise matching resource tokens.
- This is powerful for custom resource keys (like GPU=1, or MEM tokens), but it's a constraint model rather than hard container limits.

Docs:
- prefect-dask integration
- Dask resources guide

Example:
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
import dask

@task
def transform(x):
    return x + 1

@flow(task_runner=DaskTaskRunner())  # configure your Dask cluster as needed
def my_flow(resource_config: dict):
    # e.g., {"transform": {"CPU": 2, "MEM": 8}}  # tokens/units you define
    resources = resource_config["transform"]

    # Annotate just this call with resource constraints
    with dask.annotate(resources=resources):
        fut = transform.submit(10)

    return fut.result()
Important: You must start Dask workers with those resource tokens (e.g., --resources "CPU=4,MEM=16") or configure them via your Dask cluster (like dask-kubernetes worker templates).

Alternative (if you need container-level isolation per "task"):
- Split the heavy work into subflows or separate deployments and route them to different work pools with different infrastructure (e.g., different KubernetesJob templates with CPU/memory requests/limits). Trigger them from a parent flow via run_deployment. This gives you per-execution container resources, but at a flow/subflow level rather than a single task function.

What would help me recommend the best fit:
- Where are you running this (Kubernetes, Docker, local, Ray/Dask cluster)?
- Do you need strict per-task isolation and limits at the container level, or just scheduler-level allocation?
- Are resources CPU-only, or also memory/GPU? And how are they determined (params, service, DB)?

If you can share that, I can tailor an example to your stack.
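To make the worker-token requirement above concrete, a hedged sketch (DaskTaskRunner passes cluster_kwargs to the cluster class, LocalCluster by default; whether the resources mapping is forwarded to each worker is an assumption here, and the equivalent for a manually started worker is the --resources flag shown above):

from prefect_dask.task_runners import DaskTaskRunner

# workers must advertise tokens matching what dask.annotate(resources=...) requests
runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 2,
        "resources": {"CPU": 4, "MEM": 16},  # assumption: passed through to each worker
    },
)
# manual equivalent: dask worker tcp://scheduler:8786 --resources "CPU=4,MEM=16"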
c (original poster):
@Nate prefect task. The idea is to be able to run a Prefect task in a runtime with specific resources. This normally means executing a command in the task runtime, but this command, depending on what it is, requires different resources. The task is parametrised to be generic. See a task something like
@task
def simulate_scenario(...): ...
where, depending on what you simulate, it requires different resources. There are too many combinations of resources and scenarios to have a specialized task for each combination.
n (Nate):
are you bound to using tasks in some way? could simulate_scenario just as easily be a flow? https://docs.prefect.io/v3/advanced/submit-flows-directly-to-dynamic-infrastructure#submitting-workflows-to-specific-infrastructure it sounds like this (beta) approach might be useful
this is effectively convenient short-hand for
• setting up a deployment with a work pool that defines a job infra template
• kicking off that deployment
but doing that on the fly, because maybe your infra spec depends on runtime values (a sketch of that long-hand version follows below)
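A hedged sketch of that long-hand version (the work pool, image, and variable keys below are hypothetical; run_deployment accepts job_variables in Prefect 3.x):

from prefect import flow
from prefect.deployments import run_deployment

@flow
def simulate_scenario(scenario: str): ...

# one-time: register a deployment against a work pool whose base job
# template defines the infrastructure (e.g. a Kubernetes work pool)
simulate_scenario.deploy(
    name="simulate",
    work_pool_name="k8s-pool",                  # hypothetical work pool
    image="my-registry/simulations:latest",     # hypothetical image
    build=False, push=False,                    # assumes the image already exists
)

# at runtime: kick it off with per-run infrastructure overrides
run_deployment(
    name="simulate-scenario/simulate",
    parameters={"scenario": "stress-test"},
    job_variables={"cpu": "4", "memory": "16Gi"},
)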
I'm assuming here that the tasks you're talking about are not going to run in the parent process, based on what you said; if they are, these suggestions are overcomplicated 🙂
c (original poster):
Thanks @Nate. No, it can be anything. I used a task since I need (or strongly prefer) it to run in a runtime different from the parent flow, but it could be a subflow as well. The resource requirements are decided at runtime (maybe decided before the run in some configuration, but definitely somewhere decoupled from Prefect and injected into it).

One of the other limitations, which I didn't mention to keep the question to Marvin simple, is that we are using an HPC grid hosted in-house. We submit jobs to it, booking CPU, memory and other bits, via the grid CLI at the moment. The only solution to interface with it from Prefect is dask + dask-jobqueue (SGECluster). The problem with the jobqueue SGECluster is that it pre-allocates workers, so all workers in the same cluster have the same amount of resources allocated; I would need many clusters to represent different "resource configurations". It also means that dask pre-allocates the resources and runs workers against them, so those resources are booked up front; it's not dynamic scheduling.

One solution was to use a flow calling a subflow, where the subflow is called passing a lazily instantiated cluster definition:
@flow
def simulate_scenarios(...): ...

@flow
def orchestrate(...):
    # build a task runner / cluster definition based on the resources required
    cluster_definition = create_cluster_definition(...)
    resourced_simulation = simulate_scenarios.with_options(task_runner=cluster_definition)
    ...
or something like that. But I was trying to understand whether this is a common enough use case to have better solutions (e.g. something more idiomatic).
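For reference, a hedged sketch of how that pattern might look with prefect-dask and dask-jobqueue (the resource keys and values below are assumptions injected from outside; DaskTaskRunner accepts cluster_class/cluster_kwargs, and flows accept with_options(task_runner=...)):

from dask_jobqueue import SGECluster
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow
def simulate_scenarios(scenarios: list): ...

@flow
def orchestrate(scenarios: list, resources: dict):
    # build a per-run task runner whose SGE jobs request exactly the
    # resources this batch of scenarios needs (decided outside Prefect)
    runner = DaskTaskRunner(
        cluster_class=SGECluster,
        cluster_kwargs={
            "cores": resources["cpus"],      # e.g. 4
            "memory": resources["memory"],   # e.g. "16 GB"
        },
        adapt_kwargs={"minimum": 0, "maximum": resources.get("max_workers", 4)},
    )
    # run the subflow with that runner; tasks submitted inside it run on SGE workers
    return simulate_scenarios.with_options(task_runner=runner)(scenarios)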