# prefect-getting-started
**Miguel Moncada:**
Hi, I used Prefect some time ago (~8 months back): I leveraged Prefect Cloud, had an agent running on a VM, and ran flows on Google Cloud Run, deploying flows via the `Deployment.build_from_flow` method. I'm seeing what seem to be some conceptual differences between how Prefect works now and how it used to, and I think I'm missing something. I now have a worker deployed to my k8s cluster (which replaces the agent, if I understood correctly) and would like to have my flows be picked up by this worker and run either serverless on Google Cloud Run or as k8s jobs in the same cluster. Below is an example of a deployment:
```python
from prefect.deployments import Deployment
from prefect_gcp.cloud_storage import GcsBucket
from prefect_gcp.cloud_run import CloudRunJob
from prefect.client.schemas.schedules import CronSchedule
from dataflows.flows.hello_flow import hello_flow

storage = GcsBucket.load("prefect-storage")
infrastructure = CloudRunJob.load("cloud-run-default")


def deploy_hello_flow():
    # Trigger deployment
    deployment = Deployment.build_from_flow(
        flow=hello_flow,
        name="hello_flow_deployment",
        work_queue_name="default", # k8s work pool
        storage=storage, # GCS storage
        path="hello_flow",
        tags=["staging"],
        infrastructure=infrastructure, # Cloud Run Job
        schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
    )
    deployment.apply()
```
Is this approach still correct or am I missing something?
In the `Flow.deploy` method I don't see where I'd configure the infrastructure where the flow will run, and I don't think the `serve` method is what I'm looking for either: https://docs.prefect.io/latest/api-ref/prefect/flows/#prefect.flows.Flow.deploy
**Nate:**
hi @Miguel Moncada - `.deploy` is the successor to `build_from_flow`. The infra info is defined with your work pool, which you reference by name with `.deploy`. So for example, you'd create a k8s work pool and reference it in `.deploy`.

> work_queue_name="default", # k8s work pool

Work queues and pools are not quite the same thing: a work pool can have many queues, and queues can have priorities to stagger delivery of work. In general you often only need the default queue, though.
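(As an aside: a Kubernetes-type work pool can be created up front, e.g. with `prefect work-pool create my-k8s-pool --type kubernetes` on a recent 2.x CLI; the pool name here is a placeholder. A Kubernetes worker deployed in the cluster then polls that pool for scheduled runs.)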
**Miguel Moncada:**
thanks @Nate, but then the flow itself would run on the k8s cluster? Am I misunderstanding that it should be possible to have my work pool listening to this deployment and make it run elsewhere?
From what I see, if I use the k8s work pool the flow will run as a k8s job, and if I use the serverless Cloud Run work pool then it'll run as a Cloud Run job, is that it?
**Nate:**
> From what I see, if I use the k8s work pool the flow will run as a k8s job, and if I use the serverless Cloud Run work pool then it'll run as a Cloud Run job, is that it?
yep pretty much! sorry, I hadn't noticed that you had both of these lines:
```python
def deploy_hello_flow():
    # Trigger deployment
        ...
        work_queue_name="default", # k8s work pool
        ...
        infrastructure=infrastructure, # Cloud Run Job
```
basically the work pool has taken the role of the `infrastructure` block. All the config for each infra type (k8s, Cloud Run, etc.) lives on the work pool, and each deployment can reference that config via `work_pool_name` and override what it needs to through job variables like `image`, `cpu`, etc. So instead of the above you'd have something like this:
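(A minimal sketch of that shape, built from the pieces elsewhere in this thread; the pool name and image are placeholders:)

```python
from prefect.client.schemas.schedules import CronSchedule

from dataflows.flows.hello_flow import hello_flow

# infra config now lives on the work pool; the deployment just points at it
hello_flow.deploy(
    name="hello_flow_deployment",
    work_pool_name="my-k8s-pool",  # placeholder: a Kubernetes-type work pool
    image="my-registry/my-image:latest",  # placeholder: image the job runs in
    tags=["staging"],
    schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
)
```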
**Miguel Moncada:**
thanks @Nate, got it now. I'm interested in how to override the limits/requests in each deployment, not sure if you have any examples of this available?
**Nate:**
`env` is an example of a job variable that's on every work pool, but overrides work the same way for other job variables like `image`, `cpu`, etc.
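(A minimal sketch of such an override, continuing the example above; all names and values are placeholders:)

```python
from dataflows.flows.hello_flow import hello_flow

hello_flow.deploy(
    name="hello_flow_env_override",
    work_pool_name="my-k8s-pool",  # placeholder pool name
    image="my-registry/my-image:latest",  # placeholder image
    # per-deployment override of the pool's base job template;
    # "env" is a job variable available on every work pool type
    job_variables={"env": {"LOG_LEVEL": "DEBUG"}},
)
```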
**Miguel Moncada:**
nice, I'll give it a try
thanks a lot for your help so far
**Nate:**
sure thing!
**Miguel Moncada:**
@Nate perhaps I'm missing something, from my test:
```python
hello_flow.deploy(
        name="hello_flow_k8s_deployment",
        work_pool_name="default-worker-pool",
        image="europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest",
        tags=["prod"],
        schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
        job_variables={"resources": {"limits": {"cpu": "1200m", "memory": "8Gi"}}},
    )
```
From the UI I can see the job variables set on the deployment, but when I check the associated job's pod definition the `resources` section is empty:
```yaml
    image: europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest
    imagePullPolicy: IfNotPresent
    name: prefect-job
    resources: {}
```
maybe because it is not in the template by default?
**Nate:**
yeah, `resources` is not a job variable directly. You can add it (or `cpu_request`, like in that example) to the work pool's variables / base job template, as suggested by the section of the docs you linked.
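(One way to do that, sketched under the assumption of a recent 2.x CLI: pull the default template with `prefect work-pool get-default-base-job-template --type kubernetes`, add a variable plus a placeholder in the job manifest, then push it back with `prefect work-pool update my-k8s-pool --base-job-template template.json`. Below is a rough Python-dict view of the relevant fragment; the exact template structure may differ by Prefect version:)

```python
# Fragment of a customized Kubernetes base job template, expressed as a
# Python dict (the real template is JSON and has many more fields).
template_fragment = {
    "variables": {
        "properties": {
            # hypothetical custom variable a deployment could override
            "cpu_request": {"title": "CPU Request", "type": "string", "default": "500m"},
        }
    },
    "job_configuration": {
        "job_manifest": {
            "spec": {
                "template": {
                    "spec": {
                        "containers": [
                            {
                                # the placeholder is rendered from the variable above
                                "resources": {"requests": {"cpu": "{{ cpu_request }}"}},
                            }
                        ]
                    }
                }
            }
        }
    },
}
```

A deployment could then pass `job_variables={"cpu_request": "1200m"}` and the value would be rendered into the pod spec.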
**Miguel Moncada:**
thanks @Nate!