Raphaël Riel

10/14/2020, 12:44 PM
Hi all! 👋 I’m having trouble executing Tasks in parallel on my agent. I have a Task that I start in my flow using the_task.map(list_of_ints). The Task has to map over more than 10 items. When I A) run the flow directly from the .py file using flow.run() AND B) set executor=LocalDaskExecutor(), it uses more than one thread. But as soon as I run this Flow in an Agent, or remove the Dask executor (while still executing the .py file directly), I can’t make it run in parallel!
Recap:
1. Execute the flow from the .py file WITH executor=LocalDaskExecutor() = Works
2. Execute the flow from the .py file with the “default” executor = Nope
3. Any combination of executor running in an agent = Nope
Any suggestion will be welcome! Thanks.

josh

10/14/2020, 12:57 PM
Hi @Raphaël Riel it makes sense that the default executor will not run in parallel because it is a synchronous executor. When running with the agent how are you specifying the executor? Currently it has to be set in the flow’s environment like this: https://docs.prefect.io/orchestration/execution/local_environment.html#examples
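To illustrate the point about a synchronous executor, here is a rough standard-library sketch of the difference between running mapped work one item at a time and running it on a thread pool. It is not Prefect's implementation; slow_task, run_synchronously, run_with_threads, and timed are hypothetical names made up for this example, and the 0.1 second sleep stands in for a task that waits on I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(item):
    """Stand-in for a mapped task that spends its time waiting on I/O."""
    time.sleep(0.1)
    return item * 2


def run_synchronously(items):
    # One item at a time, the way a synchronous executor would process a map.
    return [slow_task(i) for i in items]


def run_with_threads(items, num_threads=4):
    # Several items in flight at once, the way a thread-based executor would.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(slow_task, items))


def timed(fn, items):
    # Return the function's result together with its wall-clock duration.
    start = time.perf_counter()
    result = fn(items)
    return result, time.perf_counter() - start


items = list(range(8))
sync_result, sync_seconds = timed(run_synchronously, items)
threaded_result, threaded_seconds = timed(run_with_threads, items)
print(f"synchronous: {sync_seconds:.2f}s, threaded: {threaded_seconds:.2f}s")
```

Both runs return the same values; only the wall-clock time differs, which is why a mapped task looks sequential under the default executor.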

Raphaël Riel

10/14/2020, 1:05 PM
Thanks for the hint Josh, I’ll try this right away!
I can’t manage to get this working. Here’s how I register the Flow:
And start my agent
When running the Flow within the Agent:

josh

10/14/2020, 2:31 PM
Hmm, running a flow like this w/ the local agent I am not seeing the same behavior:
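# Import paths assume Prefect 0.13.x, current at the time of this thread.
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment

@task
def vals():
    # Upstream task returning the list that gets mapped over.
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

@task
def printv(v):
    print(v)

# The executor is attached to the flow's environment inside the script itself.
with Flow("local-dask", environment=LocalEnvironment(executor=LocalDaskExecutor())) as f:
    v = vals()
    printv.map(v)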
I see the tasks being executed in non-sequential order. I wonder if it has something to do with Dask thinking you only have one thread available. What happens when you set your executor to something like LocalDaskExecutor(nthreads=4)?

Raphaël Riel

10/14/2020, 2:37 PM
Running this directly as a Python file works just fine. Running the exact same code once it’s registered and run under the agent makes it sequential.

josh

10/14/2020, 2:39 PM
Oh, I think I know what’s happening. By registering your flow as a script, you are saying that the flow object in that script is the true source. However, you are attaching the environment/executor at registration time and not in the actual script file. So when the flow goes to run, it is pulled from the script, and it doesn’t have the executor on the environment because the flow inside that script doesn’t have it attached.
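A toy model of that explanation, using only the standard library, may help. Nothing here is Prefect code: a plain dict stands in for the flow object, the string "LocalDaskExecutor" stands in for the executor, and load_flow_from_script is a hypothetical helper that mimics loading the flow from the script at run time.

```python
# Script as stored on disk: the flow is defined WITHOUT an executor.
FLOW_SCRIPT = '''
flow = {"name": "local-dask", "executor": None}
'''

# Same script, but with the executor attached inside the file itself.
FIXED_FLOW_SCRIPT = '''
flow = {"name": "local-dask", "executor": "LocalDaskExecutor"}
'''


def load_flow_from_script(source):
    """Hypothetical helper: execute the script and return its `flow` object."""
    namespace = {}
    exec(source, namespace)
    return namespace["flow"]


# Registration time: the executor is attached to the in-memory object only.
flow_at_registration = load_flow_from_script(FLOW_SCRIPT)
flow_at_registration["executor"] = "LocalDaskExecutor"

# Run time: the script is loaded again, so the in-memory change is gone.
flow_at_runtime = load_flow_from_script(FLOW_SCRIPT)
fixed_flow_at_runtime = load_flow_from_script(FIXED_FLOW_SCRIPT)

print(flow_at_runtime["executor"])        # None
print(fixed_flow_at_runtime["executor"])  # LocalDaskExecutor
```

In this toy model, only what is written in the script survives to run time, which matches the behavior described above.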

Raphaël Riel

10/14/2020, 2:45 PM
Works. EDIT: And I managed to remove the Env+Executor code from flow Registration process
Is this a good practice? (Either of Registration, Agent Start or Flow definition)

josh

10/14/2020, 2:47 PM
Yeah, that’s definitely a viable setup for script-based flow execution. You just need to make sure that the flow object in the script is what you intend, because registration merely creates a metadata representation of the flow structure 🙂

Raphaël Riel

10/14/2020, 2:50 PM
Perfect! That’s what I want: Single source of Truth, and repeatable deploys no matter if the target env is a Developer’s machine, a PR in GitHub to be tested or the prod’s code.
🚀 2
Thanks @josh! Always nice to have you around! 🙂
😄 2