Arsenii
09/10/2020, 9:38 AM
```python
import time
from random import randrange

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor


@task
def random_num(stop):
    logger = prefect.context.get("logger")
    number = randrange(stop)
    time.sleep(5)  # just so that we can tell by looking at logs
    logger.info(f"Your number is {number}")
    return number


@task
def sum_numbers(numbers):
    print(sum(numbers))


with Flow("parallel-execution") as flow:
    stop = Parameter("stop")
    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)
    # named `total` so the task `sum_numbers` is not shadowed by its own result
    total = sum_numbers(numbers=[number_1, number_2, number_3])
```
Now, it does run the tasks concurrently if I specify:
```python
if __name__ == "__main__":
    state = flow.run(parameters={"stop": 5}, executor=DaskExecutor())
```
But it runs the tasks sequentially if I set the executor before running, either with
```python
flow.environment.executor = DaskExecutor(cluster_kwargs={"n_workers": 3})
```
or
```python
from prefect.environments import LocalEnvironment

with Flow(
    "parallel-execution", environment=LocalEnvironment(executor=DaskExecutor())
) as flow:
    ...  # same tasks as above
```
The interesting thing is, if I `print(flow.environment.executor)` in the last two cases, it does show a `DaskExecutor`, but the tasks are still executed in order.
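For intuition about what "executed in order" versus concurrently means for this particular flow shape (three independent tasks feeding one sum), here is a stdlib-only analogue with no Prefect involved: `concurrent.futures.ThreadPoolExecutor` stands in for the `DaskExecutor`, and a plain loop stands in for a sequential executor. The task names mirror the ones above; `DELAY`, `run_sequential`, `run_parallel` and `timed` are hypothetical helpers, and the sleep is shortened from 5 s.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

# Shortened from the 5 s sleep in the original task so the demo runs quickly.
DELAY = 0.3


def random_num(stop):
    # Same work as the Prefect task above, minus Prefect: sleep, then pick a number.
    time.sleep(DELAY)
    return random.randrange(stop)


def sum_numbers(numbers):
    return sum(numbers)


def run_sequential(stop):
    # One task at a time, roughly what you see when the tasks run "in order".
    numbers = [random_num(stop) for _ in range(3)]
    return sum_numbers(numbers)


def run_parallel(stop, workers=3):
    # The three random_num calls do not depend on each other, so all three can be
    # submitted at once; sum_numbers only runs after every result is available.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(random_num, stop) for _ in range(3)]
        numbers = [f.result() for f in futures]
    return sum_numbers(numbers)


def timed(fn, *args):
    # Return fn's result together with its wall-clock duration in seconds.
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    for fn in (run_sequential, run_parallel):
        total, elapsed = timed(fn, 5)
        print(f"{fn.__name__}: sum={total}, took {elapsed:.2f}s")
```

Expect the sequential run to take about three times `DELAY` and the parallel one about a single `DELAY`; with the original 5 s sleeps that is roughly 15 s versus 5 s, which is the difference you can read off the task logs.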
All of the above was done locally, but the problem persists even when I deploy it to Cloud. Any advice is much appreciated; I'm not very experienced with Dask.
emre
09/10/2020, 10:08 AM
… `flow.run()` scenarios.
No idea about the cloud, though; it should adhere to the environment and its executor.
Arsenii
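09/10/2020, 10:23 AM
```python
from prefect.environments import RemoteEnvironment
from prefect.environments.storage import Docker

flow_object.storage = Docker(
    ...
)
flow_object.environment = RemoteEnvironment(labels=["some_label"])
flow_object.register(build=True)
```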
The thread linked above was actually very relevant, so much for my reading comprehension, haha. The fix would be to add an executor to this `RemoteEnvironment` and finalize it before the storage, I think.
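A toy model of the behavior described in this thread may help: a local `flow.run()` only looks at its own `executor` argument, while a deployed run follows the flow's environment, and an environment built without an executor (like the `RemoteEnvironment(labels=[...])` above) falls back to a default. This is not Prefect's source; `ToyFlow`, `ToyEnvironment`, `DEFAULT_EXECUTOR`, `executor_for_local_run` and `executor_for_deployed_run` are hypothetical names, and the fallback to a one-task-at-a-time local executor is an assumption consistent with what was observed above.

```python
from dataclasses import dataclass
from typing import Optional

# Assumption: the configured default is a local executor that runs tasks one at a time.
DEFAULT_EXECUTOR = "LocalExecutor"


@dataclass
class ToyEnvironment:
    """Hypothetical stand-in for a Prefect 0.x environment (not the real class)."""
    executor: Optional[str] = None


@dataclass
class ToyFlow:
    """Hypothetical stand-in for a Prefect flow that carries an environment."""
    environment: ToyEnvironment


def executor_for_local_run(flow: ToyFlow, executor: Optional[str] = None) -> str:
    # Local flow.run(...): only the explicit argument counts; flow.environment is never read.
    return executor or DEFAULT_EXECUTOR


def executor_for_deployed_run(flow: ToyFlow) -> str:
    # Agent/Cloud run: the environment's executor wins, otherwise the default is used.
    return flow.environment.executor or DEFAULT_EXECUTOR


if __name__ == "__main__":
    dask_env = ToyFlow(ToyEnvironment(executor="DaskExecutor"))
    bare_env = ToyFlow(ToyEnvironment())  # like RemoteEnvironment(labels=[...]) with no executor

    print(executor_for_local_run(dask_env))                  # LocalExecutor
    print(executor_for_local_run(bare_env, "DaskExecutor"))  # DaskExecutor
    print(executor_for_deployed_run(bare_env))               # LocalExecutor
    print(executor_for_deployed_run(dask_env))               # DaskExecutor
```

Under this model, printing `flow.environment.executor` can show a `DaskExecutor` while `flow.run()` still runs sequentially, and the suggested fix amounts to making sure the environment that gets registered actually carries an executor.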