M. Siddiqui

    8 months ago
    Hello everyone! Hope you guys had a great start to the year 😄 I was trying to run a flow with 250+ tasks on the Prefect Cloud Free Plan. It was painfully slow, since only one task was running at any given time. I know we can set task concurrency limits on the Standard Plan, but what is the limit on the Free Plan? I couldn't find any docs on that.
    Anna Geller

    8 months ago
    I’m not aware of any limits with regard to task and flow concurrency. But your problem seems to be the opposite: you want to run tasks in parallel?
    M. Siddiqui

    8 months ago
    @Anna Geller thanks for responding. So (1) by default, Prefect Cloud imposes no concurrency limits on tasks running in parallel? And (2) if I want to impose a limit, I would opt for the Standard Plan?
    Anna Geller

    8 months ago
    Correct on 1, wrong on 2. To impose limits, you would need to set concurrency limits as described in the docs. If you want to run your tasks in parallel, you need to attach one of the Dask executors, e.g.:
    from prefect.executors import LocalDaskExecutor
    
    flow.executor = LocalDaskExecutor()
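To see why the executor matters for tasks that mostly wait (like the `time.sleep(10)` tasks later in this thread), here is a minimal stdlib sketch. It uses Python's `concurrent.futures.ThreadPoolExecutor` as a stand-in for `LocalDaskExecutor(scheduler="threads")`; it is not Prefect code, and `slow_task`, `run_sequential` and `run_threaded` are hypothetical names. Prefect's default `LocalExecutor` runs tasks one at a time, so the total time is roughly the sum of the task durations; a thread pool lets independent waiting tasks overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i: int) -> int:
    """Hypothetical stand-in for an I/O-bound task (sleeps instead of doing real work)."""
    time.sleep(0.2)
    return i


def run_sequential(n: int) -> float:
    """Run n tasks one after another, like a sequential executor; return elapsed seconds."""
    start = time.perf_counter()
    for i in range(n):
        slow_task(i)
    return time.perf_counter() - start


def run_threaded(n: int, num_workers: int) -> float:
    """Run n tasks on a thread pool with num_workers threads; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(slow_task, range(n)))  # wait for all results
    return time.perf_counter() - start


if __name__ == "__main__":
    seq = run_sequential(8)
    par = run_threaded(8, num_workers=8)
    print(f"sequential: {seq:.2f}s, threaded: {par:.2f}s")
```

Threads help here because `time.sleep` (like network or shell I/O) releases the GIL; for CPU-bound pure-Python tasks, the `scheduler="processes"` option of `LocalDaskExecutor` is the usual alternative.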
    M. Siddiqui

    8 months ago
    Understood, thank you so much!
    @Anna Geller I am trying to figure out why my tasks are not running in parallel. To create a sample test flow, I took the one from Prefect's docs and tweaked it a little.
    from random import randrange
    import time
    
    from prefect import task, Flow, Parameter
    from prefect.executors import LocalDaskExecutor
    
    # Attached to the flow by a separate registration script, not here
    EXECUTOR_TYPE = LocalDaskExecutor(scheduler="threads", num_workers=8)
    
    @task(tags=["dbt_shell"])
    def random_num(stop):
        number = randrange(stop)
        time.sleep(10)  # simulate a slow task
        print(f"Your number is {number}")
        return number
    
    @task
    def sum_numbers(numbers):
        print(sum(numbers))
    
    with Flow("parallel-execution") as flow:
        stop = Parameter("stop", default=1000)
    
        # 15 calls with no dependencies between them, so they could run in parallel
        numbers = [random_num(stop) for _ in range(15)]
    
        # Use a new name: rebinding `sum_numbers` would shadow the task itself
        total = sum_numbers(numbers=numbers)
    But even though Prefect seems to understand these tasks as parallel, they run sequentially, one after the other. I'm running them via an ECSAgent, and as you can see in the image, the container is allotted 4 vCPUs and 8 GB RAM, just to rule out that the system is throttled by a lack of threads. I can't figure out what the issue is. Any idea how I could debug this?
    I have a separate registration script that reads which executor is defined in the flow file and attaches it to the flow before registering it. This makes it easier for me to run the flow locally with different parameters than in production.
    @Kevin Kho
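One way to debug this is to check whether task runs actually overlapped in time. A minimal sketch, assuming you have collected each task run's start and end times (for example from the task run timestamps in the UI or from log lines); `max_concurrency` is a hypothetical helper, not part of Prefect. It sweeps over start/end events and reports the peak number of tasks running at once: a peak of 1 confirms sequential execution, while a working threaded executor should give a peak greater than 1 (up to `num_workers`).

```python
from typing import Iterable, List, Tuple


def max_concurrency(intervals: Iterable[Tuple[float, float]]) -> int:
    """Peak number of (start, end) intervals running at the same instant.

    A task ending at time t and another starting at t do not count as overlapping.
    """
    events: List[Tuple[float, int]] = []
    for start, end in intervals:
        events.append((start, 1))   # a task starts
        events.append((end, -1))    # a task finishes
    # Sort by time; at equal times -1 sorts before +1, so ends are processed first.
    events.sort()
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


if __name__ == "__main__":
    # Three 10-second tasks back to back (what the question describes).
    sequential = [(0, 10), (10, 20), (20, 30)]
    # Three 10-second tasks started together by a parallel executor.
    parallel = [(0, 10), (0, 10), (1, 11)]
    print(max_concurrency(sequential), max_concurrency(parallel))  # 1 3
```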
    Anna Geller

    8 months ago
    You didn’t attach the executor to your flow. Here is how you can fix it:
    with Flow("parallel-execution", executor=EXECUTOR_TYPE) as flow:
    The way you attach the executor might be the problem. You said you attach it in a separate script, but Prefect doesn’t store the executor information in the backend; it reads it from the flow storage at runtime. So your Flow object must have the executor attached to it, and stored in the flow storage, for it to take effect.
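The pitfall described here can be reproduced with a toy model. This is a sketch, not Prefect's implementation: it assumes script-based flow storage, where the flow file itself is stored and re-executed at runtime, and `ToyFlow`, `load_flow`, `register` and `run` are hypothetical stand-ins for the flow object, flow storage, the registration script and a flow run. The executor set by the registration script lives only in that process, while the metadata written to the backend survives.

```python
# Toy model (not Prefect's code), assuming script-based storage that
# re-executes the flow file at runtime. All names here are hypothetical.

FLOW_FILE_TEMPLATE = '''
class ToyFlow:
    def __init__(self, name, executor=None):
        self.name = name
        self.executor = executor

EXECUTOR_TYPE = "local-dask-threads-8"
flow = ToyFlow({flow_args})
'''

# Executor only referenced by the registration script, as in the question.
BROKEN_FLOW_FILE = FLOW_FILE_TEMPLATE.format(flow_args='"parallel-execution"')
# Executor attached inside the flow file, as in the suggested fix.
FIXED_FLOW_FILE = FLOW_FILE_TEMPLATE.format(
    flow_args='"parallel-execution", executor=EXECUTOR_TYPE'
)


def load_flow(source: str):
    """Execute the flow file source and return (flow, module namespace)."""
    namespace: dict = {}
    exec(source, namespace)
    return namespace["flow"], namespace


def register(source: str, backend: dict) -> None:
    """Attach the executor in memory, then store only metadata in the backend."""
    flow, namespace = load_flow(source)
    flow.executor = namespace["EXECUTOR_TYPE"]  # lost when this process exits
    # Metadata values are illustrative labels only.
    backend[flow.name] = {"run_config": "ECSRun", "storage": "script"}


def run(source: str, backend: dict, name: str):
    """Read metadata from the backend and rebuild the flow from storage."""
    metadata = backend[name]
    flow, _ = load_flow(source)  # fresh object: registration-time changes are gone
    return metadata, flow.executor


if __name__ == "__main__":
    for label, source in [("broken", BROKEN_FLOW_FILE), ("fixed", FIXED_FLOW_FILE)]:
        backend: dict = {}
        register(source, backend)
        metadata, executor = run(source, backend, "parallel-execution")
        print(label, metadata, executor)
```

In this model the broken variant runs with `executor` equal to `None`, while the fixed one picks up the executor defined in the flow file; the backend metadata is the same in both cases.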
    M. Siddiqui

    8 months ago
    Hmm, I see! Let me try this out, that may indeed be the issue!
    That was indeed the issue, thanks a lot for your help 🙌 Just to confirm: what about flow storage and run_config? Are they also read at runtime? It doesn't seem like they are.
    Anna Geller

    8 months ago
    Nice work! Only the executor is retrieved from storage; the run config and storage metadata are stored in the backend.
    M. Siddiqui

    8 months ago
    Yes, that makes sense. I should have asked you much sooner; it would have saved me so many hours yesterday 😅 I'm not 100% certain: is the fact that the executor is a runtime dependency documented anywhere? I feel people would often use a different strategy for registering their flows in production than for running and testing them locally...
    Anna Geller

    8 months ago
    Correct, users may have different run configs for various environments (e.g. dev vs. prod), but the executor is usually the same for both. AFAIK, the only reason it’s not serialized along with the Flow is that it can contain private information, such as the Dask scheduler address, so we don’t keep it.