# ask-community
m
Hello everyone! Hope you guys had a great start to the year 😄 I was trying to run a flow with 250+ tasks on the Prefect Cloud Free Plan. It was painfully slow since only one task was running at any given time. I know we can set task concurrency limits on the Standard Plan, but what is the limit on the Free Plan? I couldn't find any docs related to that.
a
I’m not aware of any limits with regard to task and flow concurrency. But your problem seems to be the opposite: you want to run tasks in parallel?
m
@Anna Geller thanks for responding. So by definition, Prefect Cloud imposes no concurrency limits on tasks running in parallel? And if I want to impose a limit, then I would opt for the Standard Plan?
a
Correct to 1, wrong to 2. To impose limits, you would need to set the concurrency limits as described in the docs. If you want to run your tasks in parallel, you would need to attach one of the Dask executors, e.g.:
```python
from prefect.executors import LocalDaskExecutor

flow.executor = LocalDaskExecutor()
```
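On the limits side, Cloud applies task concurrency limits per task tag, so the code half of that is just tagging the tasks (a sketch with a made-up tag name; the limit itself is configured on the Cloud side):
```python
from prefect import task

# Tasks carrying this tag count toward the tag's concurrency limit,
# which is configured in Prefect Cloud ("database" is a hypothetical tag).
@task(tags=["database"])
def write_to_db(rows):
    ...
```
And on the parallelism side, a minimal runnable sketch (assuming Prefect 1.x; flow and task names are made up) of what attaching the executor buys you:
```python
import time

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def slow_task(name):
    time.sleep(5)  # simulate slow work
    return name

with Flow("parallel-demo") as flow:
    a = slow_task("a")
    b = slow_task("b")

# With a threads-based Dask executor both tasks sleep concurrently,
# so the flow finishes in ~5s instead of ~10s.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=2)

if __name__ == "__main__":
    flow.run()
```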
m
Understood, thank you so much!
@Anna Geller I am trying to figure out why my tasks are not running in parallel. To create a sample test flow, I used the one available in Prefect's docs and tweaked it a little.
```python
from random import randrange
import time

from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor

EXECUTOR_TYPE = LocalDaskExecutor(scheduler="threads", num_workers=8)

@task(tags=["dbt_shell"])
def random_num(stop):
    number = randrange(stop)
    time.sleep(10)
    print(f"Your number is {number}")
    return number

@task()
def sum_numbers(numbers):
    print(sum(numbers))

with Flow("parallel-execution") as flow:
    stop = Parameter("stop", 1000)

    number_1 = random_num(stop)
    number_2 = random_num(stop)
    number_3 = random_num(stop)
    number_4 = random_num(stop)
    number_5 = random_num(stop)
    number_6 = random_num(stop)
    number_7 = random_num(stop)
    number_8 = random_num(stop)
    number_9 = random_num(stop)
    number_10 = random_num(stop)
    number_11 = random_num(stop)
    number_12 = random_num(stop)
    number_13 = random_num(stop)
    number_14 = random_num(stop)
    number_15 = random_num(stop)

    total = sum_numbers(numbers=[
        number_1,
        number_2,
        number_3,
        number_4,
        number_5,
        number_6,
        number_7,
        number_8,
        number_9,
        number_10,
        number_11,
        number_12,
        number_13,
        number_14,
        number_15,
    ])
```
But even though Prefect seems to recognize these tasks as parallel, they run sequentially, one after the other. I'm running them via an ECSAgent and, as you can see in the image, the container is allotted 4 vCPUs and 8 GB of RAM, just to rule out the system being throttled for lack of threads. I can't really figure out what the issue is. Any idea how I could debug this?
I have a separate registration script that reads which executor is specified in the file and attaches it to the flow before registering. This makes it easier for me to run the flow locally with different parameters than in production.
@Kevin Kho
a
You didn’t attach the executor to your flow - here is how you can fix it:
```python
with Flow("parallel-execution", executor=EXECUTOR_TYPE) as flow:
```
The way you attach the executor might be the problem - you said you attach it in a separate script, but the issue with this approach is that Prefect doesn’t store the executor information in the backend; it reads it from flow storage at runtime. So your Flow object must have the executor attached to it and stored in the flow storage for it to take effect.
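In other words, a registration script like the following sets the executor only in the registering process, not in what actually runs later (a hypothetical sketch: flow_file and my-project are made-up names, and it assumes script-based storage):
```python
# Hypothetical registration script -- this is the pattern that fails.
# With script-based storage, the agent re-executes flow_file.py at
# runtime; no executor was ever attached there, so the default
# LocalExecutor runs the tasks one by one.
from prefect.executors import LocalDaskExecutor

from flow_file import flow  # flow defined WITHOUT an executor

flow.executor = LocalDaskExecutor()        # only affects this process
flow.register(project_name="my-project")   # executor is not persisted
```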
m
Hmmm, I see! Let me try this out, that may indeed be the issue!
That was indeed the issue, thanks a lot for your help 🙌 Just to confirm, what about flow storage and run_config? Are they also read at runtime? Because it doesn't seem like they are.
a
Nice work! Only the executor is retrieved from storage; run config and storage metadata are stored in the backend.
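So a registration script is a fine place for those two. A minimal sketch (assuming Prefect 1.x; the bucket and project names are made up) matching an ECS setup:
```python
from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3

with Flow("parallel-execution") as flow:
    ...  # tasks as above

# storage and run_config metadata are registered with the backend,
# so they can safely be attached at registration time
flow.storage = S3(bucket="my-flow-bucket")             # hypothetical bucket
flow.run_config = ECSRun(cpu="4 vcpu", memory="8 GB")
flow.register(project_name="my-project")               # hypothetical project
```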
m
Yes, that makes sense. I should have asked you much sooner, it would have saved so many hours yesterday 😅 I'm not 100% certain, but is this information about the executor being a runtime dependency documented? Because I feel people often have a different strategy for registering their flows in production compared to running and testing them locally...
a
Correct, users may have different run configs for various environments (e.g. dev vs prod), but the executor is usually the same for both. AFAIK, the only reason it’s not serialized along with the flow is that it can contain private information, like the Dask scheduler address, so we don’t keep it.
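For illustration, this is the kind of detail that could leak if the executor were serialized (the address below is made up):
```python
from prefect.executors import DaskExecutor

# A distributed Dask executor can point at a private scheduler --
# exactly the kind of information you wouldn't want stored in a
# shared backend (hypothetical address).
executor = DaskExecutor(address="tcp://10.0.0.5:8786")
```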