victor kuang (08/20/2025, 4:33 AM)

Marvin (08/20/2025, 4:34 AM)
1) Use the Cloud Run Jobs Python client from a Prefect task
- Ensure Application Default Credentials are available (e.g., GOOGLE_APPLICATION_CREDENTIALS=/path/key.json) and the service account can execute Cloud Run jobs.
- Install the dependency: pip install google-cloud-run
Example:

from prefect import flow, task
from google.cloud.run_v2 import JobsClient
# optional: from google.api_core.exceptions import NotFound

@task
def trigger_cloud_run_job(project_id: str, region: str, job_name: str) -> dict:
    client = JobsClient()
    name = f"projects/{project_id}/locations/{region}/jobs/{job_name}"
    op = client.run_job(name=name)
    execution = op.result()  # waits for the job to finish
    # Execution reports task counts and conditions rather than a single state field
    return {
        "execution_name": execution.name,
        "succeeded": execution.succeeded_count,
        "failed": execution.failed_count,
    }

@flow
def run_job_flow(project_id: str, region: str, job_name: str):
    return trigger_cloud_run_job(project_id, region, job_name)

if __name__ == "__main__":
    print(run_job_flow("my-project", "us-central1", "my-job"))
- Docs: JobsClient (Python)
- Cloud Run Jobs overview: Running Cloud Run jobs
2) Call gcloud from a Prefect task (simple and effective)
- Ensure gcloud is installed and authenticated in the environment where the Prefect worker runs.
import subprocess

from prefect import flow, task

@task
def run_job_via_gcloud(job: str, region: str, project: str, wait: bool = True):
    cmd = [
        "gcloud", "run", "jobs", "execute", job,
        f"--region={region}", f"--project={project}",
    ]
    if wait:
        cmd.append("--wait")
    subprocess.run(cmd, check=True)

@flow
def run_job_flow(project: str, region: str, job: str):
    run_job_via_gcloud(job, region, project)
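If you want the gcloud output to show up in your Prefect logs, a variation of the task above (a sketch under the same environment assumptions) can capture stdout/stderr:

import subprocess

from prefect import task, get_run_logger

@task
def run_job_via_gcloud_logged(job: str, region: str, project: str) -> str:
    logger = get_run_logger()
    cmd = [
        "gcloud", "run", "jobs", "execute", job,
        f"--region={region}", f"--project={project}", "--wait",
    ]
    # capture_output collects stdout/stderr so they can be forwarded to Prefect's logger
    result = subprocess.run(cmd, check=True, capture_output=True, text=True)
    logger.info(result.stdout)
    if result.stderr:
        logger.info(result.stderr)
    return result.stdout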
Permissions to check (on the service account your Prefect worker uses):
- Ability to execute Cloud Run jobs (commonly Cloud Run Developer, roles/run.developer, or Cloud Run Admin, roles/run.admin)
- If the job runs with its own service account, you may also need Service Account User (roles/iam.serviceAccountUser) on that account
If you tell me your project/region and whether you want to wait for completion or fire-and-forget, I'll tailor the snippet. If you're running on Prefect 3.x (assumed), both approaches fit neatly into a deployment. For GCP-specific worker guidance, see the Prefect GCP worker guide.
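For reference, a fire-and-forget variant of the JobsClient approach (a minimal sketch under the same assumptions as above) starts the execution and returns without calling op.result(), so the flow does not block on the job:

from prefect import flow, task
from google.cloud.run_v2 import JobsClient

@task
def start_cloud_run_job(project_id: str, region: str, job_name: str) -> str:
    client = JobsClient()
    name = f"projects/{project_id}/locations/{region}/jobs/{job_name}"
    # Kick off the job execution; skipping op.result() means we do not wait for completion
    client.run_job(name=name)
    return name

@flow
def start_job_flow(project_id: str, region: str, job_name: str):
    return start_cloud_run_job(project_id, region, job_name)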
victor kuang (08/20/2025, 4:42 AM)
my-project and us-west1 as project/region
given the above:
• what dependencies are required on the cloud run service, if prefecthq:3-latest image is used as base?
• what should the flow.py contain, and how should the deployment be submitted for best practice? e.g. running a python script, or a shell script, or using prefect.yaml?

Marvin (08/20/2025, 4:47 AM)
Dependencies on top of prefecthq/prefect:3-latest:
- google-cloud-run (this pulls in google-api-core, google-auth, grpc, etc.)
- You do NOT need the gcloud CLI in the container.
- You do NOT need a key file or GOOGLE_APPLICATION_CREDENTIALS on Cloud Run; ADC will use the attached service account automatically (which you’ve already granted permissions to).
Example Dockerfile
FROM prefecthq/prefect:3-latest
RUN pip install --no-cache-dir google-cloud-run
Flow code (waits for completion; project=my-project, region=us-west1)

from prefect import flow, task, get_run_logger
from google.cloud.run_v2 import JobsClient

PROJECT_ID = "my-project"
REGION = "us-west1"

@task
def trigger_cloud_run_job(job_name: str) -> str:
    logger = get_run_logger()
    # Use the regional endpoint to avoid 404s/misdirected requests
    client = JobsClient(client_options={"api_endpoint": f"{REGION}-run.googleapis.com"})
    name = f"projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name}"
    logger.info(f"Starting Cloud Run job: {name}")
    op = client.run_job(name=name)
    execution = op.result()  # waits until the job execution completes or fails
    logger.info(f"Job execution completed: {execution.name}")
    # You could inspect execution.conditions or task counts for richer status handling
    return execution.name

@flow(name="run-cloud-run-job")
def run_cloud_run_job(job_name: str = "my-job"):
    return trigger_cloud_run_job(job_name)

if __name__ == "__main__":
    run_cloud_run_job()
Best-practice deployment
- Keep the flow as Python (flow.py) and manage deployments with prefect.yaml + prefect deploy. No shell wrappers needed.
- Because you’re running a Process worker on Cloud Run, bake your flow’s Python deps (like google-cloud-run) into the worker image (as shown above). That way your worker can execute any of your flows reliably.
Minimal prefect.yaml (checked into your repo)
name: cloud-run-jobs
prefect-version: "3.*"
deployments:
  - name: trigger-existing-cloud-run-job
    entrypoint: flow.py:run_cloud_run_job
    work_pool:
      name: my-cloud-run-process-pool
    parameters:
      job_name: "my-job"
Deploy it
- From the project root (with flow.py and prefect.yaml present):
prefect deploy
- You’ll be prompted for details if anything is missing. This will register the deployment pointing to your existing work pool (the one your Cloud Run worker is polling).
- After that, trigger runs via the UI, from Python (see the sketch below), or with:
prefect deployment run "run-cloud-run-job/trigger-existing-cloud-run-job" --param job_name=my-job
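If you would rather trigger runs from Python than the CLI, a short sketch using Prefect's run_deployment helper (assuming Prefect 3.x):

from prefect.deployments import run_deployment

# Trigger the deployment by "<flow name>/<deployment name>" and pass parameters;
# by default this waits for the flow run to finish (pass timeout=0 to return immediately)
flow_run = run_deployment(
    name="run-cloud-run-job/trigger-existing-cloud-run-job",
    parameters={"job_name": "my-job"},
)
print(flow_run.state)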
Notes
- The code waits for completion by calling op.result(). If you want explicit status handling, you can inspect the returned execution object's conditions and task counts (a sketch follows after these notes).
- Make sure your Cloud Run worker has egress to reach the public Cloud Run API (default internet egress is fine).
- If you ever switch to the gcloud approach, you’d need the gcloud SDK in the container—avoid that here.
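For the status handling mentioned above, here is a rough sketch (my illustration; the conditions, succeeded_count and failed_count fields come from the Cloud Run v2 Execution resource, and note that a failed execution may also surface as an exception raised by op.result()):

from prefect import task, get_run_logger
from google.cloud.run_v2 import JobsClient

PROJECT_ID = "my-project"
REGION = "us-west1"

@task
def trigger_cloud_run_job_with_status(job_name: str) -> str:
    logger = get_run_logger()
    client = JobsClient(client_options={"api_endpoint": f"{REGION}-run.googleapis.com"})
    name = f"projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name}"
    execution = client.run_job(name=name).result()  # block until the execution finishes
    # Log whatever conditions the API reports, for easier post-mortems
    for condition in execution.conditions:
        logger.info(f"Condition: {condition}")
    # Fail the task (and the flow run) if any of the job's tasks failed
    if execution.failed_count:
        raise RuntimeError(f"{execution.failed_count} task(s) failed in {execution.name}")
    logger.info(f"{execution.succeeded_count} task(s) succeeded in {execution.name}")
    return execution.name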
Helpful docs
- Prefect GCP worker guide: Prefect GCP worker guide
- Cloud Run Jobs client (Python): JobsClient (Python)
- Cloud Run Jobs overview: Running Cloud Run jobs
If you want me to pin exact versions (e.g., google-cloud-run) or generate a full Dockerfile and CI snippet, say the word. I live for this level of excitement.

victor kuang (08/20/2025, 5:02 AM)

Marvin (08/20/2025, 5:03 AM)
Worker start command:
prefect worker start -p <your-work-pool> --type process
Server/network prerequisites
- Your Prefect server must listen on 0.0.0.0 (you’ve done that) and expose the API port externally.
- Open the firewall on the GCE VM for the API port (default 4200) to Cloud Run egress (or the internet, if you like living dangerously).
- If the server is private (no public IP), use a Serverless VPC Connector on Cloud Run, point PREFECT_API_URL at the VM’s internal IP, and allow the connector’s range in your VM’s firewall.
- Test from the worker's environment: curl http://<host>:4200/api/health. If that fails, the worker won't connect either (a Python equivalent is sketched below).
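If curl is not available where the worker runs, an equivalent stdlib-only Python check (a small sketch, assuming PREFECT_API_URL is set and ends in /api):

import os
import urllib.request

# Derive the health endpoint from the same PREFECT_API_URL the worker will use
api_url = os.environ["PREFECT_API_URL"].rstrip("/")  # e.g. http://<host>:4200/api
health_url = f"{api_url}/health"

with urllib.request.urlopen(health_url, timeout=10) as resp:
    # Expect HTTP 200 if the API is reachable from this environment
    print(health_url, resp.status, resp.read().decode())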
TLS notes
- If you terminate TLS with a proper cert (recommended), just use https://…/api.
- If you insist on a self-signed cert, make the worker trust it by setting REQUESTS_CA_BUNDLE to your CA bundle. Avoid disabling verification unless you enjoy intermittent, inscrutable failures.
Path prefixes (only if you added one)
- If you proxy the API under a prefix (e.g., https://prefect.your-domain/prefect), set PREFECT_API_URL to https://prefect.your-domain/prefect/api.
Nothing else is required
- Self-hosted Prefect Server doesn’t use API keys by default, so no tokens needed.
- The UI URL doesn’t matter to the worker; only PREFECT_API_URL does.
Sanity checklist
- Cloud Run env var: PREFECT_API_URL points to …/api
- Worker command uses the correct work pool
- Firewall/LB configured so the worker can reach the API
- curl to /api/health from the worker succeeds
If you want, share your exact endpoint (IP vs domain, HTTP vs HTTPS) and I’ll give you the precise Cloud Run deploy command with the right env vars. I do adore being specific.