@Marvin how can prefect trigger cloud run job...
# ask-marvin
@Marvin how can prefect trigger cloud run jobs that are already created?
thought for 83.0 seconds
Short answer: write a small Prefect task that calls the Cloud Run Jobs API (or shells out to gcloud) and run it in your flow. The jobs can stay exactly as they are. Two common approaches:

1) Use the Google Cloud Run Python client (recommended)
- Auth: make sure your Prefect worker/runtime has ADC set (e.g., `GOOGLE_APPLICATION_CREDENTIALS=/path/key.json`) and the service account can execute Cloud Run jobs.
- Install the dependency: `pip install google-cloud-run`

Example:
```python
from prefect import flow, task
from google.cloud.run_v2 import JobsClient
# optional: from google.api_core.exceptions import NotFound

@task
def trigger_cloud_run_job(project_id: str, region: str, job_name: str) -> dict:
    client = JobsClient()
    name = f"projects/{project_id}/locations/{region}/jobs/{job_name}"
    op = client.run_job(name=name)
    execution = op.result()  # blocks until the job execution finishes
    # The v2 Execution message has no top-level `state`; report task counts instead
    return {
        "execution_name": execution.name,
        "succeeded_tasks": execution.succeeded_count,
        "failed_tasks": execution.failed_count,
    }

@flow
def run_job_flow(project_id: str, region: str, job_name: str):
    return trigger_cloud_run_job(project_id, region, job_name)

if __name__ == "__main__":
    print(run_job_flow("my-project", "us-central1", "my-job"))
```
- Docs: JobsClient (Python)
- Cloud Run Jobs overview: Running Cloud Run jobs

2) Call gcloud from a Prefect task (simple and effective)
- Ensure gcloud is installed and authenticated in the environment where the Prefect worker runs.
```python
import subprocess
from prefect import flow, task

@task
def run_job_via_gcloud(job: str, region: str, project: str, wait: bool = True):
    cmd = [
        "gcloud", "run", "jobs", "execute", job,
        f"--region={region}", f"--project={project}"
    ]
    if wait:
        cmd.append("--wait")  # block until the execution finishes
    subprocess.run(cmd, check=True)  # raises CalledProcessError on a non-zero exit

@flow
def run_job_flow(project: str, region: str, job: str):
    run_job_via_gcloud(job, region, project)
```
Permissions to check (on the service account your Prefect worker uses):
- Ability to execute Cloud Run jobs (commonly Cloud Run Developer or Admin)
- If the job runs with its own service account, you may also need Service Account User on that account

If you tell me your project/region and whether you want to wait for completion or fire-and-forget, I'll tailor the snippet (a fire-and-forget sketch is below). If you're running on Prefect 3.x (assumed), both approaches fit neatly into a deployment. For GCP-specific worker guidance, see the Prefect GCP worker guide.
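If fire-and-forget is all you need, a minimal sketch along these lines should do it (same client and permissions as above; the task name is hypothetical). `run_job` returns a long-running operation, so simply don't call `op.result()`:
```python
# Minimal fire-and-forget sketch (assumes the same ADC/permissions as above).
from google.cloud.run_v2 import JobsClient
from prefect import task

@task
def start_cloud_run_job(project_id: str, region: str, job_name: str) -> str:
    client = JobsClient()
    name = f"projects/{project_id}/locations/{region}/jobs/{job_name}"
    op = client.run_job(name=name)  # starts an execution without blocking on completion
    # For RunJob the operation metadata is the created Execution (per the v2 API);
    # fall back to the raw operation name if metadata isn't populated yet.
    return op.metadata.name if op.metadata else op.operation.name
```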
@Marvin additional context:
• prefect server is hosted in GCE
• prefect worker is deployed as a cloud run service per the cloud run worker guide you provided
• service account linked to the worker on cloud run service has the required permissions
• flow should wait for completion
• use `my-project` and `us-west1` as project/region

given the above:
• what dependencies are required on the cloud run service, if prefecthq:3-latest image is used as base?
• what should the flow.py contain, and how should the deployment be submitted for best practice? e.g. running a python script, or a shell script, or using prefect.yaml?
thought for 242.2 seconds
Great, that context helps. Here's the clean path for your setup.

Dependencies on your Cloud Run worker (base image prefecthq/prefect:3-latest)
- Add the Google Cloud Run client: the `google-cloud-run` Python package (this pulls in google-api-core, google-auth, grpc, etc.)
- You do NOT need the gcloud CLI in the container.
- You do NOT need a key file or GOOGLE_APPLICATION_CREDENTIALS on Cloud Run; ADC will use the attached service account automatically (which you've already granted permissions to). A quick way to confirm this is sketched right after the Dockerfile.

Example Dockerfile
```dockerfile
FROM prefecthq/prefect:3-latest
RUN pip install --no-cache-dir google-cloud-run
```
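If you want to confirm that ADC resolves to the attached service account inside the container, a tiny check like this works (a sketch; google-auth is already pulled in by google-cloud-run):
```python
# Sketch: verify Application Default Credentials inside the Cloud Run container.
# On Cloud Run, google.auth.default() resolves to the attached service account,
# so no key file or GOOGLE_APPLICATION_CREDENTIALS is needed.
import google.auth

credentials, project = google.auth.default()
print(f"ADC project: {project}, credentials type: {type(credentials).__name__}")
```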
Flow code (waits for completion; project=my-project, region=us-west1)
```python
from prefect import flow, task, get_run_logger
from google.cloud.run_v2 import JobsClient

PROJECT_ID = "my-project"
REGION = "us-west1"

@task
def trigger_cloud_run_job(job_name: str) -> str:
    logger = get_run_logger()
    # Use regional endpoint to avoid 404s/misdirected requests
    client = JobsClient(client_options={"api_endpoint": f"{REGION}-run.googleapis.com"})
    name = f"projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name}"

    logger.info(f"Starting Cloud Run job: {name}")
    op = client.run_job(name=name)
    execution = op.result()  # waits until the job execution completes or fails
    logger.info(f"Job execution completed: {execution.name}")
    # You could inspect execution.conditions or the succeeded/failed task counts
    # for richer status handling
    return execution.name

@flow(name="run-cloud-run-job")
def run_cloud_run_job(job_name: str = "my-job"):
    return trigger_cloud_run_job(job_name)

if __name__ == "__main__":
    run_cloud_run_job()
```
Best-practice deployment
- Keep the flow as Python (flow.py) and manage deployments with prefect.yaml + `prefect deploy`. No shell wrappers needed. (If you prefer to register deployments from Python instead, a sketch follows the CLI commands below.)
- Because you're running a Process worker on Cloud Run, bake your flow's Python deps (like google-cloud-run) into the worker image (as shown above). That way your worker can execute any of your flows reliably.

Minimal prefect.yaml (checked into your repo)
```yaml
name: cloud-run-jobs
prefect-version: "3.*"

deployments:
- name: trigger-existing-cloud-run-job
  entrypoint: flow.py:run_cloud_run_job
  work_pool:
    name: my-cloud-run-process-pool
  parameters:
    job_name: "my-job"
```
Deploy it
- From the project root (with flow.py and prefect.yaml present):
```bash
prefect deploy
```
- You'll be prompted for details if anything is missing. This will register the deployment pointing to your existing work pool (the one your Cloud Run worker is polling).
- After that, trigger runs via the UI or with:
```bash
prefect deployment run "run-cloud-run-job/trigger-existing-cloud-run-job" --param job_name=my-job
```
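If you'd rather skip prefect.yaml, Prefect 3 can also register the same deployment from Python with `flow.from_source(...).deploy(...)`. A rough sketch (the repo URL below is hypothetical; the work pool name should match the one your Cloud Run worker polls):
```python
# Sketch: programmatic alternative to prefect.yaml + `prefect deploy`.
# Assumes flow.py lives in a repo the worker can pull from at run time.
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # hypothetical repo URL
        entrypoint="flow.py:run_cloud_run_job",
    ).deploy(
        name="trigger-existing-cloud-run-job",
        work_pool_name="my-cloud-run-process-pool",
        parameters={"job_name": "my-job"},
    )
```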
Notes
- The code waits for completion by calling `op.result()`. If you want explicit status handling, you can inspect the returned `execution` object's conditions and task counts (see the sketch after this list).
- Make sure your Cloud Run worker has egress to reach the public Cloud Run API (default internet egress is fine).
- If you ever switch to the gcloud approach, you'd need the gcloud SDK in the container; avoid that here.

Helpful docs
- Prefect GCP worker guide: Prefect GCP worker guide
- Cloud Run Jobs client (Python): JobsClient (Python)
- Cloud Run Jobs overview: Running Cloud Run jobs

If you want me to pin exact versions (e.g., google-cloud-run) or generate a full Dockerfile and CI snippet, say the word. I live for this level of excitement.
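For reference, richer status handling could look roughly like this (a sketch based on the google-cloud-run v2 Execution message; the field names used here exist on that resource, but verify against the version you install):
```python
# Sketch: summarize a finished Cloud Run job Execution's status.
from google.cloud.run_v2 import Execution

def summarize_execution(execution: Execution) -> dict:
    # Each condition carries a state enum and a human-readable message
    conditions = [
        {"state": c.state.name, "message": c.message} for c in execution.conditions
    ]
    return {
        "name": execution.name,
        "succeeded_tasks": execution.succeeded_count,
        "failed_tasks": execution.failed_count,
        "conditions": conditions,
    }
```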
@Marvin additional context:
• the prefect server runs on gcp compute engine as a docker image, so it is self hosted, not on prefect cloud
• host URL is set to 0.0.0.0 as an environment variable inside the container

question: how should the cloud run service worker be configured to follow this server? is it by setting the PREFECT_API_URL to the external IP of the self hosted prefect server? are there additional steps?
thought for 51.7 seconds
Short answer: yes. Point the Cloud Run worker at your self-hosted API with PREFECT_API_URL, and make sure the API is actually reachable from Cloud Run. A few specifics so it doesn't sulk and refuse to work:

What to set on the Cloud Run worker
- Set this env var: `PREFECT_API_URL=http://<your-server-ip-or-domain>:4200/api`
  - If you have a domain and TLS: `https://prefect.your-domain/api`
- Start the worker with your pool: `prefect worker start -p <your-work-pool> --type process`

Server/network prerequisites
- Your Prefect server must listen on 0.0.0.0 (you've done that) and expose the API port externally.
- Open the firewall on the GCE VM for the API port (default 4200) to Cloud Run egress (or the internet, if you like living dangerously).
- If the server is private (no public IP), use a Serverless VPC Connector on Cloud Run, point PREFECT_API_URL at the VM's internal IP, and allow the connector's range in your VM's firewall.
- Test from the worker's environment: `curl http://<host>:4200/api/health`. If that fails, the worker won't connect either. (A Python equivalent of this check is sketched at the end.)

TLS notes
- If you terminate TLS with a proper cert (recommended), just use https://…/api.
- If you insist on a self-signed cert, make the worker trust it by setting REQUESTS_CA_BUNDLE to your CA bundle. Avoid disabling verification unless you enjoy intermittent, inscrutable failures.

Path prefixes (only if you added one)
- If you proxy the API under a prefix (e.g., https://prefect.your-domain/prefect), set PREFECT_API_URL to https://prefect.your-domain/prefect/api.

Nothing else is required
- Self-hosted Prefect Server doesn't use API keys by default, so no tokens needed.
- The UI URL doesn't matter to the worker; only PREFECT_API_URL does.

Sanity checklist
- Cloud Run env var: PREFECT_API_URL points to …/api
- Worker command uses the correct work pool
- Firewall/LB configured so the worker can reach the API
- curl to /api/health from the worker succeeds

If you want, share your exact endpoint (IP vs domain, HTTP vs HTTPS) and I'll give you the precise Cloud Run deploy command with the right env vars. I do adore being specific.
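For completeness, here's a rough Python version of that connectivity test you could run from inside the worker container (httpx ships as a Prefect dependency; it assumes PREFECT_API_URL is already set on the Cloud Run service):
```python
# Sketch: verify the worker can reach the self-hosted Prefect API.
# Mirrors `curl http://<host>:4200/api/health`.
import os

import httpx  # already installed as a Prefect dependency

api_url = os.environ["PREFECT_API_URL"].rstrip("/")  # e.g. http://<your-server-ip>:4200/api
resp = httpx.get(f"{api_url}/health", timeout=10)
print(resp.status_code, resp.text)  # expect a 200 from a healthy server
```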