Srinivas Kandukuri
07/09/2025, 7:27 AM
Marvin
07/09/2025, 7:27 AM
1. First, create a separate workpool for each of your ADS servers:
bash
prefect work-pool create "ads-pool-1" --type process
prefect work-pool create "ads-pool-2" --type process
# ... and so on for all 6 ADS servers
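If your pools follow the ads-pool-N naming, a quick shell loop (just a sketch) can create all six in one go:
bash
# Sketch: create one process-type workpool per ADS server
for i in 1 2 3 4 5 6; do
  prefect work-pool create "ads-pool-$i" --type process
done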
2. For each workpool, you can set base configuration through its base job template. Instead of reading from /server/cfg/setup/dev1.cfg, you can set the ADS hostname directly in the workpool configuration:
python
from prefect import get_client
from prefect.client.schemas.actions import WorkPoolUpdate

async def configure_workpool():
    async with get_client() as client:
        await client.update_work_pool(
            "ads-pool-1",
            work_pool=WorkPoolUpdate(
                base_job_template={
                    # env lives under job_configuration in a base job template
                    "job_configuration": {
                        "env": {
                            "ADS_HOSTNAME": "ads1.example.com"  # Your actual ADS hostname
                        }
                    },
                    "variables": {"type": "object", "properties": {}},
                }
            ),
        )
3. Modify your flow to read the ADS hostname from environment variables instead of the config file:
python
import os
from prefect import flow

@flow
def your_flow(other_params):
    ads_hostname = os.environ.get("ADS_HOSTNAME")
    # Your logic using ads_hostname
    ...
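Since a missing variable would otherwise surface later as an obscure ADS connection error, you may want a defensive variant of the flow above (a sketch):
python
import os
from prefect import flow

@flow
def your_flow(other_params):
    ads_hostname = os.environ.get("ADS_HOSTNAME")
    if not ads_hostname:
        # Fail fast so a misconfigured workpool is obvious in the run logs
        raise RuntimeError("ADS_HOSTNAME is not set on this workpool")
    # Your logic using ads_hostname
    ...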
4. When deploying your flow, you no longer need the cfg parameter:
python
from prefect import flow
from prefect.deployments import Deployment

@flow
def your_flow(other_params):
    ...

deployment = Deployment.build_from_flow(
    your_flow,
    name="your-deployment",
    work_pool_name="ads-pool-1",  # Bind the deployment to an ADS pool
    # No cfg parameter -- the ADS settings now come from the workpool
)
deployment.apply()
5. To handle the connection limits (200 connections per ADS), set the concurrency limits on your workpools:
bash
prefect work-pool set-concurrency-limit "ads-pool-1" 200
6. To run on a particular ADS server, trigger the deployment that is bound to that workpool:
python
from prefect.deployments import run_deployment

# The deployment created above is bound to ads-pool-1, so this run
# executes on that ADS server's workpool
flow_run = run_deployment(name="your-flow/your-deployment")
This setup gives you several advantages:
1. Load Distribution: You can distribute your 1000+ runs across multiple ADS servers by using different workpools
2. Connection Management: Each workpool's concurrency limit ensures you don't exceed the 200 connection limit per ADS
3. Configuration Isolation: Each workpool has its own configuration, making it easier to manage different ADS servers
4. Dynamic Selection: You can choose which ADS server each run uses by triggering the deployment bound to the appropriate workpool (see the sketch below)
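For example, here is a minimal sketch of round-robin distribution, assuming you have created one deployment of the flow per pool (the your-flow/ads-deployment-N names below are hypothetical):
python
from itertools import cycle
from prefect.deployments import run_deployment

# Hypothetical names: one deployment of the same flow per ADS workpool
DEPLOYMENTS = [f"your-flow/ads-deployment-{i}" for i in range(1, 7)]

def fan_out(param_sets):
    # Cycle each run onto the next ADS-backed deployment
    pools = cycle(DEPLOYMENTS)
    for params in param_sets:
        run_deployment(
            name=next(pools),
            parameters=params,
            timeout=0,  # return immediately rather than waiting for the run
        )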
For monitoring, you can use the Prefect UI to see how many runs are currently executing on each workpool and manage the workload distribution.
Remember to adjust the concurrency limits based on your actual system capabilities and requirements. You might want to set it slightly lower than 200 to leave some headroom for other connections.
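The CLI works for this too, for example:
bash
# Check a pool's status, queues, and current concurrency settings
prefect work-pool inspect "ads-pool-1"

# Leave headroom under the 200-connection ADS limit
prefect work-pool set-concurrency-limit "ads-pool-1" 180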
This approach gives you more flexibility and better resource management than handling it at the deployment level, and it's more maintainable as you can update ADS configurations without modifying your deployments.
Srinivas Kandukuri
07/09/2025, 12:46 PM
Marvin
07/09/2025, 12:46 PM
The base_job_template can handle complex configurations including environment variables, commands to run before your flow, and other settings. Here's how you can structure it:
python
from prefect import get_client
from prefect.client.schemas.actions import WorkPoolUpdate

async def configure_workpool():
    async with get_client() as client:
        await client.update_work_pool(
            "ads-pool-1",
            work_pool=WorkPoolUpdate(
                base_job_template={
                    "job_configuration": {
                        # Environment variables
                        "env": {
                            "ADS_HOSTNAME": "ads1.example.com",
                            "OTHER_CONFIG": "value",
                            "CUSTOM_SETTING": "setting_value",
                            # Add all your environment variables here
                        },
                        # Run setup before the flow starts by wrapping the
                        # worker's launch command in a shell (you can check your
                        # worker's default with:
                        #   prefect work-pool get-default-base-job-template --type process)
                        "command": (
                            "bash -c 'export MY_VAR=value "
                            "&& source /path/to/your/script.sh "
                            "&& prefect flow-run execute'"
                        ),
                    },
                    "variables": {"type": "object", "properties": {}},
                }
            ),
        )
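For illustration, a hypothetical /path/to/your/script.sh that the command override sources might look like this (the contents and paths are placeholders):
bash
#!/usr/bin/env bash
# Hypothetical setup script sourced before the flow starts
export PATH="$PATH:/additional/path"        # variable expansion
export CUSTOM_VAR=value                     # plain exports
. /opt/ads/env.sh                           # source another script (placeholder path)
echo "ADS setup complete" >> /tmp/ads.log   # redirection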
If you have a lot of configuration, you can make it more manageable by loading it from a dictionary:
python
from prefect import get_client
from prefect.client.schemas.actions import WorkPoolUpdate

config = {
    "job_configuration": {
        "env": {
            "ADS_HOSTNAME": "ads1.example.com",
            "DB_CONNECTION": "connection_string",
            "API_KEY": "your_api_key",
            # ... all your environment variables
        },
        # All your export commands and script calls, chained ahead of the
        # worker's launch command
        "command": (
            "bash -c 'export PATH=$PATH:/additional/path "
            "&& source /path/to/setup.sh "
            "&& export CUSTOM_VAR=value "
            "&& prefect flow-run execute'"
        ),
    },
    "variables": {"type": "object", "properties": {}},
}

async def configure_workpool():
    async with get_client() as client:
        await client.update_work_pool(
            "ads-pool-1",
            work_pool=WorkPoolUpdate(base_job_template=config),
        )
A few important notes:
1. The commands chained into the command override run in sequence before your flow starts
2. The command override runs in a shell context, so you can use shell features like:
- Environment variable expansion ($VAR)
- Shell script sourcing (source or .)
- Export commands
- Piping and redirection
3. If you need to maintain different configurations for different environments, you could create a configuration file for each workpool:
python
import yaml
from prefect import get_client
from prefect.client.schemas.actions import WorkPoolUpdate

# Load workpool-specific configuration
def load_workpool_config(workpool_name):
    with open(f"configs/{workpool_name}.yaml", "r") as f:
        return yaml.safe_load(f)

async def configure_workpool(workpool_name):
    config = load_workpool_config(workpool_name)
    async with get_client() as client:
        await client.update_work_pool(
            workpool_name,
            work_pool=WorkPoolUpdate(base_job_template=config),
        )
Example YAML configuration (configs/ads-pool-1.yaml):
yaml
job_configuration:
  env:
    ADS_HOSTNAME: ads1.example.com
    DB_CONNECTION: connection_string
    API_KEY: your_api_key
  command: >-
    bash -c 'export PATH=$PATH:/additional/path
    && source /path/to/setup.sh
    && export CUSTOM_VAR=value
    && prefect flow-run execute'
variables:
  type: object
  properties: {}
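With one YAML file per pool, a short driver (reusing load_workpool_config and configure_workpool from above) can apply all six configurations in one pass:
python
import asyncio

async def configure_all():
    # Apply configs/ads-pool-1.yaml ... configs/ads-pool-6.yaml
    await asyncio.gather(
        *(configure_workpool(f"ads-pool-{i}") for i in range(1, 7))
    )

asyncio.run(configure_all())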
This approach gives you:
1. All the configuration capabilities you had with your .cfg files
2. Better organization and separation of concerns
3. Easy management of multiple ADS configurations
4. The ability to version control your configurations
5. A clean way to update configurations without modifying the flows
Remember that any sensitive values (like API keys or passwords) should be handled securely, preferably using Prefect Blocks for secrets management rather than storing them directly in these configurations.
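One option is Prefect's built-in Secret block; the ads-api-key name below is just a placeholder:
python
from prefect.blocks.system import Secret

# One-time setup: store the value server-side instead of in the job template
Secret(value="your_api_key").save("ads-api-key", overwrite=True)

# At runtime, load it inside the flow instead of reading an env var
api_key = Secret.load("ads-api-key").get()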
Also, make sure any scripts or files referenced in your command override are accessible in the environment where your flows will run. If you're using remote workers, ensure all necessary files are either in the worker environment or copied there as part of the deployment process.
Srinivas Kandukuri
07/10/2025, 7:42 AM
Marvin
07/10/2025, 7:43 AM