    Raphaël Riel

    1 year ago
    Hi all! 👋 I’m having trouble running parallel/concurrent Tasks on my agent. My flow starts a Task using
    the_task.map(list_of_ints)
    The Task has to map over more than 10 items. When I A) run the flow directly from within the .py file using
    flow.run()
    AND B) set
    executor=LocalDaskExecutor()
    it uses more than one thread. But as soon as I try to run this Flow in an Agent, OR if I remove the Dask executor (while still executing the .py file directly), I can’t make it run in parallel! Recap:
    1. Execute the flow from the .py file WITH
    executor=LocalDaskExecutor()
    = Works
    2. Execute the flow from the .py file with the “default” executor = Nope
    3. Any combination of executor running in an agent = Nope
    Any suggestions are welcome! Thanks.
    josh

    1 year ago
    Hi @Raphaël Riel, it makes sense that the default executor will not run in parallel, because it is a synchronous executor. When running with the agent, how are you specifying the executor? Currently it has to be set in the flow’s environment like this: https://docs.prefect.io/orchestration/execution/local_environment.html#examples
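To make the difference between a synchronous executor and a threaded one concrete, here is a minimal standard-library sketch. It does not use Prefect and is not how Prefect implements its executors; `the_task` and `list_of_ints` are borrowed from the question above purely as illustrative names, and `ThreadPoolExecutor` stands in for a thread-based executor.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def the_task(x):
    # Simulate a little I/O-bound work and report which thread ran it.
    time.sleep(0.05)
    return x * 2, threading.current_thread().name


list_of_ints = list(range(12))

# Synchronous mapping: every item is processed one after another
# on the calling thread, which is what a synchronous executor does.
sequential = [the_task(i) for i in list_of_ints]
sequential_threads = {name for _, name in sequential}

# Thread-pool mapping: items are handed to a pool of worker threads,
# so several of them can be in flight at the same time.
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(the_task, list_of_ints))
threaded_threads = {name for _, name in threaded}

print("sequential ran on:", sequential_threads)
print("thread pool ran on:", threaded_threads)
```

The results are identical in both cases; only the number of threads doing the work changes, which is why a mapped task with the default executor never appears to run in parallel.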
    Raphaël Riel

    1 year ago
    Thanks for the hint Josh, I’ll try this right away!
    I can’t manage to get this working. Here’s how I register the Flow:
    And start my agent
    When running the Flow within the Agent:
    josh

    1 year ago
    Hmm, running a flow like this w/ the local agent I am not seeing the same behavior:
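    # Import paths assume the Prefect 0.x release current at the time of this thread.
    from prefect import Flow, task
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.environments import LocalEnvironment

    @task
    def vals():
        # Upstream task: produces the list that gets mapped over.
        return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    @task
    def printv(v):
        # Mapped task: runs once per element of the upstream list.
        print(v)

    # The executor is attached to the flow's environment in the flow definition itself.
    with Flow("local-dask", environment=LocalEnvironment(executor=LocalDaskExecutor())) as f:
        v = vals()
        printv.map(v)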
    I see the tasks being executed in non-sequential order. I wonder if it has something to do with Dask thinking you only have one thread available. What happens when you set your executor to something like
    LocalDaskExecutor(nthreads=4)
    ?
    Raphaël Riel

    1 year ago
    Running this directly as a Python file works just fine. Running the exact same code once it’s registered and run under the agent makes it sequential.
    josh

    1 year ago
    Oh, I think I know what’s happening. By registering your flow as a script, you are saying that the flow object in that script is the true source. However, you are attaching the environment/executor at registration time and not in the actual script file. So when the flow goes to run, it is pulled from the script, and it doesn’t have the executor on its environment, because the flow inside that script never had it attached.
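The explanation above can be sketched with a toy model. This is only an illustration under stated assumptions, not Prefect code: plain dicts stand in for flow objects, and `load_flow_from_script`, `register`, and `run_under_agent` are hypothetical helpers invented for this sketch.

```python
# Toy model only: dicts stand in for Flow objects; none of these
# helpers are Prefect APIs.

FLOW_SCRIPT_WITHOUT_EXECUTOR = """
flow = {"name": "local-dask", "executor": "default (synchronous)"}
"""

FLOW_SCRIPT_WITH_EXECUTOR = """
flow = {"name": "local-dask", "executor": "LocalDaskExecutor"}
"""


def load_flow_from_script(source):
    # Execute the script text and pull out its `flow` object,
    # mimicking "the script is the source of truth".
    namespace = {}
    exec(source, namespace)
    return namespace["flow"]


def register(source, executor=None):
    flow = load_flow_from_script(source)
    if executor is not None:
        # Attached at registration time: this changes only the
        # in-memory copy, never the script itself.
        flow["executor"] = executor
    # Registration keeps metadata about the flow, not the object.
    return {"name": flow["name"]}


def run_under_agent(source):
    # At run time the flow is loaded from the script again.
    return load_flow_from_script(source)["executor"]


# Executor passed only while registering: it is gone at run time.
register(FLOW_SCRIPT_WITHOUT_EXECUTOR, executor="LocalDaskExecutor")
lost = run_under_agent(FLOW_SCRIPT_WITHOUT_EXECUTOR)

# Executor written into the script: it is still there at run time.
register(FLOW_SCRIPT_WITH_EXECUTOR)
kept = run_under_agent(FLOW_SCRIPT_WITH_EXECUTOR)

print("attached at registration ->", lost)
print("attached in the script   ->", kept)
```

In this model, anything that should influence execution has to live in the script that is loaded at run time, which matches the advice to attach the environment and executor in the flow definition.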
    Raphaël Riel

    1 year ago
    Works. EDIT: I also managed to remove the Env+Executor code from the flow registration process.
    Is this good practice? (For any of Registration, Agent Start, or Flow definition)
    josh

    1 year ago
    Yeah, that’s definitely a viable setup for script-based flow execution. You just need to make sure that the flow object in the script is what you intend, because registration merely creates a metadata representation of the flow structure 🙂
    Raphaël Riel

    1 year ago
    Perfect! That’s what I want: Single source of Truth, and repeatable deploys no matter if the target env is a Developer’s machine, a PR in GitHub to be tested or the prod’s code.
    Thanks @josh! Always nice to have you around! 🙂