# prefect-server
j
Is there any way to batch mapped tasks? Specifically, I have a flow that will kick off many runs of another parametrised flow, but I need to minimise the number of concurrent runs
z
Hi Josh, there’s not a first-class way to do it right now but you can do something like this
import time
import random

import prefect
from prefect import Flow, task, Parameter

MAX_SIZE = 30


@task
def dynamic_list_of_tasks(window_size):
    size = random.randint(window_size, MAX_SIZE)
    size += -size % window_size  # round up so the total is divisible by window_size
    return list(range(size))


@task
def fixed_window(tasks: list, window_size: int):
    windowed = []
    while tasks:
        windowed.append(tasks[:window_size])
        tasks = tasks[window_size:]
    return windowed


@task
def display(i):
    prefect.context.logger.info(f"Processing {i}")


window_size = Parameter("window_size", default=2)

with Flow("windowed-tasks") as flow:
    tasks = dynamic_list_of_tasks(window_size)
    windowed_tasks = fixed_window(tasks, window_size)
    display.map(windowed_tasks)

flow.run()
# flow.register("default")
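To make the batching concrete: with window_size=2, fixed_window turns a six-item list into [[0, 1], [2, 3], [4, 5]], and display.map then gets one mapped child per window rather than one per item. The same splitting logic can be checked outside Prefect (the helper name chunk here is just for illustration):

# Standalone check of the fixed-window splitting used above
def chunk(items, window_size):
    return [items[i:i + window_size] for i in range(0, len(items), window_size)]

print(chunk(list(range(6)), 2))  # [[0, 1], [2, 3], [4, 5]]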
j
So I basically want to map the StartFlowRun task for around 100 different param combinations, but only want say 10 max at a time to run
That looks perfect!
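As a hedged sketch of that StartFlowRun use case (not something proposed in the thread): rather than windowing, one way to cap concurrency in Prefect 1.x is to map StartFlowRun with wait=True and limit the executor's worker count, so only that many child flow runs are in flight at once. The flow name, project name, and parameter list below are placeholders.

from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun

# wait=True makes each mapped task block until its child flow run finishes
start_child = StartFlowRun(flow_name="child-flow", project_name="default", wait=True)

# hypothetical parameter combinations for ~100 child runs
param_combinations = [{"n": i} for i in range(100)]

with Flow("batched-child-runs") as flow:
    start_child.map(parameters=param_combinations)

# Ten worker threads means at most ten StartFlowRun tasks, and hence at most
# ten child flow runs, execute at any one time
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=10)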
z
Ah, I think you’re asking for a slightly different thing than what that example does
j
No I think I can make it work!
z
Well — this could still work actually.
😄
There’s a feature request for limiting the number of concurrent items in a map though
j
Will do!
Is there perhaps a way I can do it at the k8s level? Since my map is creating other flow runs and thus other pods - could I use Resource Quotas to limit the number of running jobs perhaps...
No, that would just stop the agent from submitting new jobs, not wait until they finished...
z
Hmm, the jobs could be submitted and they might wait until there is pod quota available? That sounds feasible. I haven't tried anything like it though.
j
If creating or updating a resource violates a quota constraint, the request will fail with HTTP status code 403 FORBIDDEN with a message explaining the constraint that would have been violated.
So I think not?
z
Jobs will attempt to create a pod forever I think though
j
Yes - so don't put the limit on the jobs but on the pods!
I don't think the above example quite does what I need but maybe something similar might...
Thanks!
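As a hedged follow-up sketch of the pod-level quota idea discussed above, using the official kubernetes Python client; the "prefect" namespace and the limit of 10 are assumptions. A ResourceQuota with a hard "pods" count caps how many pods can exist in the namespace at once, and the Job controller keeps retrying pod creation until quota frees up, which is what the end of the thread is counting on.

from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() inside the cluster

# Cap the number of concurrently existing pods in the (assumed) "prefect" namespace,
# i.e. the namespace the agent submits flow-run jobs to
quota = client.V1ResourceQuota(
    metadata=client.V1ObjectMeta(name="flow-run-pod-limit"),
    spec=client.V1ResourceQuotaSpec(hard={"pods": "10"}),
)
client.CoreV1Api().create_namespaced_resource_quota(namespace="prefect", body=quota)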