Samuel Hinton
Hey all! Is there a way, inside a flow using ConcurrentTaskRunner, to specify "at most X tasks at once, please"? We've had a lot of issues using tag-based concurrency (mostly slots not being released if a task crashes).
Bianca Hoch
12/05/2024, 12:25 AM
Hi Sam! The first thing that comes to mind as an alternative would be applying Global concurrency limits to your tasks
Samuel Hinton
12/05/2024, 1:53 AM
Hmm, it's tough because we really only want to limit it for this single flow, so we don't get a slap on the wrist re S3 rate limiting. An ideal API, if this is ever considered, would be something like:
@flow(task_runner=ConcurrentTaskRunner(maximum_concurrency=10))
def my_flow():
    for blah in blahs:
        my_task.submit(blah)
Samuel Hinton
12/05/2024, 1:56 AM
Oh I was going to point out anyio has limiters that would do this, and I actually see there's even a TODO for this in the ConcurrentTaskRunner:
# TODO: Consider adding `max_workers` support using anyio capacity limiters
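This is not Prefect code, but the bounded-concurrency idea behind that TODO can be sketched with the stdlib's asyncio.Semaphore, which behaves much like anyio's CapacityLimiter:

```python
import asyncio

async def worker(sem, state, i):
    # Waits here if all slots are taken; the slot is released when the
    # `async with` block exits, even if the body raises.
    async with sem:
        state["current"] += 1
        state["peak"] = max(state["peak"], state["current"])
        await asyncio.sleep(0.01)  # stand-in for real task work
        state["current"] -= 1

async def run_all(limit=2, n=6):
    sem = asyncio.Semaphore(limit)  # the "maximum_concurrency" analogue
    state = {"current": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state, i) for i in range(n)))
    return state["peak"]

peak = asyncio.run(run_all())
print(peak)  # peak concurrency never exceeds the limit of 2
```

All six workers are submitted at once, but the semaphore caps how many are inside the critical section at any moment, which is exactly the "at most X tasks at once" behavior asked for above.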