Jeremy Knickerbocker
06/18/2024, 3:25 PMasync
flows are not registering. I use a prefect.yaml
file for managing the deployments and nothing has changed with it, can you tell me what I am missing?Marvin
06/18/2024, 3:25 PMasync
flows not registering after upgrading to 2.19.3. However, there could have been changes that impact the deployment process or prefect.yaml
configuration. Below are some things to consider:
1. Flow Pre-Registration:
Prefect 2.x no longer requires flow pre-registration like Prefect 1. Instead, you should create a Deployment.
More details on Prefect 2 deployments.
2. Work Pools:
Ensure that your deployments specify a work pool. If no work pool is set, a default one is used.
yaml
# Example configuration
work_pools:
- name: "my-dev-work-pool"
3. Troubleshooting Guide:
Check the troubleshooting guide for further hints and detailed steps in case some settings or configurations have specific updates.
Troubleshooting Guide
4. Migration Guide:
Refer to the migration guide to see if there are any specific adjustments required for async
flows between versions.
Migration Guide
Check each configuration for updates in the links provided. These steps should help you pinpoint any differences or updates needed in your prefect.yaml
or deployment workflows. If the issue persists, consider raising it on the Prefect GitHub repository for more targeted assistance.Nate
06/18/2024, 3:26 PMJeremy Knickerbocker
06/18/2024, 3:34 PM2.18.0
everything seems to work, but again this could be my own oversight.
We have a prefect.yaml
that has 20+ flows defined in it. The sync flows all deploy fine, however I get the following errors on my async flows.
Could not find flow 'set_concurrency_limits' in 'flows/set_prefect_concurrency.py'
If I comment it out, then it continues to deploy until it hits the next async flow. Removing them all from my deployment and everything works as expected.
One of the actual flows in question:
import asyncio
from pprint import pformat
import psutil
from prefect import flow, get_client, get_run_logger, task
BYTES_TO_GB = 1 / 1024 / 1024 / 1024
RESOURCE_ALLOCATION = {
"small": {"memory": 8},
"medium": {"memory": 12},
"large": {"memory": 16},
}
@task(name="Get Current Concurrency Limits")
async def get_current_concurrency_limits():
async with get_client() as client:
work_pools = await client.read_work_pools()
work_queues = await client.read_work_queues()
for wp in work_pools:
del wp.base_job_template
del wp.description
return {"pools": work_pools, "queues": work_queues}
@task(name="Get System Resources")
async def get_system_resources():
psutil.PROCFS_PATH = "/prochost"
num_cpus = psutil.cpu_count()
physical_memory = psutil.virtual_memory()
swap_memory = psutil.swap_memory()
result = {
"num_cpus": num_cpus,
"physical_memory": physical_memory,
"swap_memory": swap_memory,
}
return result
@task(name="Set Concurrency Limits")
async def set_concurrency_limits_from_resources(current_limits: dict, resources: dict):
logger = get_run_logger()
logger.debug("This has not been implemented yet")
<http://logger.info|logger.info>(pformat(current_limits))
<http://logger.info|logger.info>(pformat(resources))
@flow(name="Set Concurrency Limits", flow_run_name="Get/Set Concurrency Limits")
async def set_concurrency_limits():
resources = await get_system_resources()
current_limits = await get_current_concurrency_limits.submit()
await set_concurrency_limits_from_resources.submit(
current_limits, resources, wait_for=[current_limits, resources]
)
if __name__ == "__main__":
asyncio.run(set_concurrency_limits())
The above flow runs in a docker container and doesn’t actually do anything, but does log host machine stats for later review (working on doing dynamic provisioning of flow concurrency based on available system resources)Jeremy Knickerbocker
06/18/2024, 3:43 PMJeremy Knickerbocker
06/18/2024, 3:43 PM