Cab Maddux

02/14/2020, 1:42 PM
Hi! I'm running flows using the k8s executor/agent in staging/production and the local executor in development. For many workloads I have the primary flow trigger work in one of two ways:
1. In staging/production - in a separate k8s job, with a createNamespacedJob task followed by a custom waitForNamespacedJob task.
2. In development - using the docker createContainer, runContainer, and waitOnContainer tasks.
Sometimes these are created from mapped inputs so work can be done in parallel - but obviously there need to be some limits. In staging/production I can use tags for task concurrency limits, which is all good 👍. How would you recommend I limit task concurrency in development, where I'm creating/running/waiting on containers?
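(A minimal sketch of the tag-based limit on the staging/production side, assuming Prefect 1.x with Cloud task concurrency limits; the task body, tag name, and mapped inputs below are hypothetical placeholders:)

from prefect import task, Flow

# Tags let Prefect Cloud enforce a task concurrency limit
# (e.g. "at most 3 tasks tagged 'k8s-job' at once", configured in the UI/API)
@task(tags=["k8s-job"])
def create_namespaced_job(spec):
    # placeholder for the real createNamespacedJob logic
    ...

with Flow("staging-flow") as flow:
    job_specs = ["job-a", "job-b", "job-c"]  # hypothetical mapped inputs
    create_namespaced_job.map(job_specs)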

josh

02/14/2020, 2:15 PM
Hey @Cab Maddux for your local development if you're using the Local Executor then tasks will only run synchronously and therefore won't have any concurrency

Braun Reyes

02/14/2020, 3:22 PM
I found you can also use the Dask executor locally and leverage executor kwargs to tune concurrency for CPU- or IO-bound workloads:
if __name__ == '__main__':
    from prefect.engine.executors import DaskExecutor

    # drop the schedule so flow.run() executes immediately instead of waiting on it
    flow.schedule = None

    # local Dask cluster: 1 worker process with 3 threads
    executor = DaskExecutor(n_workers=1, threads_per_worker=3)
    flow.run(executor=executor)
This would give you 1 CPU core (one worker process) with 3 threads (good for IO-bound work)

Cab Maddux

02/14/2020, 3:57 PM
Thanks @josh and @Braun Reyes. I'll look at maybe trying the Dask executor locally. One thing about local synchronous execution: since I'm using a runContainer task, the tasks themselves will trigger synchronously, but the containers they start will still run concurrently.

Braun Reyes

02/14/2020, 3:58 PM
Threads per worker should work really well since the tasks aren't really doing any CPU-intensive operations

Cab Maddux

02/14/2020, 4:00 PM
Aah true, sounds good, I'll try it @Braun Reyes. Now that I think about it, I guess I'm not going to be able to control task concurrency with tags on k8s either (even if only 3 createNamespacedJob tasks can run concurrently, I could still have an unlimited number of jobs running at any given time). Maybe the solution is to create a single task that runs a job/container to completion. That way they will run synchronously locally (or with limited concurrency if running Dask) and I'll still have parallelism remotely
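(A minimal sketch of that single-task approach, assuming docker-py and Prefect 1.x imports; the image names, tag, and flow name are placeholders:)

import docker
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task(tags=["container"])  # the tag still lets Cloud limit concurrency remotely
def run_container_to_completion(image):
    # create, start, and wait on the container inside one task,
    # so one task == one container from start to finish
    client = docker.from_env()
    container = client.containers.run(image, detach=True)
    return container.wait()["StatusCode"]

with Flow("dev-flow") as flow:
    run_container_to_completion.map(["image-a:latest", "image-b:latest"])

if __name__ == "__main__":
    # locally, the Dask executor caps how many containers run at once
    flow.run(executor=DaskExecutor(n_workers=1, threads_per_worker=3))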