M. Siddiqui
01/06/2022, 11:35 AMAnna Geller
Anna Geller
M. Siddiqui
01/06/2022, 11:45 AMAnna Geller
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor()
M. Siddiqui
01/06/2022, 11:48 AMM. Siddiqui
01/07/2022, 1:09 PMfrom random import randrange
import time
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor
EXECUTOR_TYPE = LocalDaskExecutor(scheduler="threads", num_workers=8)
@task(tags=["dbt_shell"])
def random_num(stop):
number = randrange(stop)
time.sleep(10)
print(f"Your number is {number}")
return number
@task()
def sum_numbers(numbers):
print(sum(numbers))
with Flow("parallel-execution") as flow:
stop = Parameter("stop", 1000)
number_1 = random_num(stop)
number_2 = random_num(stop)
number_3 = random_num(stop)
number_4 = random_num(stop)
number_5 = random_num(stop)
number_6 = random_num(stop)
number_7 = random_num(stop)
number_8 = random_num(stop)
number_9 = random_num(stop)
number_10 = random_num(stop)
number_11 = random_num(stop)
number_12 = random_num(stop)
number_13 = random_num(stop)
number_14 = random_num(stop)
number_15 = random_num(stop)
sum_numbers = sum_numbers(numbers=[
number_1,
number_2,
number_3,
number_4,
number_5,
number_6,
number_7,
number_8,
number_9,
number_10,
number_11,
number_12,
number_13,
number_14,
number_15,
])
But even though these tasks seem to be understood by Prefect as parallel tasks. They run sequentially, one after the other.
I'm running these tasks via an ECSAgent and as you can see in the image, the container assigned is allotted 4 vcpus and 8 GB RAM just to test out that the system isn't throttled because of lack of threads.
I can't really figure out what the issue is.
Any idea how I could debug this ?M. Siddiqui
01/07/2022, 1:12 PMM. Siddiqui
01/07/2022, 1:12 PMAnna Geller
with Flow("parallel-execution", executor=EXECUTOR_TYPE) as flow:
Anna Geller
M. Siddiqui
01/07/2022, 1:18 PMM. Siddiqui
01/07/2022, 1:38 PMAnna Geller
M. Siddiqui
01/07/2022, 1:45 PMAnna Geller