Miguel Moncada
05/07/2024, 4:10 PM
I'm currently deploying my flows with the Deployment.build_from_flow method.
I'm seeing what seem to be some conceptual differences between how Prefect works now and how it used to, and I think I'm missing something.
I now have a worker pool deployed to my k8s cluster (which replaces the Agent, if I understood correctly) and would like to have my flows picked up by this worker and run either serverless on Google Cloud Run or as k8s jobs in the same cluster. Below is an example of a deployment:
from prefect.deployments import Deployment
from prefect_gcp.cloud_storage import GcsBucket
from prefect_gcp.cloud_run import CloudRunJob
from prefect.client.schemas.schedules import CronSchedule
from dataflows.flows.hello_flow import hello_flow
storage = GcsBucket.load("prefect-storage")
infrastructure = CloudRunJob.load("cloud-run-default")
def deploy_hello_flow():
    # Trigger deployment
    deployment = Deployment.build_from_flow(
        flow=hello_flow,
        name="hello_flow_deployment",
        work_queue_name="default",  # k8s work pool
        storage=storage,  # GCS storage
        path="hello_flow",
        tags=["staging"],
        infrastructure=infrastructure,  # Cloud Run Job
        schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
    )
    deployment.apply()
Is this approach still correct, or am I missing something?

Miguel Moncada
05/07/2024, 4:20 PM
With the Flow.deploy method I don't see where I'd configure the infrastructure where the flow will run - and I don't think the serve method is what I'm looking for either: https://docs.prefect.io/latest/api-ref/prefect/flows/#prefect.flows.Flow.deploy

Nate
05/07/2024, 5:36 PM
.deploy is the successor to build_from_flow
the infra info is defined with your work pool, which you reference by name with .deploy

Nate
05/07/2024, 5:44 PM
also, re .deploy and this line:
> work_queue_name="default", # k8s work pool
work queues and pools are not quite the same thing. a work pool can have many queues, where queues can have priorities to stagger delivery of work
in general you often only need the default queue though
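To make the pool/queue distinction concrete, a minimal sketch (the pool, queue, and image names are placeholders, not from this thread): with .deploy you target a pool by name and can optionally target a specific queue inside it.

from dataflows.flows.hello_flow import hello_flow

hello_flow.deploy(
    name="hello_flow_deployment",
    work_pool_name="default-worker-pool",  # the pool carries the infra config
    work_queue_name="default",             # optional: a specific queue within that pool
    image="my-registry/my-image:latest",   # placeholder image
)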

Nate
05/08/2024, 2:55 PM
> From what I see, if I use the k8s work pool the flow will run as a k8s job, and if I use the serverless Cloud Run work pool then it'll run as a Cloud Run job - is that it?
yep, pretty much! sorry I hadn't noticed that you had both of these lines:
def deploy_hello_flow():
    # Trigger deployment
    ...
    work_queue_name="default",  # k8s work pool
    ...
    infrastructure=infrastructure,  # Cloud Run Job
basically the work pool has taken the role of the infrastructure block. all that config for each type (k8s, Cloud Run, etc.) lives on the work pool, and then each deployment can reference that config via work_pool_name and override what it needs to for job variables like image, cpu, etc.
so instead of the above you'd have something like this:
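A minimal sketch of what that could look like, assuming a work pool named "default-worker-pool" and a prebuilt image (both are placeholder names):

from prefect.client.schemas.schedules import CronSchedule

from dataflows.flows.hello_flow import hello_flow

hello_flow.deploy(
    name="hello_flow_deployment",
    work_pool_name="default-worker-pool",  # the pool provides the infra config (k8s or Cloud Run)
    image="my-registry/my-image:latest",   # placeholder image
    tags=["staging"],
    schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
)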

Nate
05/08/2024, 3:08 PM
env is an example of a job variable that's on every work pool, but overrides work the same way for other job variables like image, cpu, etc.
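For instance, a per-deployment env override might look like this (a sketch; the pool name, image, and env values are arbitrary examples):

from dataflows.flows.hello_flow import hello_flow

hello_flow.deploy(
    name="hello_flow_deployment",
    work_pool_name="default-worker-pool",  # placeholder pool name
    image="my-registry/my-image:latest",   # placeholder image
    # job_variables override the work pool's base job template for this deployment only
    job_variables={"env": {"LOG_LEVEL": "DEBUG", "EXTRA_PIP_PACKAGES": "pandas"}},
)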

Miguel Moncada
05/08/2024, 4:12 PM
hello_flow.deploy(
    name="hello_flow_k8s_deployment",
    work_pool_name="default-worker-pool",
    image="europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest",
    tags=["prod"],
    schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
    job_variables={"resources": {"limits": {"cpu": "1200m", "memory": "8Gi"}}},
)
From the UI I can see:

Miguel Moncada
05/08/2024, 4:13 PM
...the resources section is empty:
image: europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest
imagePullPolicy: IfNotPresent
name: prefect-job
resources: {}

Nate
05/08/2024, 4:42 PM
resources is not a job variable directly - you can add it (or cpu_request, like in that example) to the work pool's variables / base job template, as suggested by the section of the docs you linked
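In other words (a sketch, assuming the pool's base job template has been extended with hypothetical cpu_limit and memory_limit variables wired into the container's resources block, per that docs section):

from prefect.client.schemas.schedules import CronSchedule

from dataflows.flows.hello_flow import hello_flow

hello_flow.deploy(
    name="hello_flow_k8s_deployment",
    work_pool_name="default-worker-pool",
    image="europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest",
    tags=["prod"],
    schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
    # cpu_limit / memory_limit are hypothetical custom variables; they only take
    # effect once the work pool's base job template defines them and maps them
    # into the Kubernetes container's resources section.
    job_variables={"cpu_limit": "1200m", "memory_limit": "8Gi"},
)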