<@ULVA73B9P> I recently upgraded to 2.19.3 from 2....
# ask-marvin
j
@Marvin I recently upgraded to 2.19.3 from 2.18.0 and my
async
flows are not registering. I use a
prefect.yaml
file for managing the deployments and nothing has changed with it, can you tell me what I am missing?
m
I couldn't locate any direct issues regarding
async
flows not registering after upgrading to 2.19.3. However, there could have been changes that impact the deployment process or
prefect.yaml
configuration. Below are some things to consider: 1. Flow Pre-Registration: Prefect 2.x no longer requires flow pre-registration like Prefect 1. Instead, you should create a Deployment. More details on Prefect 2 deployments. 2. Work Pools: Ensure that your deployments specify a work pool. If no work pool is set, a default one is used.
Copy code
yaml
   # Example configuration
   work_pools:
     - name: "my-dev-work-pool"
3. Troubleshooting Guide: Check the troubleshooting guide for further hints and detailed steps in case some settings or configurations have specific updates. Troubleshooting Guide 4. Migration Guide: Refer to the migration guide to see if there are any specific adjustments required for
async
flows between versions. Migration Guide Check each configuration for updates in the links provided. These steps should help you pinpoint any differences or updates needed in your
prefect.yaml
or deployment workflows. If the issue persists, consider raising it on the Prefect GitHub repository for more targeted assistance.
n
hi @Jeremy Knickerbocker - this seems odd can you share how you're creating your deployments and perhaps an example of the flow code that is not registering?
j
Hi @Nate it is a bit complicated, but I will do my best. When I roll back to
2.18.0
everything seems to work, but again this could be my own oversight. We have a
prefect.yaml
that has 20+ flows defined in it. The sync flows all deploy fine, however I get the following errors on my async flows.
Copy code
Could not find flow 'set_concurrency_limits' in 'flows/set_prefect_concurrency.py'
If I comment it out, then it continues to deploy until it hits the next async flow. Removing them all from my deployment and everything works as expected. One of the actual flows in question:
Copy code
import asyncio
from pprint import pformat

import psutil
from prefect import flow, get_client, get_run_logger, task

BYTES_TO_GB = 1 / 1024 / 1024 / 1024
RESOURCE_ALLOCATION = {
    "small": {"memory": 8},
    "medium": {"memory": 12},
    "large": {"memory": 16},
}


@task(name="Get Current Concurrency Limits")
async def get_current_concurrency_limits():
    async with get_client() as client:
        work_pools = await client.read_work_pools()
        work_queues = await client.read_work_queues()

        for wp in work_pools:
            del wp.base_job_template
            del wp.description

    return {"pools": work_pools, "queues": work_queues}


@task(name="Get System Resources")
async def get_system_resources():
    psutil.PROCFS_PATH = "/prochost"

    num_cpus = psutil.cpu_count()
    physical_memory = psutil.virtual_memory()
    swap_memory = psutil.swap_memory()

    result = {
        "num_cpus": num_cpus,
        "physical_memory": physical_memory,
        "swap_memory": swap_memory,
    }

    return result


@task(name="Set Concurrency Limits")
async def set_concurrency_limits_from_resources(current_limits: dict, resources: dict):
    logger = get_run_logger()
    logger.debug("This has not been implemented yet")
    <http://logger.info|logger.info>(pformat(current_limits))
    <http://logger.info|logger.info>(pformat(resources))


@flow(name="Set Concurrency Limits", flow_run_name="Get/Set Concurrency Limits")
async def set_concurrency_limits():
    resources = await get_system_resources()
    current_limits = await get_current_concurrency_limits.submit()
    await set_concurrency_limits_from_resources.submit(
        current_limits, resources, wait_for=[current_limits, resources]
    )


if __name__ == "__main__":
    asyncio.run(set_concurrency_limits())
The above flow runs in a docker container and doesn’t actually do anything, but does log host machine stats for later review (working on doing dynamic provisioning of flow concurrency based on available system resources)
NVM I have the release notes bookmarked in main (which don’t show 2.19.4 and 2.19.5), looks like this was fixed in 2.19.4 will update