
Arsenii

09/10/2020, 9:38 AM
Hi all, I'm trying to debug DaskExecutor, which refuses to run tasks concurrently. There is a similar thread, but reading it didn't help. A very simple example for reproduction from that thread:
import time
from random import randrange

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor


@task
def random_num(stop):
    logger = prefect.context.get("logger")
    number = randrange(stop)

    time.sleep(5)  # just so that we can tell by looking at logs

    print(f"Your number is {number}")
    return number


@task
def sum_numbers(numbers):
    print(sum(numbers))


with Flow("parallel-execution") as flow:
    stop = Parameter("stop")

    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)

    sum_numbers = sum_numbers(numbers=[number_1, number_2, number_3])
Now, it does run the flow concurrently if I specify:
if __name__ == "__main__":
    state = flow.run(parameters={"stop": 5}, executor=DaskExecutor())
But it runs tasks sequentially if I set the executor before running, either with
flow.environment.executor = DaskExecutor(cluster_kwargs={"n_workers": 3})
or
from prefect.environments import LocalEnvironment

with Flow(
    "parallel-execution", environment=LocalEnvironment(executor=DaskExecutor())
) as flow:
    ...  # same tasks as above
The interesting thing is, if I
print(flow.environment.executor)
in the last two cases, it does show DaskExecutor, but the tasks are still executed in order. All of the above was done locally, but the problem persists even when I deploy to Cloud. Any advice is much appreciated; I'm not that experienced with Dask.

emre

09/10/2020, 10:08 AM
AFAIK environment is a parameter that only concerns cloud/server registered runs, and is ignored for local flow.run() scenarios. I'm not sure about Cloud, though; it should adhere to the environment and its executor.
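In other words, something roughly like this (just a sketch, assuming the 0.13-era imports used elsewhere in this thread):
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# Local run: only the executor passed to flow.run() matters;
# flow.environment is ignored on this path
flow.run(parameters={"stop": 5}, executor=DaskExecutor())

# Registered (Cloud/Server) run: the executor comes from the
# environment attached to the flow at registration time
flow.environment = LocalEnvironment(executor=DaskExecutor())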

Arsenii

09/10/2020, 10:23 AM
Thank you for the tip, it made me pay more attention to the way we deploy flows to Cloud (via an automated script). Turns out the script was overriding the default environment:
flow_object.storage = Docker(
        ...
)
flow_object.environment = RemoteEnvironment(labels=["some_label"])
flow_object.register(build=True)
The thread linked above was actually very relevant, so much for my reading comprehension, haha. The fix would be to add an executor to this RemoteEnvironment and finalize it before the Storage, I think, roughly like the sketch below.
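A minimal sketch of what I have in mind (using the LocalEnvironment(executor=...) form from earlier in the thread rather than RemoteEnvironment, since I'm only sure that one accepts an executor instance; the n_workers value is just illustrative):
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# Attach an executor to the environment before registering, so the
# registered flow doesn't fall back to the default sequential executor
flow_object.environment = LocalEnvironment(
    executor=DaskExecutor(cluster_kwargs={"n_workers": 3}),  # illustrative value
    labels=["some_label"],
)
flow_object.register(build=True)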
The Cloud UI actually describes the Environment attached to the flow quite helpfully (thanks, Prefect team!), and I didn't notice that at first.
So I believe this resolves my problem, thanks again @emre