# prefect-community
a
Hi all, I'm trying to debug a DaskExecutor that refuses to run tasks concurrently. There is a similar thread, but reading it didn't help. Here is a very simple reproduction from that thread:
import time
from random import randrange

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor


@task
def random_num(stop):
    logger = prefect.context.get("logger")
    number = randrange(stop)

    time.sleep(5)  # just so that we can tell by looking at logs

    print(f"Your number is {number}")
    return number


@task
def sum_numbers(numbers):
    print(sum(numbers))


with Flow("parallel-execution") as flow:
    stop = Parameter("stop")

    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)

    # use a new name so the sum_numbers task is not shadowed
    total = sum_numbers(numbers=[number_1, number_2, number_3])
It does run the tasks concurrently if I pass the executor to flow.run():
if __name__ == "__main__":
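    # executor must be a keyword argument (a positional one after parameters= is a SyntaxError)
    state = flow.run(parameters={"stop": 5}, executor=DaskExecutor())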
But the tasks run sequentially if I set the executor on the flow's environment before running, either with
flow.environment.executor = DaskExecutor(cluster_kwargs={"n_workers": 3})
or
from prefect.environments import LocalEnvironment

with Flow(
    "parallel-execution", environment=LocalEnvironment(executor=DaskExecutor())
) as flow:
    ...  # same tasks as above
The interesting thing is that if I print(flow.environment.executor) in the last two cases, it does show DaskExecutor, but the tasks are still executed in order. All of the above was done locally, but the problem persists when I deploy the flow to Cloud. Any advice is much appreciated; I'm not very experienced with Dask.
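The time.sleep(5) in random_num is there so that sequential and concurrent runs can be told apart. The same check can be made on wall-clock time instead of log timestamps. Below is a minimal sketch using only the standard library, not Prefect: a plain loop stands in for sequential execution and concurrent.futures.ThreadPoolExecutor stands in for a parallel executor such as Dask. The 0.2 s sleep and the helper names run_sequentially / run_concurrently are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from random import randrange

SLEEP_SECONDS = 0.2  # shorter than the 5 s used in the thread, same idea


def random_num(stop):
    """Stand-in for the random_num task: sleep, then return a number."""
    time.sleep(SLEEP_SECONDS)
    return randrange(stop)


def run_sequentially(n_tasks, stop):
    """Run the tasks one after another; total time is about n_tasks * SLEEP_SECONDS."""
    start = time.perf_counter()
    numbers = [random_num(stop) for _ in range(n_tasks)]
    return numbers, time.perf_counter() - start


def run_concurrently(n_tasks, stop):
    """Run the tasks on a thread pool; total time is about SLEEP_SECONDS."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        numbers = list(pool.map(random_num, [stop] * n_tasks))
    return numbers, time.perf_counter() - start


if __name__ == "__main__":
    seq_numbers, seq_time = run_sequentially(3, 5)
    par_numbers, par_time = run_concurrently(3, 5)
    print(f"sequential: sum={sum(seq_numbers)}, {seq_time:.2f}s")
    print(f"concurrent: sum={sum(par_numbers)}, {par_time:.2f}s")
```

If a flow with three such tasks takes roughly three times the single-task sleep, the tasks ran one after another, whatever executor object is attached to the flow.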
e
AFAIK, the environment setting only concerns runs registered with Cloud/Server, and it is ignored in local flow.run() scenarios. No idea about Cloud, though; there the flow should adhere to the environment and its executor.
a
Thank you for the tip; it made me look more closely at how we deploy flows to Cloud (via an automated script). It turns out the script was overriding the flow's environment:
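from prefect.environments import RemoteEnvironment
from prefect.environments.storage import Docker

flow_object.storage = Docker(
    ...
)
# this replaces any environment (and executor) set on the flow earlier
flow_object.environment = RemoteEnvironment(labels=["some_label"])
flow_object.register(build=True)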
The thread I mentioned above was actually very relevant; so much for my reading comprehension, haha. I think the fix is to add the executor to this RemoteEnvironment and set up the environment before the storage.
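The underlying failure is an ordinary "last assignment wins": whichever environment the deployment script assigns last is the one Cloud sees, and an executor configured on an earlier environment is silently dropped. Below is a minimal stdlib sketch of the problem and of a pre-registration guard. FakeFlow, FakeEnvironment and require_executor are hypothetical stand-ins for illustration only, not Prefect classes or APIs; in the real script such a check would sit just before flow_object.register(...).

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeEnvironment:
    """Hypothetical stand-in for a Prefect environment (not the real class)."""
    labels: List[str] = field(default_factory=list)
    executor: Optional[str] = None  # e.g. "DaskExecutor"; None means the default


@dataclass
class FakeFlow:
    """Hypothetical stand-in for a flow that carries a single environment."""
    name: str
    environment: FakeEnvironment = field(default_factory=FakeEnvironment)


def require_executor(flow, expected):
    """Hypothetical guard: fail before registering if the executor was lost."""
    actual = flow.environment.executor
    if actual != expected:
        raise ValueError(f"{flow.name}: expected executor {expected!r}, got {actual!r}")


flow = FakeFlow("parallel-execution", FakeEnvironment(executor="DaskExecutor"))

# What the deployment script did: replace the environment, dropping the executor.
flow.environment = FakeEnvironment(labels=["some_label"])
try:
    require_executor(flow, "DaskExecutor")
except ValueError as err:
    print("caught:", err)

# The fix discussed above: configure the executor on the environment assigned last.
flow.environment = FakeEnvironment(labels=["some_label"], executor="DaskExecutor")
require_executor(flow, "DaskExecutor")  # passes silently
```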
The Cloud UI actually shows the environment attached to the flow (thanks, Prefect team!), and I didn't notice that at first.
So I believe this resolves my problem. Thanks again, @emre!