Tom Matthews
06/09/2024, 5:21 PM
Marvin
06/09/2024, 5:21 PM
1. Install Prefect:
```sh
pip install prefect
```
2. Create and Query Deployments:
You can use the Prefect client to create and query deployments. Here's an example snippet (a filter-based variant is sketched just after this snippet):
```python
from prefect.client import get_client
import asyncio

async def get_deployments():
    async with get_client() as client:
        # Replace "your_flow_name" with the name of your flow
        deployments = await client.read_deployments(flow_name="your_flow_name")
        return deployments

# Example usage
deployments = asyncio.run(get_deployments())
for deployment in deployments:
    print(deployment)
```
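Note that, depending on your Prefect 2.x version, `read_deployments` may not accept a `flow_name` keyword and may instead expect filter objects. A minimal filter-based sketch, assuming `FlowFilter` is importable from `prefect.client.schemas.filters`:

```python
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter

async def get_deployments_for_flow(flow_name: str):
    async with get_client() as client:
        # Restrict results to deployments whose parent flow matches the given name
        return await client.read_deployments(
            flow_filter=FlowFilter(name={"any_": [flow_name]})
        )
```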
### Documentation Links
1. Deployments and Flows
2. Prefect Deployment Tutorial
3. Prefect Deploy Guide
### Related GitHub Issues
Searching through GitHub issues may also offer additional insights and related discussions:
- Support subflow cancellation if parent flow is in terminal state
- Unable to handle 'Cancelling' events
Feel free to delve into the linked resources for a deeper understanding. If you have further questions or issues, don't hesitate to ask!
Tom Matthews
06/09/2024, 5:41 PM
```python
async def _create_prefect_workpools():
    async with get_client() as client:
        await client.create_work_pool(
            WorkPoolCreate(
                name=infra.PrefectWorkpool.CPU.value,
                type="kubernetes",
                base_job_template={
                    "job_configuration": {},
                    "variables": {
                        "properties": {
                            "image": constants.DEFAULT_PREFECT_WORKFLOW_IMAGE
                        }
                    },
                },
            )
        )
```
Marvin
06/09/2024, 5:41 PM
The `base_job_template` defines fields that can be edited on a per-deployment basis or for the entire work pool.
### Updated Code
```python
from enum import Enum

from prefect.client import get_client
from prefect.client.schemas.actions import WorkPoolCreate

# `constants` and `infra` are your own project modules; example
# stand-ins are defined at the bottom of this snippet.

async def _create_prefect_workpools():
    async with get_client() as client:
        await client.create_work_pool(
            WorkPoolCreate(
                name=infra.PrefectWorkpool.CPU.value,
                type="kubernetes",
                base_job_template={
                    "job_configuration": {
                        "container": {
                            "image": "{{ image }}",
                            "resources": {
                                "limits": {
                                    "cpu": "{{ cpu }}",
                                    "memory": "{{ memory }}"
                                }
                            },
                            "env": {
                                "PREFECT_API_URL": "{{ api_url }}",
                                "PREFECT_LOGGING_LEVEL": "{{ logging_level }}"
                            }
                        }
                    },
                    "variables": {
                        "properties": {
                            "image": {
                                "default": constants.DEFAULT_PREFECT_WORKFLOW_IMAGE,
                                "type": "string"
                            },
                            "cpu": {
                                "default": 500,
                                "type": "integer"
                            },
                            "memory": {
                                "default": 1024,
                                "type": "integer"
                            },
                            "api_url": {
                                "default": "http://api.prefect.io",
                                "type": "string"
                            },
                            "logging_level": {
                                "default": "INFO",
                                "type": "string"
                            }
                        }
                    }
                },
            )
        )

# Example constants and infra definitions
class constants:
    DEFAULT_PREFECT_WORKFLOW_IMAGE = "prefecthq/prefect:2.0.0"

class infra:
    class PrefectWorkpool(Enum):
        CPU = "cpu-workpool"
```
### Explanation
1. Job Configuration: The `job_configuration` section includes placeholders (`{{ image }}`, `{{ cpu }}`, `{{ memory }}`, `{{ api_url }}`, `{{ logging_level }}`) which are referenced from the `variables` section. This allows you to customize job configurations based on the deployment.
2. Variables: This section defines the default values and types for each configurable variable. These variables can be referenced in the job configuration using double curly braces, and individual deployments can override them (see the sketch below).
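For illustration, here is a minimal sketch of overriding these work pool variables from a deployment, assuming a recent Prefect 2.x release where `Flow.deploy` accepts `job_variables` (the flow, deployment name, and image below are placeholders):

```python
from prefect import flow

@flow
def my_flow():
    print("hello from the cpu work pool")

if __name__ == "__main__":
    # Values passed via job_variables override the work pool's defaults
    # (e.g. cpu=500, memory=1024) for this deployment only.
    my_flow.deploy(
        name="cpu-deployment",
        work_pool_name="cpu-workpool",
        image="prefecthq/prefect:2-latest",
        build=False,   # use the image as-is; don't build or push one here
        push=False,
        job_variables={"cpu": 1000, "memory": 2048},
    )
```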
### Documentation Links
- Prefect Work Pools Concept
- Developing a New Worker Type
Feel free to adjust the configuration as needed for your specific testing requirements. If you have any further questions or run into issues, let me know!