Tom Matthews
06/09/2024, 5:21 PM
Marvin
06/09/2024, 5:21 PM
sh
pip install prefect
2. Create and Query Deployments:
You can use the Prefect client to create and query deployments. Here's an example snippet that queries the deployments of a flow:
python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName


async def get_deployments():
    async with get_client() as client:
        # read_deployments takes filter objects rather than a flow_name keyword.
        # Replace "your_flow_name" with the name of your flow.
        flow_filter = FlowFilter(name=FlowFilterName(any_=["your_flow_name"]))
        return await client.read_deployments(flow_filter=flow_filter)


# Example usage
deployments = asyncio.run(get_deployments())
for deployment in deployments:
    print(deployment)
### Documentation Links
1. Deployments and Flows
2. Prefect Deployment Tutorial
3. Prefect Deploy Guide
### Related GitHub Issues
Searching through GitHub issues may also offer additional insights and related discussions:
- Support subflow cancellation if parent flow is in terminal state
- Unable to handle 'Cancelling' events
Feel free to delve into the linked resources for a deeper understanding. If you have further questions or issues, don't hesitate to ask!
Tom Matthews
06/09/2024, 5:41 PM
async def _create_prefect_workpools():
    async with get_client() as client:
        await client.create_work_pool(
            WorkPoolCreate(
                name=infra.PrefectWorkpool.CPU.value,
                type="kubernetes",
                base_job_template={
                    "job_configuration": {},
                    "variables": {
                        "properties": {
                            "image": constants.DEFAULT_PREFECT_WORKFLOW_IMAGE
                        }
                    },
                },
            )
        )
Marvin
06/09/2024, 5:41 PM
`base_job_template` defines fields that can be edited on a per-deployment basis or for the entire work pool.
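One way to sanity-check a template before creating the work pool is to compare the `{{ ... }}` placeholders used in `job_configuration` against the names declared under `variables.properties`. The sketch below is plain Python and does not use Prefect; the helper names (`find_placeholders`, `check_template`), the placeholder regex, and the image string are assumptions made for illustration.

```python
import re

# Matches "{{ name }}" placeholders; this pattern is an assumption for the sketch.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def find_placeholders(node):
    """Recursively collect placeholder names from nested dicts, lists, and strings."""
    if isinstance(node, str):
        return set(PLACEHOLDER.findall(node))
    if isinstance(node, dict):
        found = set()
        for value in node.values():
            found |= find_placeholders(value)
        return found
    if isinstance(node, list):
        found = set()
        for item in node:
            found |= find_placeholders(item)
        return found
    return set()


def check_template(template):
    """Return (undeclared, unused) variable names for a base job template dict."""
    used = find_placeholders(template.get("job_configuration", {}))
    declared = set(template.get("variables", {}).get("properties", {}))
    return sorted(used - declared), sorted(declared - used)


# Same shape as the template in the question: one declared variable,
# but an empty job_configuration, so nothing ever references it.
original = {
    "job_configuration": {},
    "variables": {"properties": {"image": "example/image:tag"}},  # hypothetical image
}
print(check_template(original))  # ([], ['image'])
```

A non-empty first list means the job configuration references a variable that was never declared; a non-empty second list means a declared variable has no effect on the job.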
### Updated Code
python
from enum import Enum

from prefect import get_client
from prefect.client.schemas.actions import WorkPoolCreate


# Example stand-ins for the constants and infra modules; import your own instead.
class constants:
    DEFAULT_PREFECT_WORKFLOW_IMAGE = "prefecthq/prefect:2.0.0"


class infra:
    class PrefectWorkpool(Enum):  # an Enum, so that .value works below
        CPU = "cpu-workpool"


async def _create_prefect_workpools():
    async with get_client() as client:
        await client.create_work_pool(
            WorkPoolCreate(
                name=infra.PrefectWorkpool.CPU.value,
                type="kubernetes",
                # Illustrative layout only: the Kubernetes worker's default template
                # nests container settings under "job_manifest". Inspect it with
                # `prefect work-pool get-default-base-job-template --type kubernetes`
                # before relying on the keys below.
                base_job_template={
                    "job_configuration": {
                        "container": {
                            "image": "{{ image }}",
                            "resources": {
                                "limits": {
                                    "cpu": "{{ cpu }}",
                                    "memory": "{{ memory }}"
                                }
                            },
                            "env": {
                                "PREFECT_API_URL": "{{ api_url }}",
                                "PREFECT_LOGGING_LEVEL": "{{ logging_level }}"
                            }
                        }
                    },
                    "variables": {
                        "properties": {
                            "image": {
                                "default": constants.DEFAULT_PREFECT_WORKFLOW_IMAGE,
                                "type": "string"
                            },
                            "cpu": {
                                "default": 500,
                                "type": "integer"
                            },
                            "memory": {
                                "default": 1024,
                                "type": "integer"
                            },
                            "api_url": {
                                "default": "http://api.prefect.io",
                                "type": "string"
                            },
                            "logging_level": {
                                "default": "INFO",
                                "type": "string"
                            }
                        }
                    }
                },
            )
        )
### Explanation
1. Job Configuration: The `job_configuration` section includes placeholders (`{{ image }}`, `{{ cpu }}`, `{{ memory }}`, `{{ api_url }}`, `{{ logging_level }}`) that refer to entries in the `variables` section. This allows you to customize job configurations based on the deployment.
2. Variables: This section defines the default values and types for each configurable variable. These variables can be referenced in the job configuration using double curly braces.
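To make the relationship between the two sections concrete, here is a minimal sketch of how placeholders could be filled from variable defaults, with per-deployment overrides taking precedence. It is plain Python written for illustration and is not Prefect's own templating code; `resolve_values`, `render`, the flat `job_configuration` keys, and the image string are all hypothetical.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def resolve_values(variables, overrides=None):
    """Merge schema defaults with per-deployment overrides (overrides win)."""
    values = {
        name: spec["default"]
        for name, spec in variables.get("properties", {}).items()
        if isinstance(spec, dict) and "default" in spec
    }
    values.update(overrides or {})
    return values


def render(node, values):
    """Recursively substitute placeholders in nested dicts, lists, and strings."""
    if isinstance(node, dict):
        return {key: render(value, values) for key, value in node.items()}
    if isinstance(node, list):
        return [render(item, values) for item in node]
    if isinstance(node, str):
        whole = PLACEHOLDER.fullmatch(node)
        if whole and whole.group(1) in values:
            # A string that is exactly one placeholder keeps the value's type.
            return values[whole.group(1)]
        # Otherwise substitute inside the string; unknown names are left as-is.
        return PLACEHOLDER.sub(
            lambda m: str(values.get(m.group(1), m.group(0))), node
        )
    return node


template = {
    "job_configuration": {
        "image": "{{ image }}",
        "cpu": "{{ cpu }}",
        "label": "level={{ logging_level }}",
    },
    "variables": {
        "properties": {
            "image": {"default": "example/image:tag", "type": "string"},
            "cpu": {"default": 500, "type": "integer"},
            "logging_level": {"default": "INFO", "type": "string"},
        }
    },
}

# A deployment that overrides only the CPU value; everything else uses defaults.
values = resolve_values(template["variables"], overrides={"cpu": 1000})
job = render(template["job_configuration"], values)
print(job)
# {'image': 'example/image:tag', 'cpu': 1000, 'label': 'level=INFO'}
```

Keeping the defaults in the schema and the overrides on the deployment means the work pool stays the single place where the job shape is defined, while each deployment only states what differs.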
### Documentation Links
- Prefect Work Pools Concept
- Developing a New Worker Type
Feel free to adjust the configuration as needed for your specific testing requirements. If you have any further questions or run into issues, let me know!