Joseph Mathes
02/10/2022, 1:12 AM
Kevin Kho
flow.run() uses the FlowRunner, but a cloud-backed run uses a CloudFlowRunner, which is configured a bit differently (for example, where it sends logs). The FlowRunner submits the tasks to an executor. If you use the default LocalExecutor, execution is just single-threaded. If you use a LocalDaskExecutor, it’s like the FlowRunner is doing pool.submit(), and if you use a DaskExecutor, it’s like the FlowRunner is responsible for doing client.submit(). So if an upstream task fails, it’s the FlowRunner that decides not to run the downstream tasks.
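As a rough sketch of what that looks like in the Prefect 1.x API (the flow, task, and scheduler address below are just placeholders):

```python
# Minimal sketch of swapping executors for a local flow.run() (Prefect 1.x API).
from prefect import Flow, task
from prefect.executors import LocalExecutor, LocalDaskExecutor, DaskExecutor

@task
def say_hello(name):
    return f"hello {name}"

with Flow("executor-demo") as flow:
    say_hello("world")

# Default: single-threaded execution in the current process.
flow.executor = LocalExecutor()

# Local parallelism: roughly "pool.submit()" over threads or processes.
# flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)

# Distributed: the FlowRunner does client.submit() against a Dask cluster.
# flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")  # placeholder address

state = flow.run()  # the FlowRunner uses whichever executor is set
```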
So if I have MyFlow and I am iterating locally and I use flow.run(), Prefect Cloud has absolutely no idea about that, because it is the normal FlowRunner. RunConfig and Storage are not looked at for flow.run(); only the Executor is. In this case, the FlowRunner is the process created by doing flow.run().
So flow.run() is absolutely not related to Agents at all, and not related to Prefect Cloud. If you are purely using flow.run() (which is not recommended), then that is what we refer to as Prefect Core.
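A minimal sketch of the two modes, assuming the Prefect 1.x API (the project and flow names are placeholders):

```python
from prefect import Flow, task

@task
def add(x, y):
    return x + y

with Flow("my-flow") as flow:
    add(1, 2)

# 1) Prefect Core: runs right here in this process via the plain FlowRunner.
#    Prefect Cloud never hears about it; RunConfig and Storage are ignored.
flow.run()

# 2) Prefect Cloud: register the flow so an agent can pick it up later.
#    Storage, RunConfig, and the executor defined in this file matter for that run.
# flow.register(project_name="my-project")  # placeholder project name
```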
So your paragraph on agents is pretty much right. The agent simply polls Prefect Cloud every 10 seconds and looks for Flows to pull. If the agent finds something, it will deploy a Flow: for the Local agent, this is a Python process; for the Docker agent, it’s a new container; for the Kubernetes agent, it’s a new pod. Whichever you use, the FlowRunner will begin and, as part of start-up, spin up the defined executor. Once that executor is spun up, the FlowRunner does the task submission (pool.submit() for LocalDaskExecutor and client.submit() for DaskExecutor). For LocalExecutor, it’s just like calling a decorated function.
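As a loose analogy only (this is not Prefect’s actual internals), the three submission styles feel roughly like this:

```python
# Loose analogy, not Prefect source code: how task submission "feels"
# under each executor from the FlowRunner's point of view.
from concurrent.futures import ThreadPoolExecutor
from dask.distributed import Client

def my_task(x):
    return x * 2

# LocalExecutor: just call the function in the current process.
result = my_task(1)

# LocalDaskExecutor: roughly a local pool submit (threads or processes).
with ThreadPoolExecutor(max_workers=4) as pool:
    result = pool.submit(my_task, 2).result()

# DaskExecutor: submit to a Dask cluster through a client.
client = Client("tcp://dask-scheduler:8786")  # placeholder scheduler address
result = client.submit(my_task, 3).result()
client.close()
```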
When the agent picks up a Flow, it loads it from Storage (GitHub, S3, Docker). There is a file saved somewhere to load the code from, and it is from this file that the executor is pulled. We don’t store the executor when you register a Flow (because the Dask client address can be private).
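A sketch of what that looks like at registration time (Prefect 1.x API; the repo, bucket, and registry names are placeholders):

```python
from prefect import Flow, task
from prefect.storage import GitHub, S3, Docker

@task
def greet():
    return "hi"

with Flow("stored-flow") as flow:
    greet()

# Where the agent-launched process will load the flow code from.
flow.storage = GitHub(repo="my-org/my-repo", path="flows/stored_flow.py")
# flow.storage = S3(bucket="my-bucket")
# flow.storage = Docker(registry_url="my-registry.io", image_name="stored-flow")

# Note: the executor is defined in this same file and re-created at runtime;
# it is not serialized to the backend when the flow is registered.
```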
In the scenario where you have a Dask cluster, the FlowRunner spins up the Dask Client and uses client.submit() to send tasks to the Dask cluster. The FlowRunner process has the client.
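A sketch of that scenario, assuming an existing cluster (Prefect 1.x API; the scheduler address is a placeholder):

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def double(x):
    return 2 * x

with Flow("existing-cluster-flow") as flow:
    double(21)

# Point at an existing Dask scheduler. At run time the FlowRunner process
# creates the dask.distributed.Client from this address and uses
# client.submit() to send tasks to the cluster.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
```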
It’s not the agent that launches the Dask cluster; it’s the process that the agent spins up (the FlowRunner) that does.
As a quick example to digest that: if I have a Docker agent that picks up a Flow, the Docker agent spins up a new container, loads the flow into that container, and starts execution (the FlowRunner). The FlowRunner then starts the Executor and can spin up a Dask cluster. As it spins it up, it holds the client and uses the client to submit tasks.
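A hedged sketch of that Docker example (Prefect 1.x API; the registry and image names are placeholders):

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import DockerRun
from prefect.storage import Docker

@task
def double(x):
    return 2 * x

with Flow("docker-dask-flow") as flow:
    double(21)

# The Docker agent launches a container for the run; the FlowRunner starts
# inside that container and loads the flow baked into the image.
flow.storage = Docker(registry_url="my-registry.io", image_name="docker-dask-flow")
flow.run_config = DockerRun()

# No address is given, so the FlowRunner itself spins up a temporary Dask
# cluster, holds the client, and submits tasks with it.
flow.executor = DaskExecutor()
```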
Now to your final question:
If Agents allocate resources, and Dask is a recommended source of resources, why is there no DaskAgent? Why only Docker, Kubernetes, Vertex, and ECS?
This is hard to answer, because before 0.14.0 the set-up was similar to what you described: Dask was coupled into the configuration of the Flow. For example, you can check the legacy DaskKubernetesEnvironment documentation here. The new set-up decouples how a Flow is spun up from how it is executed. I presume this also opens the door to adding other executors (Ray, Spark), and now you can mix and match. With the previous setup, it was coupled as a pair and harder to maintain and use.
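A sketch of that mix-and-match style, which is roughly what replaces the legacy DaskKubernetesEnvironment (Prefect 1.x API; the image name and adaptive bounds are placeholders, and dask_kubernetes would need to be installed):

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

@task
def double(x):
    return 2 * x

with Flow("k8s-dask-flow") as flow:
    double(21)

# How the flow-runner process is launched: the Kubernetes agent creates a job
# that runs the FlowRunner from this image (placeholder name).
flow.run_config = KubernetesRun(image="my-registry.io/k8s-dask-flow:latest")

# How tasks are executed: that FlowRunner spins up an ephemeral Dask cluster
# on the same Kubernetes cluster and submits tasks to it.
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    adapt_kwargs={"minimum": 1, "maximum": 5},
)
```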
There is also a difference in the agent, in that it spins up the resources for one specific process, which will then spin the cluster up. So the agent makes the initial pod/container. The Dask configuration, though, is multiple machines and distributed.