# prefect-community
j
I'm trying to get Prefect running with Dask, using Prefect Cloud. I'm assuming here that Prefect Cloud is whatever runs, or helps run, the flows that are shown in the prefect.io UI. I have some questions about terminology, which could mostly be summarized as: "What exactly do flow runners, executors, and agents do? What is the timeline of their communication with each other? Where do they run?"

> FlowRunners handle the execution of Flows and determine the State of a Flow before, during and after the Flow is run.

> A flow's Executor is responsible for running tasks in a flow. During execution of a flow run, a flow's executor will be initialized, used to execute all tasks in the flow, then shutdown.

> Agents orchestrate flow runs. Agents start and monitor flow runs. During operation the agent process queries the Prefect API for any scheduled flow runs, and allocates resources for them on their respective deployment platforms.

So, when I call `flow.run()` with a flow that's configured to run using Prefect Cloud resources and Coiled/Dask, where is the FlowRunner? Is it my local machine, the process that called `flow.run()`? Or the CLI, with which I launched the flow?

In Prefect Cloud, there will be a flow visible in the UI, waiting for an agent with the right flags to connect. What do you call the process that is waiting for an agent to connect? If it's the flow runner, then what do I call the local process that called `flow.run()`?

If an agent does connect to Prefect Cloud and starts orchestrating the flow, what does it launch? An executor? A flow runner? Where does it run them? How does the flow runner learn the location of the executor?

According to the documentation, the flow runner loops over tasks and sends them to an executor. I'm guessing the executor then finds resources on which to launch TaskRunners. If it's a DaskExecutor, I'm guessing it looks for Dask workers I've provided. However, it seems as though it does not launch Dask workers, because that's what an Agent does.
If Agents allocate resources, and Dask is a recommended source of resources, why is there no DaskAgent? Why only Docker, Kubernetes, Vertex, and ECS?
k
The FlowRunner is the class that traverses the DAG and decides which tasks to run. It's mainly an internal class, and you shouldn't need to interact with it or manipulate it. `flow.run()` uses the `FlowRunner`, but a cloud-backed run uses a `CloudFlowRunner`, which is configured a bit differently (for example, where to send logs). The FlowRunner submits the tasks to an executor. If you use the default `LocalExecutor`, execution is just single-threaded. If you use a `LocalDaskExecutor`, it's as if the FlowRunner is doing `pool.submit()`, and if you use a `DaskExecutor`, it's as if the FlowRunner is responsible for doing `client.submit()`.
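To make the `pool.submit()` / `client.submit()` analogy concrete, here is a toy sketch using only the standard library. `SequentialExecutor` and `run_tasks` are hypothetical names, not Prefect classes, and `ThreadPoolExecutor` merely stands in for a `LocalDaskExecutor` (Prefect's real implementation differs). With a `DaskExecutor`, the same loop would go through `distributed.Client.submit`, which also returns futures with a `.result()` method; that path isn't shown because it needs a running Dask scheduler.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class SequentialExecutor:
    """Toy stand-in for LocalExecutor: runs each task right away, in-process."""

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future:
        fut: Future = Future()
        try:
            fut.set_result(fn(*args))
        except Exception as exc:  # hand task errors back through the future
            fut.set_exception(exc)
        return fut


def run_tasks(executor: Any, tasks: List[Callable[[int], int]], x: int) -> List[int]:
    """Toy FlowRunner-style loop: it only submits; the executor decides how work runs."""
    futures = [executor.submit(task, x) for task in tasks]
    return [f.result() for f in futures]


tasks = [lambda v: v + 1, lambda v: v * 2, lambda v: v ** 2]

# LocalExecutor analogy: plain in-process calls, one after another.
print(run_tasks(SequentialExecutor(), tasks, 3))  # [4, 6, 9]

# LocalDaskExecutor analogy: pool.submit() on a local thread pool.
with ThreadPoolExecutor(max_workers=3) as pool:
    print(run_tasks(pool, tasks, 3))  # [4, 6, 9]
```

The submitting loop is identical in both runs; only the object it submits to changes, which is why the executor can be swapped without touching the tasks themselves.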
So if an upstream task fails, it's the FlowRunner that decides not to run the downstream tasks.

For example, if I have `MyFlow` and I'm iterating locally with `flow.run()`, Prefect Cloud has absolutely no idea about that run, because it uses the normal `FlowRunner`. `RunConfig` and `Storage` are not looked at by `flow.run()`; only the `Executor` is. In this case, the FlowRunner lives in the process created by calling `flow.run()`. So `flow.run()` is not related to Agents at all, and not related to Prefect Cloud. If you are purely using `flow.run()` (which is not recommended), then that is what we refer to as Prefect Core.
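Here is a toy model of two points above: a local `flow.run()` consults only the executor (not the run config or storage), and it's the runner loop, not the executor, that skips downstream tasks after an upstream failure. `ToyFlow` and `ToyTask` are hypothetical, and the `TriggerFailed` label only mirrors the Prefect 1.x state name; nothing here is Prefect's actual `FlowRunner` code.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ToyTask:
    name: str
    fn: Callable[..., Any]
    upstream: List[str] = field(default_factory=list)


@dataclass
class ToyFlow:
    tasks: List[ToyTask]              # assumed to be listed in topological order
    executor: Any = None              # the only setting a local run consults
    run_config: Optional[str] = None  # never read by run(): an agent-side concern
    storage: Optional[str] = None     # never read by run(): an agent-side concern

    def run(self) -> Dict[str, str]:
        """Toy local run: a FlowRunner-like loop in *this* process; no agent, no Cloud."""
        states: Dict[str, str] = {}
        results: Dict[str, Any] = {}
        for t in self.tasks:
            # The runner, not the executor, decides whether a task runs at all.
            if any(states[u] != "Success" for u in t.upstream):
                states[t.name] = "TriggerFailed"
                continue
            args = [results[u] for u in t.upstream]
            try:
                results[t.name] = self.executor.submit(t.fn, *args).result()
                states[t.name] = "Success"
            except Exception:
                states[t.name] = "Failed"
        return states


def boom():
    raise ValueError("upstream failed")


with ThreadPoolExecutor(max_workers=2) as pool:
    flow = ToyFlow(
        tasks=[
            ToyTask("extract", boom),
            ToyTask("transform", lambda x: x * 2, upstream=["extract"]),
            ToyTask("other", lambda: 42),
        ],
        executor=pool,
        run_config="hypothetical-docker-config",
        storage="hypothetical-s3-location",
    )
    print(flow.run())
# {'extract': 'Failed', 'transform': 'TriggerFailed', 'other': 'Success'}
```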
So your paragraph on agents is pretty much right. The agent simply polls Prefect Cloud every 10 seconds and looks for flows to pull. If the agent finds something, it deploys a flow run. For the Local agent, this is a Python process; for the Docker agent, it's a new container; for the Kubernetes agent, it's a new pod. Whichever you use, the FlowRunner starts and, as part of start-up, spins up the defined executor. Once that executor is up, task submission goes through it (`pool.submit()` for `LocalDaskExecutor` and `client.submit()` for `DaskExecutor`). For `LocalExecutor`, it's just like calling a decorated function.

When the agent picks up a flow, it loads it from `Storage` (`GitHub`, `S3`, `Docker`). There is a file saved somewhere that contains the flow code, and that file is where the executor is pulled from. We don't store the executor when you register a flow, because the Dask client address can be private.

In the scenario where you have a Dask cluster, the FlowRunner spins up the Dask `Client` and uses `client.submit()` to send tasks to the Dask cluster. The FlowRunner process holds the `client`. It's not the agent that launches the Dask cluster; it's the process the agent spins up (the FlowRunner) that does.

As a quick example to digest that: if I have a Docker agent that picks up a flow, the Docker agent spins up a new container, loads the flow into that container, and starts execution (the FlowRunner). The FlowRunner then starts the executor, which can spin up a Dask cluster. As it spins the cluster up, it holds the `client` and uses the `client` to submit tasks.

Now to your final question:
> If Agents allocate resources, and Dask is a recommended source of resources, why is there no DaskAgent? Why only Docker, Kubernetes, Vertex, and ECS?
This is hard to answer, because before 0.14.0 the set-up was similar to what you described: Dask was coupled into the configuration of the flow. For example, you can check the legacy `DaskKubernetesEnvironment` documentation here. The new set-up decouples how a flow run is spun up from how it is executed. I presume this also opens the door to adding other executors (Ray, Spark), and now you can mix and match. With the previous setup, the two were coupled as a pair, which was harder to maintain and use.

There is also a difference in what the agent does: it spins up the resources for one specific process, which then spins the cluster up. So the agent makes the initial pod/container, whereas the Dask configuration spans multiple machines and is distributed.
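The agent/FlowRunner split described above can be sketched as a toy pipeline: the agent only polls and launches one process per run, and that process loads the flow, builds its own executor (here a 4-worker thread pool standing in for a Dask cluster), and submits tasks. `FakeCloud`, `STORAGE`, `flow_run_process`, and `agent` are all hypothetical names; a real agent talks to the Prefect API, launches a process, container, or pod, and does not block on the run's results the way this synchronous sketch does.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class FakeCloud:
    """Hypothetical stand-in for the Prefect API: just a queue of scheduled run ids."""
    scheduled: List[str] = field(default_factory=list)

    def get_scheduled_runs(self) -> List[str]:
        runs, self.scheduled = self.scheduled, []
        return runs


# Hypothetical "storage": the flow code a flow-run process would load.
# The executor is built from this code at run time, not from registration metadata.
STORAGE: Dict[str, Dict[str, Any]] = {
    "run-1": {
        "make_executor": lambda: ThreadPoolExecutor(max_workers=4),  # "the cluster"
        "tasks": [lambda: 1, lambda: 2, lambda: 3],
    }
}


def flow_run_process(run_id: str) -> List[int]:
    """What the agent launches (process/container/pod): where the FlowRunner lives."""
    flow = STORAGE[run_id]                                      # 1. load flow from storage
    with flow["make_executor"]() as executor:                   # 2. start the executor
        futures = [executor.submit(t) for t in flow["tasks"]]  # 3. submit tasks
        return [f.result() for f in futures]


def agent(cloud: FakeCloud, launch: Callable[[str], List[int]],
          poll_interval: float = 10.0, max_polls: int = 1) -> Dict[str, List[int]]:
    """Toy agent loop: poll, then hand each run to ONE launched process; no cluster here.

    Runs synchronously for simplicity; a real agent would launch and keep polling.
    """
    results: Dict[str, List[int]] = {}
    for i in range(max_polls):
        for run_id in cloud.get_scheduled_runs():
            results[run_id] = launch(run_id)
        if i + 1 < max_polls:
            time.sleep(poll_interval)
    return results


print(agent(FakeCloud(scheduled=["run-1"]), launch=flow_run_process))
# {'run-1': [1, 2, 3]}
```

The executor is created inside `flow_run_process`, from the loaded flow code, rather than by `agent`; that mirrors why a Dask scheduler address never needs to be stored at registration time.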
@Marvin archive “How do the FlowRunner, Agent, and Executor work together?”