Rob Fowler
04/19/2021, 11:46 AM

from time import sleep

import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.storage import Docker


@task
def slow_task(item, sleep_time=4):
    logger = prefect.context.get('logger')
    logger.info(f"==== IN TASK item {item} sleeping {sleep_time}")
    sleep(sleep_time)
    logger.info(f"## Awake {item}")
    return item


@prefect.task
def produce_range(rangelen):
    return range(rangelen)


with Flow("parallel") as flow:
    nrange = produce_range(6)
    numbers = slow_task.map(item=nrange)

flow.executor = LocalDaskExecutor(workers=10, scheduler="processes")
# flow.executor = DaskExecutor()

if 'local_script_path' in prefect.context:
    flow.storage = Docker(registry_url='old7.mianos.com:5000')

if __name__ == '__main__':
    flow.run()
I expected this to start 10 processes inside the running Docker container and return after a few seconds, but it executes each task serially unless I pass no arguments at all to LocalDaskExecutor.
Under k8s, DaskExecutor seems to start a few threads, but nowhere near as many as I want (30+).
I feel I am missing something.

Rob Fowler
04/19/2021, 12:01 PM

flow.executor = DaskExecutor(cluster_kwargs={
    "n_workers": 10
})

There are still only 4 threads created. Is there a spelling mistake there?

nicholas
The keyword is num_workers; try this:

LocalDaskExecutor(scheduler="threads", num_workers=10)
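[The misspelled workers= keyword in the original flow is silently ignored, which is why the tasks ran serially. The fan-out that num_workers restores can be sketched with the standard library alone; slow_task here is a plain stand-in for the Prefect task above, not Prefect code.]

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_task(item, sleep_time=0.05):
    # Stand-in for the flow's slow_task: sleep, then return the item.
    sleep(sleep_time)
    return item

def run_parallel(items, num_workers=10):
    # With num_workers slots the sleeps overlap, so total wall time is
    # roughly one sleep_time rather than len(items) * sleep_time --
    # the behavior the corrected keyword restores in LocalDaskExecutor.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(slow_task, items))

print(run_parallel(range(6)))  # [0, 1, 2, 3, 4, 5]
```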
Rob Fowler
04/19/2021, 9:30 PM

nicholas

Rob Fowler
04/19/2021, 10:32 PM

nicholas

Rob Fowler
04/20/2021, 3:39 AM

nicholas
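[For the k8s question about DaskExecutor, a possible knob, sketched here on the assumption that cluster_kwargs is forwarded to distributed.LocalCluster as in Prefect 0.14-era releases, is to pin both worker and thread counts explicitly, since the total number of task slots is n_workers times threads_per_worker:]

```python
from prefect.executors import DaskExecutor

# Sketch, not a confirmed fix: pin both knobs so slot count is
# predictable (10 workers x 3 threads = ~30 concurrent tasks)
# instead of relying on Dask's CPU-based defaults.
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 10, "threads_per_worker": 3}
)
```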