I have my flows all converted from prefect core to...
# ask-community
r
I have converted all my flows from Prefect Core to running on Prefect Server and, coming back to getting some parallelism, I thought that this:
from time import sleep

import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.storage import Docker


@task
def slow_task(item, sleep_time=4):
    logger = prefect.context.get('logger')
    logger.info(f"==== IN TASK item {item} sleeping {sleep_time}")
    sleep(sleep_time)
    logger.info(f"## Awake {item}")
    return item


@prefect.task
def produce_range(rangelen):
    return range(rangelen)


with Flow("parallel") as flow:
    nrange = produce_range(6)
    numbers = slow_task.map(item=nrange)

flow.executor = LocalDaskExecutor(workers=10, scheduler="processes")
#flow.executor = DaskExecutor()

if 'local_script_path' in prefect.context:
    flow.storage = Docker(registry_url='old7.mianos.com:5000')


if __name__ == '__main__':
    flow.run()
would start 10 processes inside the running Docker container and return after a few seconds, but it executes each task serially unless I run it without any arguments to LocalDaskExecutor. Under k8s, DaskExecutor seems to start a few threads, but nowhere near as many as I want (30+). I feel I am missing something.
Also, if I use:
flow.executor = DaskExecutor(cluster_kwargs={
    "n_workers": 10
})
there are still only 4 threads created. Is there a spelling mistake there?
n
Hi @Rob Fowler - I think the correct arg for LocalDaskExecutor is num_workers, try this:
LocalDaskExecutor(scheduler="threads", num_workers=10)
r
Oh, I should point out: if I run this locally, i.e. python flow.py, it works perfectly, with 10 processes. All the mapped tasks run at the same time. That is why I think something is wrong.
n
@Rob Fowler - it works correctly in which scenario?
For context, here's where the prefect executor spins up the local dask scheduler: https://github.com/PrefectHQ/prefect/blob/5de58efaba956b431335d99acab07eaf6a362e1b/src/prefect/executors/dask.py#L528
r
It works fine when I run it locally on the command line with python flow.py, as above. But the problem is probably this wrong argument name. It works locally because the argument is passed to Dask as kwargs. I'll fix it and try again when I get to work. Thanks.
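For anyone reading later, here is a minimal sketch of why a misspelled keyword such as workers raises no error when a constructor accepts arbitrary kwargs. SketchExecutor and worker_count are hypothetical names made up for illustration, and the CPU-count fallback is an assumption; this is not Prefect's or Dask's actual code, only the general pattern described above.

```python
import os


class SketchExecutor:
    """Hypothetical stand-in for an executor that accepts arbitrary **kwargs.

    This is NOT Prefect's implementation; it only models the pattern
    discussed above, where extra keyword arguments are stored and a
    specific key is looked up later.
    """

    def __init__(self, scheduler="threads", **kwargs):
        self.scheduler = scheduler
        # Every extra keyword is kept, whether or not anything reads it later.
        self.config = kwargs

    def worker_count(self):
        # Only the key "num_workers" is consulted. Any other key is ignored
        # and the fallback is used (CPU count here, as an assumption).
        return self.config.get("num_workers") or os.cpu_count() or 1


misspelled = SketchExecutor(scheduler="processes", workers=10)
correct = SketchExecutor(scheduler="processes", num_workers=10)

print(misspelled.worker_count())  # falls back to the machine's CPU count
print(correct.worker_count())     # 10
```

In this sketch the misspelled argument is accepted without complaint and simply never read, so the observed parallelism would depend on the fallback value of whatever machine or container the flow runs in.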
n
Sounds good!
r
All good. I have updated all my flows. I think the old variable was from ages ago.
n
Great! Glad you got it sorted 👍