Florent VanDeMoortele
09/19/2024, 2:39 PMMarvin
09/19/2024, 2:39 PM--no-cpu-throttling
to ensure that your Cloud Run instances can start up quickly.
- Setting --min-instances
to a higher value to keep some instances warm and reduce cold start latency.
Example command:
shell
gcloud run deploy prefect-worker --image=prefecthq/prefect:<VERSION> \
--set-env-vars PREFECT_API_URL=$PREFECT_API_URL,PREFECT_API_KEY=$PREFECT_API_KEY \
--service-account <YOUR-SERVICE-ACCOUNT-NAME> \
--no-cpu-throttling \
--min-instances 1 \
--args "prefect","worker","start","--install-policy","always","--with-healthcheck","-p","<WORK-POOL-NAME>","-t","cloud-run"
3. Concurrency and Scheduling: Configure global concurrency limits and optimize task scheduling in Prefect to reduce wait times.
4. Prefect Worker Policies: Ensure that your Prefect workers have the appropriate policies set to minimize install and initialization times. For instance, you can use the --install-policy always
flag to ensure necessary packages are always installed.
For detailed guidance on Prefect configuration and deployment strategies, you can refer to the Prefect documentation for Cloud Run deployment.
By following these steps, you should be able to significantly reduce the pending duration for your flows.
Links for reference:
- Prefect 2.x Documentation: Global Concurrency Limits
- Prefect 3.x Documentation: GCP Worker GuideFlorent VanDeMoortele
09/19/2024, 2:43 PM{
"variables": {
"type": "object",
"required": [
"credentials"
],
"properties": {
"cpu": {
"type": "string",
"title": "CPU",
"default": "1000m",
"pattern": "^(\\d*000)m$",
"description": "The CPU to allocate to the Cloud Run job."
},
"env": {
"type": "object",
"title": "Environment Variables",
"description": "Environment variables to set when starting a flow run.",
"additionalProperties": {
"type": "string"
}
},
"args": {
"type": "array",
"items": {
"type": "string"
},
"title": "Args",
"description": "The arguments to pass to the Cloud Run Job V2's entrypoint command."
},
"name": {
"type": "string",
"title": "Name",
"description": "Name given to created infrastructure."
},
"image": {
"type": "string",
"title": "Image Name",
"default": "<http://docker.io/prefecthq/prefect:2-latest|docker.io/prefecthq/prefect:2-latest>",
"description": "The image to use for the Cloud Run job. If not provided the latest Prefect image will be used."
},
"labels": {
"type": "object",
"title": "Labels",
"description": "Labels applied to created infrastructure.",
"additionalProperties": {
"type": "string"
}
},
"memory": {
"type": "string",
"title": "Memory",
"default": "512Mi",
"example": "512Mi",
"pattern": "^\\d+(?:G|Gi|M|Mi)$",
"description": "The memory to allocate to the Cloud Run job along with the units, which could be: G, Gi, M, Mi."
},
"region": {
"type": "string",
"title": "Region",
"default": "us-central1",
"description": "The region in which to run the Cloud Run job"
},
"command": {
"type": "string",
"title": "Command",
"description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated."
},
"timeout": {
"type": "integer",
"title": "Job Timeout",
"default": 600,
"maximum": 86400,
"description": "The length of time that Prefect will wait for a Cloud Run Job to complete before raising an exception (maximum of 86400 seconds, 1 day).",
"exclusiveMinimum": 0
},
"keep_job": {
"type": "boolean",
"title": "Keep Job After Completion",
"default": false,
"description": "Keep the completed Cloud run job on Google Cloud Platform."
},
"credentials": {
"allOf": [
{
"$ref": "#/definitions/GcpCredentials"
}
],
"title": "GCP Credentials",
"default": {
"$ref": {
"block_document_id": "edf27b1c-a5b3-4a4a-b8d7-6e3bab853395"
}
},
"description": "The GCP Credentials used to initiate the Cloud Run Job. `service_account_info` is required."
},
"max_retries": {
"type": "integer",
"title": "Max Retries",
"default": 0,
"description": "The number of times to retry the Cloud Run job."
},
"launch_stage": {
"enum": [
"ALPHA",
"BETA",
"GA",
"DEPRECATED",
"EARLY_ACCESS",
"PRELAUNCH",
"UNIMPLEMENTED",
"LAUNCH_TAG_UNSPECIFIED"
],
"type": "string",
"title": "Launch Stage",
"default": "BETA",
"description": "The launch stage of the Cloud Run Job V2. See <https://cloud.google.com/run/docs/about-features-categories> for additional details."
},
"vpc_connector_name": {
"type": "string",
"title": "VPC Connector Name",
"description": "The name of the VPC connector to use for the Cloud Run job."
},
"service_account_name": {
"type": "string",
"title": "Service Account Name",
"example": "<mailto:service-account@example.iam.gserviceaccount.com|service-account@example.iam.gserviceaccount.com>",
"description": "The name of the service account to use for the task execution of Cloud Run Job. By default Cloud Run jobs run as the default Compute Engine Service Account. "
}
},
"definitions": {
"GcpCredentials": {
"type": "object",
"title": "GcpCredentials",
"properties": {
"project": {
"type": "string",
"title": "Project",
"description": "The GCP project to use for the client."
},
"service_account_file": {
"type": "string",
"title": "Service Account File",
"format": "path",
"description": "Path to the service account JSON keyfile."
},
"service_account_info": {
"type": "object",
"title": "Service Account Info",
"description": "The contents of the keyfile as a dict."
}
},
"description": "Block used to manage authentication with GCP. Google authentication is handled via the `google.oauth2.\nSee Google's [Authentication documentation](<https://cloud.google.com/docs/authentication#service-accounts>)\nfor details on inference and recommended authentication patterns.",
"secret_fields": [
"service_account_info.*"
],
"block_type_slug": "gcp-credentials",
"block_schema_references": {}
}
},
"description": "Default variables for the Cloud Run V2 Push Pool.\n\nThe schema for this class is used to populate the `variables` section of the\ndefault base job template."
},
"job_configuration": {
"env": "{{ env }}",
"name": "{{ name }}",
"labels": "{{ labels }}",
"region": "{{ region }}",
"command": "{{ command }}",
"timeout": "{{ timeout }}",
"job_body": {
"client": "prefect",
"template": {
"template": {
"timeout": "{{ timeout }}",
"containers": [
{
"env": [],
"args": "{{ args }}",
"image": "{{ image }}",
"command": "{{ command }}",
"resources": {
"limits": {
"cpu": "{{ cpu }}",
"memory": "{{ memory }}"
}
}
}
],
"maxRetries": "{{ max_retries }}",
"serviceAccount": "{{ service_account_name }}"
}
},
"launchStage": "{{ launch_stage }}"
},
"keep_job": "{{ keep_job }}",
"credentials": "{{ credentials }}"
}
}
Marvin
09/19/2024, 2:43 PMhere's my worker configuration, how can I reduce the pending duration?```{ "variables": { "type": "object", "required": [ "credentials" ], "properties": { "cpu": { "type": "string", "title": "CPU", "default": "1000m", "pattern": "^(\\d*000)m$", "description": "The CPU to allocate to the Cloud Run job." }, "env": { "type": "object", "title": "Environment Variables", "description": "Environment variables to set when starting a flow run.", "additionalProperties": { "type": "string" } }, "args": { "type": "array", "items": { "type": "string" }, "title": "Args", "description": "The arguments to pass to the Cloud Run Job V2's entrypoint command." }, "name": { "type": "string", "title": "Name", "description": "Name given to created infrastructure." }, "image": { "type": "string", "title": "Image Name", "default": "docker.io/prefecthq/prefect:2-latest", "description
Florent VanDeMoortele
09/19/2024, 2:45 PM{
"variables": {
"type": "object",
"required": [
"credentials"
],
"properties": {
"cpu": {
"type": "string",
"title": "CPU",
"default": "1000m",
"pattern": "^(\\d*000)m$",
"description": "The CPU to allocate to the Cloud Run job."
},
"image": {
"type": "string",
"title": "Image Name",
"default": "<http://docker.io/prefecthq/prefect:2-latest|docker.io/prefecthq/prefect:2-latest>",
"description": "The image to use for the Cloud Run job. If not provided the latest Prefect image will be used."
},
"memory": {
"type": "string",
"title": "Memory",
"default": "512Mi",
"example": "512Mi",
"pattern": "^\\d+(?:G|Gi|M|Mi)$",
"description": "The memory to allocate to the Cloud Run job along with the units, which could be: G, Gi, M, Mi."
},
"timeout": {
"type": "integer",
"title": "Job Timeout",
"default": 600,
"maximum": 86400,
"description": "The length of time that Prefect will wait for a Cloud Run Job to complete before raising an exception (maximum of 86400 seconds, 1 day).",
"exclusiveMinimum": 0
},
"job_configuration": {
"env": "{{ env }}",
"name": "{{ name }}",
"labels": "{{ labels }}",
"region": "{{ region }}",
"command": "{{ command }}",
"timeout": "{{ timeout }}",
"job_body": {
"client": "prefect",
"template": {
"template": {
"timeout": "{{ timeout }}",
"containers": [
{
"env": [],
"args": "{{ args }}",
"image": "{{ image }}",
"command": "{{ command }}",
"resources": {
"limits": {
"cpu": "{{ cpu }}",
"memory": "{{ memory }}"
}
}
}
],
"maxRetries": "{{ max_retries }}",
"serviceAccount": "{{ service_account_name }}"
}
},
"launchStage": "{{ launch_stage }}"
},
"keep_job": "{{ keep_job }}",
"credentials": "{{ credentials }}"
}
}
Marvin
09/19/2024, 2:45 PMhere's my worker configuration, how can I reduce the pending duration?```{ "variables": { "type": "object", "required": [ "credentials" ], "properties": { "cpu": { "type": "string", "title": "CPU", "default": "1000m", "pattern": "^(\\d*000)m$", "description": "The CPU to allocate to the Cloud Run job." }, "image": { "type": "string", "title": "Image Name", "default": "docker.io/prefecthq/prefect:2-latest", "description": "The image to use for the Cloud Run job. If not provided the latest Prefect image will be used." }, "memory": { "type": "string", "title": "Memory", "default": "512Mi", "example": "512Mi", "pattern": "^\\d+(?:G|Gi|M|Mi)$", "description": "The memory to allocate to the Cloud Run job along with the units, which could be: G, Gi, M, Mi." }, "timeout": { "type": "integer", "title": "Job Timeout", "default": 600,
Florent VanDeMoortele
09/19/2024, 2:46 PMMarvin
09/19/2024, 2:47 PMsh
prefect work-pool create 'my-cloud-run-pool' --type cloud-run
2. Start Cloud Run Worker:
sh
prefect worker start --pool 'my-cloud-run-pool'
3. Custom Cloud Run Job Template:
You can use a customized job template to specify resources and other configurations:
json
{
"apiVersion": "<http://run.googleapis.com/v1|run.googleapis.com/v1>",
"kind": "Job",
"metadata": {
"name": "{{ name }}",
"annotations": {
"<http://run.googleapis.com/launch-stage|run.googleapis.com/launch-stage>": "BETA"
}
},
"spec": {
"template": {
"spec": {
"template": {
"spec": {
"containers": [
{
"image": "{{ image }}",
"args": "{{ args }}",
"resources": {
"limits": {
"cpu": "{{ cpu }}",
"memory": "{{ memory }}"
},
"requests": {
"cpu": "{{ cpu }}",
"memory": "{{ memory }}"
}
}
}
],
"timeoutSeconds": "{{ timeout }}",
"serviceAccountName": "{{ service_account_name }}"
}
}
}
}
}
}
### Prefect 3.x
1. Set Required Environment Variables:
sh
export PREFECT_API_URL='<https://api.prefect.cloud/api/accounts/<ACCOUNT-ID>/workspaces/<WORKSPACE-ID>>'
export PREFECT_API_KEY='<YOUR-API-KEY>'
2. Deploy Worker as a Service:
sh
gcloud run deploy prefect-worker --image=prefecthq/prefect:3-latest \
--set-env-vars PREFECT_API_URL=$PREFECT_API_URL,PREFECT_API_KEY=$PREFECT_API_KEY \
--service-account <YOUR-SERVICE-ACCOUNT-NAME> \
--no-cpu-throttling \
--min-instances 1 \
--args "prefect","worker","start","--install-policy","always","--with-healthcheck","-p","<WORK-POOL-NAME>","-t","cloud-run"
By pre-configuring the work pool and using custom job templates, you ensure that the necessary resources are ready to minimize startup latency. Using the flags for no-cpu-throttling
and setting min-instances
helps in avoiding cold starts, thus reducing the pending duration significantly.
For more detailed instructions and configurations, you can refer to the:
- Prefect 2.x Documentation: Prefect GCP Cloud Run Worker
- Prefect 3.x Documentation: Prefect GCP Worker GuideFlorent VanDeMoortele
09/19/2024, 3:27 PMMarvin
09/19/2024, 3:27 PMsh
prefect work-pool create my-push-pool --type push
2. Configure the Push Work Pool:
- In the Prefect UI, navigate to the Work Pools page.
- Click "Create" and choose "Push" during the infrastructure type step.
- Configure the required fields, such as GCP credentials or other cloud settings as needed.
3. Deploy Your Flow to the Push Work Pool:
When creating or updating your deployment, specify the work pool:
yaml
name: my-flow
work_pool:
name: my-push-pool
4. Start the Worker:
sh
prefect worker start --pool my-push-pool
### Prefect 3.x
1. Set Up Environment Variables (specific to your cloud provider):
sh
export PREFECT_API_URL='<https://api.prefect.cloud/api/accounts/<ACCOUNT-ID>/workspaces/<WORKSPACE-ID>>'
export PREFECT_API_KEY='<YOUR-API-KEY>'
2. Create and Configure a Push Work Pool:
- In the Prefect UI, go to the Work Pools section.
- Click the "+" button to create a new work pool, selecting the "Push" option.
- Fill in the necessary details, such as credentials and other cloud-specific settings.
3. Deploy the Worker:
sh
gcloud run deploy prefect-worker --image=prefecthq/prefect:3-latest \
--set-env-vars PREFECT_API_URL=$PREFECT_API_URL,PREFECT_API_KEY=$PREFECT_API_KEY \
--service-account <YOUR-SERVICE-ACCOUNT-NAME> \
--no-cpu-throttling \
--min-instances 1 \
--args "prefect","worker","start","--install-policy","always","--with-healthcheck","-p","my-push-pool","-t","cloud-run"
### Detailed Configuration Example
Here’s a more detailed YAML configuration for a deployment:
yaml
# deployment.yaml
name: my-flow
work_pool:
name: my-push-pool
tags: []
parameters: {}
### References for Push Work Pools:
- Prefect 2.x Documentation: Push Work Pools Guide
- Prefect 3.x Documentation: Work Pools Overview
By configuring a Push Work Pool, you can streamline job dispatches and improve performance, ensuring tasks are efficiently assigned to the right workers.