<@ULVA73B9P> What exactly is happening when the wo...
# ask-marvin
k
@Marvin What exactly is happening when the worker runs a flow? How the logic inside the flow is triggered? If possible, provide the links to the source code
m
thought for 265.3 seconds
Great question. Here’s what happens in Prefect 3.x when a worker runs a flow from a deployment, and how your flow logic actually gets invoked.

High-level flow of events
- A deployment stores:
  - Source location (e.g., Git repo or local path) and an entrypoint like `path/to/file.py:my_flow`
  - Work pool name and optional job variables
- A worker (e.g., “Process” worker) is started against a work pool and continuously polls the Prefect API for scheduled flow runs.
- When a scheduled flow run is claimed:
  1) The worker merges the work pool’s base job template with the deployment’s job variables to create a job configuration (infra settings, env vars, working directory, etc.).
  2) The code for the flow is made available locally:
     - If the deployment was created with `flow.from_source(...)`, the engine uses a RunnerStorage (e.g., Git clone, remote download) to fetch the code into a working directory.
  3) The worker starts the job. For a Process worker, it spawns a local subprocess; for Docker/Kubernetes workers, it creates and submits a container/pod with the same entrypoint/parameters.
  4) Inside that execution environment, Prefect loads the flow object from the entrypoint and calls it with the run’s parameters. That call is what triggers your flow’s Python logic.
  5) As your flow runs, tasks are orchestrated by the task engine, and states are reported back to the API.

Where the logic is actually triggered
- The flow object is loaded from the entrypoint string and converted to a `Flow`:
  - prefect/flows.py: load_flow_from_entrypoint, load_function_and_convert_to_flow
- The engine runs the flow, often in a subprocess for isolation:
  - prefect/flow_engine.py: run_flow_in_subprocess, run_flow, run_flow_sync, load_flow_run, load_flow_and_flow_run
- In practice, the engine does something conceptually like:
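# Conceptual sketch, not the literal worker code; exact signatures vary across 3.x releases.
from prefect.flows import load_flow_from_flow_run
from prefect.flow_engine import run_flow_in_subprocess

# Fetches code from source (git, remote storage) if needed, then imports the entrypoint
flow = await load_flow_from_flow_run(flow_run)
proc = run_flow_in_subprocess(flow, flow_run=flow_run, parameters=flow_run.parameters)
proc.join()  # wait for the child process that executes the flow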
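As a rough illustration of the entrypoint-resolution step, here is a stdlib-only sketch of turning a `path/to/file.py:my_flow` string into a callable and invoking it with parameters. It is not Prefect's implementation: `load_callable_from_entrypoint` and `demo` are hypothetical names, and the real `load_flow_from_entrypoint` does more (for example converting the function into a `Flow`).

```python
import importlib.util
import tempfile
from pathlib import Path
from typing import Any, Callable


def load_callable_from_entrypoint(entrypoint: str) -> Callable[..., Any]:
    """Resolve 'path/to/file.py:object_name' to the named object (hypothetical helper)."""
    # Split on the last colon so a drive letter in the path survives.
    path, _, name = entrypoint.rpartition(":")
    if not path or not name:
        raise ValueError(f"expected 'file.py:name', got {entrypoint!r}")
    spec = importlib.util.spec_from_file_location(Path(path).stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot import {path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file's top-level code
    return getattr(module, name)


def demo() -> str:
    """Write a tiny 'flow' file, load it by entrypoint, and call it with parameters."""
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / "flows.py"
        source.write_text("def my_flow(name):\n    return f'hello {name}'\n")
        fn = load_callable_from_entrypoint(f"{source}:my_flow")
        parameters = {"name": "marvin"}  # stands in for flow_run.parameters
        return fn(**parameters)
```

The point of the sketch is that nothing runs until the loaded object is called: importing the file only defines the function, and the call with the run's parameters is what executes your logic.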
Key components and their source
- Worker base and polling loop (claims scheduled flow runs, builds job config, submits execution): prefect/workers/base.py
- Process worker (executes the flow in a local subprocess using the engine): prefect/workers/process.py
- Worker CLI (what powers `prefect worker start`): prefect/cli/worker.py
- Flow loading and entrypoint resolution (turns `module.py:flow_fn` into a Flow object): prefect/flows.py
- Code fetching from sources used by `flow.from_source(...)` (git, local, remote, block adapters): prefect/runner/storage.py
- Engine that orchestrates the flow run (parameters, states, task execution): prefect/flow_engine.py
- Runtime context available inside your flow (e.g., `prefect.runtime.flow_run`): prefect/context.py, with the `prefect.runtime` accessors in prefect/runtime/flow_run.py

What varies by worker type
- Process worker: spawns a local subprocess and calls the flow engine directly.
- Docker/Kubernetes workers: render a job/pod spec from the base job template and job variables, launch the container/pod, then the container entrypoint runs the same flow engine logic inside the container.

If you want, tell me which worker type you’re using (Process, Docker, Kubernetes), and I can point you to the exact submission/launch code path for that worker.
k
@Marvin I'm using kubernetes workers
m
thought for 314.1 seconds
Awesome, here’s how Prefect’s Kubernetes worker runs your flow in 3.x, with source links.

What the Kubernetes worker does
- Polls your work pool for scheduled flow runs and claims them.
- Renders a Kubernetes Job from the work pool’s base job template plus your deployment’s job_variables.
- Creates the Job in your cluster and watches it until completion.
- Inside the Job’s container, Prefect executes the flow run by invoking the CLI. Command that runs inside the container:
prefect flow-run execute <flow-run-id>
This command pulls the deployment metadata, fetches your code (e.g., from Git/S3) if needed, loads the entrypoint (`module.py:flow`), and runs the flow via the flow engine.

Key code paths
- Kubernetes worker (renders Job spec from the base job template and submits it):
  - prefect_kubernetes/worker.py, in the separate prefect-kubernetes integration package (not under the core prefect/workers/ directory)
  - Look for classes like `KubernetesWorker`, `KubernetesWorkerJobConfiguration`, and the variable models (e.g., `KubernetesWorkerVariables`) that define/validate job variables.
  - The worker builds the Job manifest from the base job template + `job_variables`, then uses the Kubernetes Python client to create the Job.
- Flow-run execution inside the container:
  - CLI entrypoint that the container runs: prefect/cli/flow_run.py (command: `prefect flow-run execute [ID]`)
  - Flow loading and entrypoint resolution: prefect/flows.py (see `load_flow_from_entrypoint`, `load_flow_from_flow_run`)
  - Flow engine that orchestrates the run: prefect/flow_engine.py (see `run_flow`, `run_flow_in_subprocess`, `load_flow_run`)
  - Code fetching for deployments created with `flow.from_source(...)`: prefect/runner/storage.py (e.g., `GitRepository`, `RemoteStorage`, `LocalStorage`)

How the Job gets rendered
- The work pool has a base job template with placeholders (e.g., image, env, command, args, volumes).
- When a flow run is claimed, the worker merges the template with the deployment’s `job_variables` to produce a concrete Kubernetes Job manifest.
- The manifest’s container section includes the command/args to run `prefect flow-run execute <flow-run-id>` and environment variables (e.g., `PREFECT_API_URL` and `PREFECT_API_KEY`) so the engine can communicate with the API.
- The worker creates the Job via the Kubernetes API and watches for completion; logs are streamed if configured.

Docs you might find helpful
- Kubernetes workers overview: Docs: Workers (Kubernetes)
- Customizing job variables (image, env, volumes, node selectors, etc.): Docs: Customize job variables
- Deployments and `flow.from_source(...).deploy(...)`: Docs: Deployments

If you want, share your job_variables or what you’re trying to customize (image, service account, volumes, etc.), and I can point to the exact fields in the Kubernetes worker’s template and show a working example.
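To make the job-rendering step more concrete, here is a minimal stdlib-only sketch of filling `{{ placeholder }}` values in a template from defaults plus deployment-level overrides. The template below is a hypothetical, heavily trimmed stand-in (a real base job template describes its variables with a JSON schema and has many more fields), and `render` and `build_job_configuration` are illustrative helpers, not Prefect functions.

```python
import re
from typing import Any

# Hypothetical, trimmed stand-in for a work pool's base job template.
BASE_JOB_TEMPLATE: dict[str, Any] = {
    "variables": {  # simplified: default value per placeholder
        "image": "registry.example.com/flows:1.0",
        "namespace": "default",
        "image_pull_policy": "IfNotPresent",
    },
    "job_configuration": {
        "namespace": "{{ namespace }}",
        "job_manifest": {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "spec": {"template": {"spec": {"containers": [{
                "name": "prefect-job",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}",
            }]}}},
        },
    },
}

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(node: Any, values: dict[str, Any]) -> Any:
    """Return a copy of node with every '{{ name }}' replaced by values[name]."""
    if isinstance(node, dict):
        return {key: render(value, values) for key, value in node.items()}
    if isinstance(node, list):
        return [render(item, values) for item in node]
    if isinstance(node, str):
        return _PLACEHOLDER.sub(lambda m: str(values[m.group(1)]), node)
    return node


def build_job_configuration(template: dict[str, Any], job_variables: dict[str, Any]) -> dict[str, Any]:
    """Deployment job_variables override the template defaults, then fill the placeholders."""
    values = {**template["variables"], **job_variables}
    return render(template["job_configuration"], values)
```

For example, `build_job_configuration(BASE_JOB_TEMPLATE, {"image": "registry.example.com/flows:2.1"})` yields a manifest whose container uses the overridden image while `namespace` and `imagePullPolicy` keep their defaults.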
k
@Marvin at which point the container with the application code is pulled from the registry?
m
thought for 227.0 seconds
Short answer: the image is pulled by the underlying container runtime when your flow run starts, after the job is submitted. Prefect does not download images itself.

Details by work pool/orchestrator:
- Docker work pool: The worker asks the local Docker daemon to start a container; the daemon pulls the image first if it is not already cached (or on every run, depending on the pool’s image pull policy).
- Kubernetes work pool: The worker submits a Job/Pod spec; the kubelet on the node pulls the image when starting the pod. Timing/behavior is controlled by `imagePullPolicy` (e.g., `IfNotPresent` vs `Always`) in your job template.
- ECS work pool: The worker registers/runs a task; the ECS agent pulls the image when the task launches (auth via task execution role).
- Push/serverless pools (e.g., Cloud Run, ACI): Prefect submits the job to the cloud API; the provider’s platform pulls the image as it initializes the execution environment.
- Process work pool: No container/image involved.

Notes:
- If you need the latest image every run, use unique tags and/or set Kubernetes `imagePullPolicy: Always`.
- Registry authentication is handled by the orchestrator (e.g., Docker login, K8s `imagePullSecrets`, ECS execution role), not by Prefect.