# ask-marvin
k
@Marvin Hey Marvin, I'm trying to setup prefect using python code only, in which one env variable should be able to distinguish between the workers/deployments: 'local' only runs locally, so no prefect queueing, 'prefect' uses prefect queues and docker workers, 'k8s' should use kubernetes workers. Now, I seem to have issues making this work:
Copy code
WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"

_logger = logging.getLogger(__name__)

<http://_logger.info|_logger.info>("Module loaded, setting up work pool and deployment...")


async def ensure_work_pool():
    <http://_logger.info|_logger.info>("Getting Prefect client...")
    async with get_client() as client:
        <http://_logger.info|_logger.info>(f"client: {client}")
        pools = await client.read_work_pools()
        <http://_logger.info|_logger.info>(f"Existing work pools: {pools}")
        if not any(p.name == WORK_POOL_NAME for p in pools):
            <http://_logger.info|_logger.info>(f"Creating work pool {WORK_POOL_NAME!r}")
            await client.create_work_pool(
                work_pool=WorkPoolCreate(
                    name=WORK_POOL_NAME,
                    type="docker",
                    base_job_template={},
                )
            )
            <http://_logger.info|_logger.info>(f"Created work pool {WORK_POOL_NAME!r}")
        else:
            <http://_logger.info|_logger.info>(f"Work pool {WORK_POOL_NAME!r} already exists")


async def ensure_deployment():
    deployment = await trigger_flow.deploy(
        name=DEPLOYMENT_NAME,
        work_pool_name=WORK_POOL_NAME,
        image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
        build=False,
        push=False,
    )  # pyright: ignore[reportGeneralTypeIssues]
    <http://_logger.info|_logger.info>(f"Deployment: {deployment}")
    return deployment


async def run_flow():
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"trigger-flow/{DEPLOYMENT_NAME}")
        flow_run = await client.create_flow_run_from_deployment(dep.id)
        <http://_logger.info|_logger.info>(f"Created flow run: {flow_run.id}")
        return flow_run.id


_setup_done = asyncio.Event()


async def setup_once():
    <http://_logger.info|_logger.info>("Setting up work pool...")
    await ensure_work_pool()
    <http://_logger.info|_logger.info>("Setting up deployment...")
    await ensure_deployment()
    _setup_done.set()


setup_flow = asyncio.create_task(setup_once())
setup_flow.add_done_callback(lambda _: _logger.info("Setup completed"))


async def run_trigger_flow():
    await _setup_done.wait()
    await trigger_flow()
    return 5
m
thought for 259.7 seconds
Got it — you want a single env var to switch between:
- local: run the flow inline (no queues/workers)
- prefect: use a Docker work pool + worker
- k8s: use a Kubernetes work pool + worker

A few important notes up front:
- Flow.deploy in Prefect 3.x is synchronous — don’t await it. It returns the deployment ID (UUID).
- Docker/Kubernetes workers live in extras: install them with pip install "prefect[docker]" and/or pip install "prefect[kubernetes]". The worker classes are in prefect_docker and prefect_kubernetes respectively.
- When creating a work pool programmatically, pass the worker’s default base job template. Using {} will lead to odd behavior later (worker config mismatches).
- You still need a worker process running against the pool for “prefect”/“k8s” modes. You can run workers via CLI or as sidecars/containers in your infra.

Can you share the exact error you’re seeing? In the meantime, here’s a cleaned-up, code-only pattern that should work.

Complete example with env-based switching
```
import os
import asyncio
import logging
from uuid import UUID

from prefect import flow, get_client

# Extras: pip install "prefect[docker]" and/or "prefect[kubernetes]"
try:
    from prefect_docker.workers import DockerWorker  # for "prefect" mode
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker  # for "k8s" mode
except Exception:
    KubernetesWorker = None

_logger = logging.getLogger(__name__)

WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"


@flow(name="trigger-flow")
def trigger_flow():
    _logger.info("Hello from trigger_flow")
    return 5


async def ensure_work_pool(mode: str, pool_name: str):
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            _logger.info(f"Work pool {pool_name!r} already exists")
            return

        if mode == "prefect":
            if not DockerWorker:
                raise RuntimeError("prefect[docker] not installed but MODE='prefect'")
            base_template = DockerWorker.get_default_base_job_template()
            worker_type = DockerWorker.type  # usually "docker"
        elif mode == "k8s":
            if not KubernetesWorker:
                raise RuntimeError("prefect[kubernetes] not installed but MODE='k8s'")
            base_template = KubernetesWorker.get_default_base_job_template()
            worker_type = KubernetesWorker.type  # usually "kubernetes"
        else:
            # local mode does not need a work pool
            return

        from prefect.client.schemas.actions import WorkPoolCreate

        await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name=pool_name,
                type=worker_type,
                base_job_template=base_template,
            )
        )
        _logger.info(f"Created work pool {pool_name!r} of type {worker_type!r}")


def ensure_deployment(mode: str) -> UUID | None:
    if mode == "local":
        # No deployment required for local inline runs
        return None
    if mode == "prefect":
        # For Docker workers, you can set image directly. Set build/push as needed.
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
            build=False,
            push=False,
            # job_variables={}  # add extra worker overrides if needed
        )
    if mode == "k8s":
        # For Kubernetes, you can pass the worker job variables explicitly.
        # If you’re not using Prefect to build/push an image, skip the image= arg
        # and define it in job_variables:
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            job_variables={
                "image": "your.registry/fraio_api:local",
                # add k8s specifics if needed:
                # "namespace": "default",
                # "service_account_name": "prefect-worker",
                # "env": [{"name": "FOO", "value": "bar"}],
                # "labels": {"app": "prefect"},
                # "image_pull_secrets": ["your-secret"],
            },
            build=False,  # set True if you want Prefect to build via DockerImage
            push=False,
        )
    raise ValueError(f"Unknown mode: {mode}")


async def run_via_deployment(flow_name: str, deployment_name: str) -> UUID:
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"{flow_name}/{deployment_name}")
        fr = await client.create_flow_run_from_deployment(dep.id)
        _logger.info(f"Created flow run: {fr.id}")
        return fr.id


async def main():
    _logger.info(f"Starting in MODE={MODE!r}")
    # Only create pools/deployments when needed
    await ensure_work_pool(MODE, WORK_POOL_NAME)
    deployment_id = ensure_deployment(MODE)
    if MODE == "local":
        # Inline execution (no queueing)
        result = trigger_flow()
        _logger.info(f"Local flow result: {result}")
    else:
        # Requires a running worker pointed at WORK_POOL_NAME
        await run_via_deployment("trigger-flow", DEPLOYMENT_NAME)


if __name__ == "__main__":
    # Avoid creating asyncio tasks at import-time; run within an event loop instead.
    asyncio.run(main())
```
Key fixes vs your snippet
- Don’t await Flow.deploy — it’s synchronous in 3.x and returns a UUID.
- Create work pools with the correct default base job template for the worker type:
  - Docker: DockerWorker.get_default_base_job_template()
  - Kubernetes: KubernetesWorker.get_default_base_job_template()
- Use job_variables to pass worker-specific fields (e.g., image, namespace for k8s). Passing an empty base_job_template={} at pool creation can cause misconfigurations later.
- If you keep build=False, push=False, make sure your worker can pull the specified image (already present in a registry or locally).

Running locally without queueing
- Just call the flow function directly (trigger_flow()); this bypasses deployments/workers entirely. If you truly don’t want the run to be visible in Prefect at all, unset PREFECT_API_URL (or run with a profile that points to None).

For “prefect” (Docker) and “k8s” modes
- Ensure a worker is running against the target pool so scheduled runs get picked up. Typically:
  - Docker worker docs: https://prefecthq.github.io/prefect-docker/
  - Kubernetes worker docs: https://prefecthq.github.io/prefect-kubernetes/
- You can run workers in containers/Pods as part of your stack; the important bit is they are alive and pointed at WORK_POOL_NAME.

Docs and references
- Deployments in Prefect: https://docs.prefect.io/latest/concepts/deployments/
- Work pools/workers overview: https://docs.prefect.io/latest/concepts/work-pools/
- Docker collection: https://prefecthq.github.io/prefect-docker/
- Kubernetes collection: https://prefecthq.github.io/prefect-kubernetes/

If you can share the actual error you’re running into (stack trace or message), I’ll pinpoint the exact fix. My hunches are:
- awaiting Flow.deploy
- creating the pool with an empty base_job_template
- image not available to the worker
- reading the deployment with an incorrect flow/deployment name pair
- creating asyncio tasks at import-time without an event loop running
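As an aside (a sketch, not something from your snippet): instead of the manual read_deployment_by_name + create_flow_run_from_deployment pair, Prefect also ships a run_deployment helper that creates a flow run from a "flow-name/deployment-name" string. Assuming the deployment above exists and a worker is polling the pool:
```
from prefect.deployments import run_deployment

# Create a flow run from the deployment without waiting for it to finish;
# timeout=0 returns immediately after the run is created.
flow_run = run_deployment(
    name="trigger-flow/fraio-trigger-flow",
    timeout=0,
)
print(flow_run.id)
```
Either approach is fine; the helper just saves the explicit client calls.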
k
@Marvin right, but I don't have a `__main__`: it's just a module, not a standalone application.
m
thought for 57.4 seconds
Totally fair — if this is a library/module (not a script), you don’t want to spin up event loops or create tasks at import time. Here’s a safe “library-style” pattern:
- No asyncio tasks created at import time
- A single env var toggles local vs docker vs k8s
- Expose async setup and run functions your application can call (e.g., from a web app’s startup hook)
- Local mode runs inline (no queues/workers)
- “prefect” and “k8s” modes ensure a work pool + deployment exist, then create a remote flow run

Notes:
- In Prefect 3.x, Flow.deploy(...) is synchronous — do not await it.
- Docker/Kubernetes workers are extras: install with pip install "prefect[docker]" and/or pip install "prefect[kubernetes]".
- If you have the extras installed, you can provide the default base job template for the work pool; otherwise you can omit it and let the server default (or keep your current approach if it works).

Code you can drop into your module
```
import os
import asyncio
import logging
from uuid import UUID

from prefect import flow, get_client
from prefect.client.schemas.actions import WorkPoolCreate

# Optional extras (install as needed):
try:
    from prefect_docker.workers import DockerWorker  # MODE="prefect"
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker  # MODE="k8s"
except Exception:
    KubernetesWorker = None

_logger = logging.getLogger(__name__)

WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"


@flow(name="trigger-flow")
def trigger_flow():
    _logger.info("Hello from trigger_flow")
    return 5


# Internal guards so setup runs once
_setup_lock = asyncio.Lock()
_setup_done = False


async def _ensure_work_pool(mode: str, pool_name: str):
    if mode == "local":
        return  # no pool/worker needed
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            _logger.info(f"Work pool {pool_name!r} already exists")
            return

        # Pick worker type + default base job template if available
        kwargs = {"name": pool_name}
        if mode == "prefect":
            worker_type = "docker"
            kwargs["type"] = worker_type
            if DockerWorker:
                kwargs["base_job_template"] = DockerWorker.get_default_base_job_template()
        elif mode == "k8s":
            worker_type = "kubernetes"
            kwargs["type"] = worker_type
            if KubernetesWorker:
                kwargs["base_job_template"] = KubernetesWorker.get_default_base_job_template()
        else:
            raise ValueError(f"Unknown mode: {mode}")

        await client.create_work_pool(WorkPoolCreate(**kwargs))
        _logger.info(f"Created work pool {pool_name!r} of type {kwargs['type']!r}")


def _ensure_deployment(mode: str) -> UUID | None:
    if mode == "local":
        return None  # no deployment
    if mode == "prefect":
        # For Docker workers, let the worker pull this image
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
            build=False,
            push=False,
        )
    if mode == "k8s":
        # For Kubernetes workers, set job variables. If you don't use Prefect to build/push,
        # specify the image here so the worker knows what to run.
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            job_variables={
                "image": "your.registry/fraio_api:local",
                # add other k8s settings if needed:
                # "namespace": "default",
                # "service_account_name": "prefect-worker",
                # "labels": {"app": "prefect"},
                # "image_pull_secrets": ["your-secret"],
            },
            build=False,
            push=False,
        )
    raise ValueError(f"Unknown mode: {mode}")


async def _run_via_deployment(flow_name: str, deployment_name: str) -> UUID:
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"{flow_name}/{deployment_name}")
        fr = await client.create_flow_run_from_deployment(dep.id)
        _logger.info(f"Created flow run: {fr.id}")
        return fr.id


async def setup_prefect():
    global _setup_done
    if _setup_done:
        return
    async with _setup_lock:
        if _setup_done:
            return
        await _ensure_work_pool(MODE, WORK_POOL_NAME)
        _ensure_deployment(MODE)  # sync
        _setup_done = True
        _logger.info("Prefect setup completed")


async def run_trigger_flow():
    # Lazy setup at call time; safe when imported as a module
    if MODE == "local":
        # No API calls, no queueing
        return trigger_flow()
    await setup_prefect()
    # Requires a running worker pointed at WORK_POOL_NAME
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
```
How to use this from your host app
- Call await setup_prefect() once during application startup (e.g., FastAPI on_event("startup"), Django async ready hook, etc.).
- Then call await run_trigger_flow() wherever you need to trigger it.
- In MODE="local", run_trigger_flow() returns the flow’s result. In MODE="prefect"/"k8s", it returns a flow run ID (since it runs remotely via a worker).

Why avoid tasks at import time
- asyncio.create_task(...) at import time will fail if there’s no running loop and can create hard-to-debug race conditions. Deferring setup to an explicit setup_prefect() and lazily calling it from run_trigger_flow() avoids this.

Docs and references
- Deployments: https://docs.prefect.io/latest/concepts/deployments/
- Work pools and workers: https://docs.prefect.io/latest/concepts/work-pools/
- Docker worker collection: https://prefecthq.github.io/prefect-docker/
- Kubernetes worker collection: https://prefecthq.github.io/prefect-kubernetes/

If you share the specific error you hit before, I can tailor this further (e.g., name mismatches in read_deployment_by_name, missing extras, or base job template issues).
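For concreteness, a minimal wiring sketch using FastAPI (FastAPI and the import path are assumptions here — adapt the same two calls to whatever ASGI framework you use):
```
from contextlib import asynccontextmanager

from fastapi import FastAPI

# setup_prefect / run_trigger_flow come from the module above
# (the import path below is a placeholder)
from your_package.prefect_module import setup_prefect, run_trigger_flow


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs once at startup, inside the server's event loop
    await setup_prefect()
    yield


app = FastAPI(lifespan=lifespan)


@app.post("/trigger")
async def trigger():
    # Inline result in "local" mode, flow run ID otherwise
    return {"result": str(await run_trigger_flow())}
```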
k
@Marvin ok, so with this:
Copy code
import asyncio
import logging
import os
from uuid import UUID

from fraio_orchestrator.fraio_trigger_flow import trigger_flow
from prefect import get_client

try:
    from prefect_docker.workers import DockerWorker
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker
except Exception:
    KubernetesWorker = None

from prefect.client.schemas.actions import WorkPoolCreate

_logger = logging.getLogger(__name__)


WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"


_setup_lock = asyncio.Lock()
_setup_done = False


async def _ensure_work_pool(mode: str, pool_name: str):
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            <http://_logger.info|_logger.info>(f"Work pool {pool_name!r} already exists")
            return
        match mode:
            case "prefect":
                if not DockerWorker:
                    raise RuntimeError("prefect[docker] not installed but MODE='prefect'")
                base_template = DockerWorker.get_default_base_job_template()
                worker_type = DockerWorker.type  # usually "docker"
            case "k8s":
                if not KubernetesWorker:
                    raise RuntimeError("prefect[kubernetes] not installed but MODE='k8s'")
                base_template = KubernetesWorker.get_default_base_job_template()
                worker_type = KubernetesWorker.type  # usually "kubernetes"
            case _:
                return

        await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name=pool_name,
                type=worker_type,
                base_job_template=base_template,
            )
        )
        <http://_logger.info|_logger.info>(f"Created work pool {pool_name!r} of type {worker_type!r}")


def _ensure_deployment(mode: str) -> UUID | None:
    match mode:
        case "local":
            return None
        case "prefect":
            return trigger_flow.deploy(
                name=DEPLOYMENT_NAME,
                work_pool_name=WORK_POOL_NAME,
                image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
                build=False,
                push=False,
            )  # pyright: ignore[reportReturnType]
        case "k8s":
            # For Kubernetes, you can pass the worker job variables explicitly.
            # If you're not using Prefect to build/push an image, skip the image= arg
            # and define it in job_variables:
            return trigger_flow.deploy(
                name=DEPLOYMENT_NAME,
                work_pool_name=WORK_POOL_NAME,
                job_variables={
                    "image": "harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
                    # add k8s specifics if needed:
                    # "namespace": "default",
                    # "service_account_name": "prefect-worker",
                    # "env": [{"name": "FOO", "value": "bar"}],
                    # "labels": {"app": "prefect"},
                    # "image_pull_secrets": ["your-secret"],
                },
                build=False,  # set True if you want Prefect to build via DockerImage
                push=False,
            )  # pyright: ignore[reportReturnType]
    raise ValueError(f"Unknown mode: {mode}")


async def _run_via_deployment(flow_name: str, deployment_name: str) -> UUID:
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"{flow_name}/{deployment_name}")
        fr = await client.create_flow_run_from_deployment(dep.id)
        <http://_logger.info|_logger.info>(f"Created flow run: {fr.id}")
        return fr.id


async def setup_prefect():
    global _setup_done
    if _setup_done:
        return
    async with _setup_lock:
        if _setup_done:
            return
        await _ensure_work_pool(MODE, WORK_POOL_NAME)
        _ensure_deployment(MODE)
        _setup_done = True
        <http://_logger.info|_logger.info>("Prefect setup completed")


async def run_trigger_flow():
    if MODE == "local":
        return trigger_flow()
    await setup_prefect()
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)


setup_prefect_sync = asyncio.run(setup_prefect())
<http://_logger.info|_logger.info>("Prefect setup (sync) completed: {setup_prefect_sync}")
I get errors:
Copy code
fraio-api     | /usr/local/lib/python3.10/site-packages/uvicorn/lifespan/on.py:-1: RuntimeWarning: coroutine 'setup_prefect' was never awaited
fraio-api     | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
fraio-worker  | Work pool 'fraio-docker-pool' does not exist and no worker type was provided. 
fraio-worker  | Starting a process worker...
fraio-worker  | 09:04:38.673 | INFO    | prefect.workers.process.processworker 2d3bd320-917d-472c-b1f3-9f28889e3f66 - Work pool 'fraio-docker-pool' created.
fraio-worker  | Worker 'ProcessWorker 2d3bd320-917d-472c-b1f3-9f28889e3f66' started!
fraio-api     | 09:05:05.046 | ERROR   | uvicorn.error - Exception in ASGI application
...
fraio-api     |   File "/app/fraio_api/fraio_api/operation_handlers/optimizer/staging_horizon/trigger.py", line 5, in <module>
fraio-api     |     from fraio_orchestrator.fraio_trigger import run_trigger_flow
fraio-api     |   File "/app/fraio_orchestrator/src/fraio_orchestrator/fraio_trigger.py", line 124, in <module>
fraio-api     |     setup_prefect_sync = asyncio.run(setup_prefect())
fraio-api     |   File "/usr/local/lib/python3.10/asyncio/runners.py", line 33, in run
fraio-api     |     raise RuntimeError(
fraio-api     | RuntimeError: asyncio.run() cannot be called from a running event loop
m
Your message was too long; it was truncated at the allowed limit.
k
@Marvin ok, well, in short, I get errors with the code mentioned:
Copy code
fraio-api     | /usr/local/lib/python3.10/site-packages/uvicorn/lifespan/on.py:-1: RuntimeWarning: coroutine 'setup_prefect' was never awaited
fraio-api     | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
fraio-worker  | Work pool 'fraio-docker-pool' does not exist and no worker type was provided. 
fraio-worker  | Starting a process worker...
fraio-worker  | 09:04:38.673 | INFO    | prefect.workers.process.processworker 2d3bd320-917d-472c-b1f3-9f28889e3f66 - Work pool 'fraio-docker-pool' created.
fraio-worker  | Worker 'ProcessWorker 2d3bd320-917d-472c-b1f3-9f28889e3f66' started!
fraio-api     | 09:05:05.046 | ERROR   | uvicorn.error - Exception in ASGI application
...
fraio-api     |     from fraio_orchestrator.fraio_trigger import run_trigger_flow
fraio-api     |   File "/app/fraio_orchestrator/src/fraio_orchestrator/fraio_trigger.py", line 124, in <module>
fraio-api     |     setup_prefect_sync = asyncio.run(setup_prefect())
fraio-api     |   File "/usr/local/lib/python3.10/asyncio/runners.py", line 33, in run
fraio-api     |     raise RuntimeError(
fraio-api     | RuntimeError: asyncio.run() cannot be called from a running event loop
fraio-api     | /usr/local/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'setup_prefect' was never awaited
fraio-api     | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
The api is the one calling
run_trigger_flow
, in my docker compose, I added a 'fraio-worker' (the one to be used by docker workers) and I setup the prefect module inside the module itself:
Copy code
setup_prefect_sync = asyncio.run(setup_prefect())
_logger.info("Prefect setup (sync) completed: {setup_prefect_sync}")
m
thought for 71.1 seconds
Thanks for the logs — they point to two separate issues:

1) You’re calling async setup code at import time
- Error: “coroutine 'setup_prefect' was never awaited” and “asyncio.run() cannot be called from a running event loop”
- Cause: uvicorn is already running an event loop; calling asyncio.run(...) at import time will fail.
- Fix: remove this line from your module:
Copy code
setup_prefect_sync = asyncio.run(setup_prefect())
Instead, either:
- Await setup in your app’s startup hook (recommended), or
- Lazily await setup inside your request handler before triggering the flow.

For FastAPI examples:
Copy code
app = FastAPI()

@app.on_event("startup")
async def startup():
    await setup_prefect()
or with lifespan:
Copy code
@asynccontextmanager
async def lifespan(app):
    await setup_prefect()
    yield

app = FastAPI(lifespan=lifespan)
If you don’t have startup hooks available, just rely on your run_trigger_flow() to call await setup_prefect() internally (your earlier version already did that).

2) Your worker started the wrong type and created a process pool
- Log: “Work pool 'fraio-docker-pool' does not exist and no worker type was provided. Starting a process worker... Work pool 'fraio-docker-pool' created.”
- Cause: the worker was started without --type docker, so it created a process-type pool.
- Fix: either pre-create the pool as Docker in your setup code, or start the worker with an explicit type so it creates the right pool if missing.

Recommended options
- Pre-create via code (ensure extras installed):
  - Install the Docker extra in the worker image: pip install "prefect[docker]"
  - Use the worker’s default base job template when creating the pool:
Copy code
from prefect.client.schemas.actions import WorkPoolCreate
from prefect_docker.workers import DockerWorker

async with get_client() as client:
    pools = await client.read_work_pools()
    if not any(p.name == WORK_POOL_NAME for p in pools):
        await client.create_work_pool(
            WorkPoolCreate(
                name=WORK_POOL_NAME,
                type=DockerWorker.type,  # "docker"
                base_job_template=DockerWorker.get_default_base_job_template(),
            )
        )
- Or start the worker with an explicit type so it creates the right pool automatically if it’s missing:
Copy code
prefect worker start --pool fraio-docker-pool --type docker
Make sure the container has the docker extra installed and Docker access.

Minimal fixes to your module
- Do not call asyncio.run or create tasks at import time.
- Let run_trigger_flow() do lazy setup and then run:
Copy code
async def setup_prefect():
    # create docker/k8s pool if MODE != local
    # deploy flow if MODE != local
    ...

async def run_trigger_flow():
    # In local mode, run inline; otherwise, ensure setup and trigger a deployment run
    if MODE == "local":
        return trigger_flow()

    await setup_prefect()
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
Compose/worker tips
- Start your docker worker with the docker extra and explicit type:
  - Install: pip install "prefect[docker]"
  - Command: prefect worker start --pool fraio-docker-pool --type docker
- If you’re using Kubernetes instead, install prefect[kubernetes] and use --type kubernetes.

Why this resolves your errors
- No async at import-time: avoids “never awaited” and “cannot be called from a running event loop”
- Explicit worker type: prevents accidental creation of a process pool
- Using the worker’s default base job template: avoids “empty base_job_template” mismatches

Docs for reference
- Work pools and workers: https://docs.prefect.io/latest/concepts/work-pools/
- Deployments: https://docs.prefect.io/latest/concepts/deployments/
- Docker worker: https://prefecthq.github.io/prefect-docker/
- Kubernetes worker: https://prefecthq.github.io/prefect-kubernetes/

If you can share:
- Are you using FastAPI (or another ASGI framework)?
- The exact docker-compose command for your fraio-worker

I can suggest a precise docker-compose command and a tiny tweak to your API startup so setup runs in the right place.
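One more hardening idea (a sketch, not part of the advice above): since a process-type pool named "fraio-docker-pool" may already exist from the mistyped worker, you can have setup fail loudly when the existing pool's type doesn't match what the mode expects:
```
from prefect import get_client


async def assert_pool_type(pool_name: str, expected_type: str) -> None:
    # Read the existing pool and compare its type against what this MODE expects.
    async with get_client() as client:
        pool = await client.read_work_pool(pool_name)
    if pool.type != expected_type:
        raise RuntimeError(
            f"Work pool {pool_name!r} is of type {pool.type!r}, expected {expected_type!r}; "
            "delete the pool or restart the worker with the matching --type."
        )
```
Called from _ensure_work_pool after the "already exists" branch, this turns a silent type mismatch into an immediate, explicit error.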
k
@Marvin let's do this step by step: having a connexion app:
Copy code
app = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
how to add a hook here (for the setup)?
m
thought for 18.7 seconds
Great question. With Connexion’s AsyncApp you’re running on ASGI (Starlette under the hood), so you can attach a startup hook to the underlying Starlette app. Pattern:
- Build the Connexion AsyncApp
- Grab the underlying ASGI app via .app
- Register a Starlette startup event that awaits your async setup
- Point Uvicorn at the ASGI app (not the Connexion wrapper)

Example:
Copy code
import connexion
from connexion.resolver import MethodResolver

from your_module import setup_prefect  # your async setup function

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api("openapi.yaml")

# Expose the underlying ASGI app for Uvicorn
app = cnx.app

@app.on_event("startup")
async def startup():
    await setup_prefect()
Run Uvicorn against the ASGI app: if your module is main.py and contains the code above, start with:
Copy code
uvicorn main:app
Notes:
- Don’t call asyncio.run(setup_prefect()) at import time; the startup hook will run it inside Uvicorn’s event loop.
- If you previously pointed Uvicorn at the Connexion wrapper (e.g., uvicorn main:cnx), switch to the underlying ASGI app (main:app) so the startup hook executes.
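If you start the server from Python rather than the CLI, the equivalent is (a sketch; the main:app module path is an assumption):
```
import uvicorn

if __name__ == "__main__":
    # lifespan="on" forces startup/shutdown events instead of auto-detection
    uvicorn.run("main:app", host="0.0.0.0", port=8000, lifespan="on")
```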
k
@Marvin
Copy code
cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))

options = SwaggerUIOptions(spec_path="/openapi.yaml", swagger_ui=True, swagger_ui_path="/docs")

cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    # validate_responses=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

app = cnx.app
gives:
Copy code
Cannot access attribute "app" for class "AsyncApp"
  Attribute "app" is unknownPylancereportAttributeAccessIssue
(function) app: Unknown
m
thought for 39.0 seconds
You can register your Prefect setup as a startup hook on the underlying ASGI app that Connexion exposes. Two notes:
- Connexion’s AsyncApp exposes the Starlette app as .app at runtime. Pylance doesn’t know about it, so you’ll see a type warning you can safely ignore/suppress.
- Add your async setup to a Starlette “startup” event so it runs inside Uvicorn’s event loop (no asyncio.run at import time).

Example:
Copy code
import connexion
from connexion.resolver import MethodResolver
from connexion.options import SwaggerUIOptions

from your_module import setup_prefect  # your async setup function

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))

options = SwaggerUIOptions(spec_path="/openapi.yaml", swagger_ui=True, swagger_ui_path="/docs")

cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

# Underlying Starlette app (Pylance doesn't know this attribute, but it exists at runtime)
app = cnx.app  # pyright: ignore[reportAttributeAccessIssue]

@app.on_event("startup")
async def _startup():
    await setup_prefect()
Run Uvicorn against the ASGI app:
Copy code
uvicorn your_module:app
If you prefer not to use the decorator, you can register the handler explicitly:
Copy code
app.add_event_handler("startup", _startup)
And make sure to remove any asyncio.run(setup_prefect()) or background tasks created at import time from your module — the startup hook will handle it.
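If you want the module to degrade gracefully across Connexion versions, a defensive lookup is possible (a sketch that continues from the snippet above; purely illustrative):
```
# Fall back to the Connexion wrapper itself if this Connexion version
# does not expose the underlying Starlette app.
starlette_app = getattr(cnx, "app", None)
if starlette_app is not None:
    app = starlette_app
else:
    # No .app attribute: register the setup via a lifespan instead (see below)
    app = cnx
```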
k
@Marvin apparently, pyright was right 😉
Copy code
fraio-api     |   File "/app/fraio_api/fraio_api/app.py", line 36, in <module>
fraio-api     |     app = cnx.app  # pyright: ignore[reportAttributeAccessIssue]
fraio-api     | AttributeError: 'AsyncApp' object has no attribute 'app'
m
thought for 44.6 seconds
Ah — your Connexion version doesn’t expose .app on AsyncApp. No problem; two working patterns:

Option A: Register a startup handler directly on the AsyncApp (if it exposes Starlette-style APIs)
Try this first — many Connexion versions forward Starlette APIs to the AsyncApp instance:
Copy code
cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))

cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

async def _startup():
    await setup_prefect()

# If available on your version:
cnx.add_event_handler("startup", _startup)  # or: cnx.on_event("startup")(_startup)

# This is your ASGI app for uvicorn
app = cnx
Option B (version-agnostic): Wrap with a Starlette app that defines a lifespan
If A doesn’t work, mount the Connexion app into a small Starlette app that runs your setup during startup:
Copy code
from contextlib import asynccontextmanager
from starlette.applications import Starlette

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

@asynccontextmanager
async def lifespan(_):
    await setup_prefect()
    yield

star = Starlette(lifespan=lifespan)
# Mount the Connexion ASGI app at the root
star.mount("/", cnx)

# This is your ASGI app for uvicorn
app = star
Then run:
Copy code
uvicorn your_module:app
Important reminders
- Remove any asyncio.run(setup_prefect()) or background tasks at import time.
- Start your Prefect worker with the correct type so the pool is the right kind:
  - Docker: prefect worker start --pool fraio-docker-pool --type docker
  - Kubernetes: prefect worker start --pool fraio-k8s-pool --type kubernetes
k
@Marvin right, got it working with contextmanager: https://connexion.readthedocs.io/en/stable/lifespan.html so now, let's move on to the next topic: workpool deployment seems to work:
Copy code
fraio-worker  | Work pool 'fraio-docker-pool' does not exist and no worker type was provided. 
fraio-worker  | Starting a process worker...
fraio-worker  | 09:43:20.707 | INFO    | prefect.workers.process.processworker a6f64a89-e59f-40a0-bc43-9d989d8a3acf - Work pool 'fraio-docker-pool' created.
fraio-worker  | Worker 'ProcessWorker a6f64a89-e59f-40a0-bc43-9d989d8a3acf' started!
fraio-api     | /app/fraio_api/fraio_api/operation_handlers/optimizer/staging_horizon/trigger.py:9: RuntimeWarning: coroutine 'run_flow_async' was never awaited
fraio-api     |   await run_trigger_flow()
fraio-api     | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
This is really weird, because there is no run_flow_async in my code anywhere (used grep).
m
thought for 116.1 seconds
Nice on the Connexion lifespan fix. About the new warning:
- RuntimeWarning: coroutine 'run_flow_async' was never awaited
- You don’t have that name in your code because it comes from Prefect internals. It happens when an async flow is invoked without awaiting it.

Most likely cause in your module: in the local branch you’re returning the flow call without awaiting it, but your flow is defined async.

Patch your local-mode branch to handle both sync and async flows
Copy code
import inspect

async def run_trigger_flow():
    if MODE == "local":
        # Run inline (no queues/workers)
        if inspect.iscoroutinefunction(trigger_flow.fn):
            return await trigger_flow()   # async flow
        else:
            return trigger_flow()         # sync flow

    await setup_prefect()
    # Requires a running worker
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
Why this helps
- If your @flow is defined as async def trigger_flow(...): ..., calling trigger_flow() returns a coroutine that Prefect’s engine will run via an internal coroutine (run_flow_async). If you don’t await it, Python warns that the coroutine was never awaited.

Double-check your worker startup too
Your logs show a process worker created your pool:
- “Work pool 'fraio-docker-pool' does not exist and no worker type was provided. Starting a process worker...”

Start the worker with an explicit type so the correct pool type is used:
Copy code
prefect worker start --pool fraio-docker-pool --type docker
(and ensure the docker extra is installed in that container: pip install "prefect[docker]")

If you still hit the warning after the patch, please share:
- How trigger_flow is defined (async def vs def)
- The exact run_trigger_flow function body you’re using now
- Where you call await run_trigger_flow() from (confirm it’s inside an async function)

Refs
- Flows: https://docs.prefect.io/latest/concepts/flows/
- Work pools/workers: https://docs.prefect.io/latest/concepts/work-pools/
k
@Marvin thx, I'm moving forward, local works fine now. I'm trying to run prefect mode. I'll just post you the code so you're up to date, but you don't need to react on it yet. I'll add the result of running it afterwards:
Copy code
import asyncio
import logging
import os
from uuid import UUID

from fraio_orchestrator.fraio_trigger_flow import trigger_flow
from prefect import get_client

try:
    from prefect_docker.workers import DockerWorker
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker
except Exception:
    KubernetesWorker = None

from prefect.client.schemas.actions import WorkPoolCreate

_logger = logging.getLogger(__name__)


WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"


_setup_lock = asyncio.Lock()
_setup_done = False


async def _ensure_work_pool(mode: str, pool_name: str):
    <http://_logger.info|_logger.info>(f"Ensuring work pool {pool_name!r} for mode {mode!r}")
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            <http://_logger.info|_logger.info>(f"Work pool {pool_name!r} already exists")
            return
        match mode:
            case "prefect":
                if not DockerWorker:
                    raise RuntimeError("prefect[docker] not installed but MODE='prefect'")
                base_template = DockerWorker.get_default_base_job_template()
                worker_type = DockerWorker.type  # usually "docker"
            case "k8s":
                if not KubernetesWorker:
                    raise RuntimeError("prefect[kubernetes] not installed but MODE='k8s'")
                base_template = KubernetesWorker.get_default_base_job_template()
                worker_type = KubernetesWorker.type  # usually "kubernetes"
            case _:
                return

        await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name=pool_name,
                type=worker_type,
                base_job_template=base_template,
            )
        )
        <http://_logger.info|_logger.info>(f"Created work pool {pool_name!r} of type {worker_type!r}")


def _ensure_deployment(mode: str) -> UUID | None:
    <http://_logger.info|_logger.info>(f"Ensuring deployment {DEPLOYMENT_NAME!r} for mode {mode!r}")
    match mode:
        case "local":
            return None
        case "prefect":
            return trigger_flow.deploy(
                name=DEPLOYMENT_NAME,
                work_pool_name=WORK_POOL_NAME,
                image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
                build=False,
                push=False,
            )  # pyright: ignore[reportReturnType]
        case "k8s":
            # For Kubernetes, you can pass the worker job variables explicitly.
            # If you're not using Prefect to build/push an image, skip the image= arg
            # and define it in job_variables:
            return trigger_flow.deploy(
                name=DEPLOYMENT_NAME,
                work_pool_name=WORK_POOL_NAME,
                job_variables={
                    "image": "harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
                    # add k8s specifics if needed:
                    # "namespace": "default",
                    # "service_account_name": "prefect-worker",
                    # "env": [{"name": "FOO", "value": "bar"}],
                    # "labels": {"app": "prefect"},
                    # "image_pull_secrets": ["your-secret"],
                },
                build=False,  # set True if you want Prefect to build via DockerImage
                push=False,
            )  # pyright: ignore[reportReturnType]
    raise ValueError(f"Unknown mode: {mode}")


async def _run_via_deployment(flow_name: str, deployment_name: str) -> UUID:
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"{flow_name}/{deployment_name}")
        fr = await client.create_flow_run_from_deployment(dep.id)
        <http://_logger.info|_logger.info>(f"Created flow run: {fr.id}")
        return fr.id


async def setup_prefect():
    <http://_logger.info|_logger.info>(f"Running Prefect in '{MODE}' mode")
    global _setup_done
    if _setup_done:
        return
    async with _setup_lock:
        if _setup_done:
            return
        await _ensure_work_pool(MODE, WORK_POOL_NAME)
        _ensure_deployment(MODE)
        _setup_done = True
        <http://_logger.info|_logger.info>("Prefect setup completed")


async def run_trigger_flow():
    if MODE == "local":
        return await trigger_flow()
    await setup_prefect()
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
m
thought for 8.0 seconds
Thanks for sharing — I’ll wait for the run results. Quick preflight checklist to avoid the common gotchas when you switch to MODE="prefect":
- Start the worker with the correct type and extras installed
  - Install in the worker image: pip install "prefect[docker]"
  - Start with: prefect worker start --pool fraio-docker-pool --type docker
  - Docs: https://prefecthq.github.io/prefect-docker/
- Ensure API connectivity from both containers
  - PREFECT_API_URL and (if Cloud) PREFECT_API_KEY set for both the API container and the worker container
- Image accessibility
  - Your worker must be able to pull harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local (registry auth, network)
- Flow name vs deployment lookup
  - You’re using read_deployment_by_name("trigger-flow/fraio-trigger-flow") — confirm your @flow(name="trigger-flow") matches the prefix

All set — send the logs/results when ready and we’ll iterate.
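On that last point, a small sketch (assuming the same trigger_flow import as in your module) that derives the lookup key from the flow object so a renamed flow can't silently break it:
```
from prefect import get_client
from fraio_orchestrator.fraio_trigger_flow import trigger_flow

DEPLOYMENT_NAME = "fraio-trigger-flow"


async def read_trigger_deployment():
    # Build "<flow-name>/<deployment-name>" from the flow object itself
    # instead of hard-coding the "trigger-flow" prefix.
    async with get_client() as client:
        return await client.read_deployment_by_name(f"{trigger_flow.name}/{DEPLOYMENT_NAME}")
```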
k
@Marvin hmmm, it feels as if setup is never ran, so I'm still stuck at the setup_prefect - I'll debug that first, I guess
Copy code
@asynccontextmanager
async def lifespan(app: connexion.AsyncApp):
    print("Starting up…")
    logging.getLogger("fraio_orchestrator.fraio_trigger").info("Starting up…")
    await setup_prefect()
    yield
    print("Cleaning up…")
m
thought for 24.5 seconds
Good call to instrument it. If your “Starting up…” print isn’t showing, the lifespan function likely isn’t attached or Uvicorn isn’t running lifespan.

Bind the lifespan to your app
- Connexion AsyncApp supports a lifespan callable via its constructor:
Copy code
from contextlib import asynccontextmanager
import connexion
from connexion.resolver import MethodResolver

@asynccontextmanager
async def lifespan(app: connexion.AsyncApp):
    print("Starting up…")
    logging.getLogger("fraio_orchestrator.fraio_trigger").info("Starting up…")
    await setup_prefect()
    yield
    print("Cleaning up…")

cnx = connexion.AsyncApp(
    __name__,
    specification_dir=".",
    resolver=MethodResolver("operation_handlers"),
    lifespan=lifespan,   # <-- attach here
)

cnx.add_api("openapi.yaml", ...)
app = cnx  # point uvicorn at this
- Alternatively, use the Starlette wrapper we discussed earlier:
Copy code
from starlette.applications import Starlette

@asynccontextmanager
async def lifespan(_):
    print("Starting up…")
    await setup_prefect()
    yield
    print("Cleaning up…")

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api("openapi.yaml", ...)

star = Starlette(lifespan=lifespan)
star.mount("/", cnx)
app = star
Ensure Uvicorn runs the lifespan hook
- Run Uvicorn against app and, if needed, force lifespan on:
Copy code
uvicorn your_module:app --lifespan on
By default it’s “auto”; flipping it to “on” helps if auto-detection misses your implementation.

If “Starting up…” prints but it hangs at await setup_prefect()
- Add granular logs inside setup_prefect to pinpoint the wait:
Copy code
async def setup_prefect():
    _logger.info("setup_prefect: start")
    async with _setup_lock:
        _logger.info("setup_prefect: acquired lock")
        await _ensure_work_pool(MODE, WORK_POOL_NAME)
        _logger.info("setup_prefect: ensured work pool")
        _ensure_deployment(MODE)
        _logger.info("setup_prefect: ensured deployment")
        _logger.info("setup_prefect: done")
- The most common hang is API connectivity. From the API container, verify:
  - PREFECT_API_URL (and PREFECT_API_KEY if using Cloud) are set
  - You can reach the endpoint (e.g., curl http://prefect-server:4200/api/health)
- If the worker is also creating the pool “on the fly” with the wrong type, fix the worker command:
  - Docker worker image: install extras: pip install "prefect[docker]"
  - Start with explicit type: prefect worker start --pool fraio-docker-pool --type docker

Once you hook the lifespan correctly and see “Starting up…”, if it still blocks, paste the added setup_prefect logs and we’ll zero in on where it’s waiting.
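If you prefer that connectivity check in Python instead of curl, a sketch (assumes PREFECT_API_URL points at something like http://prefect-server:4200/api and that httpx is available in the container):
```
import os

import httpx

# One-shot health probe against the Prefect API the app will use.
api_url = os.environ["PREFECT_API_URL"].rstrip("/")
resp = httpx.get(f"{api_url}/health", timeout=5.0)
print(resp.status_code, resp.text)
```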
- The most common hang is API connectivity. From the API container, verify: - PREFECT_API_URL (and PREFECT_API_KEY if using Cloud) are set - You can reach the endpoint (e.g., curl http://prefect-server:4200/api/health) - If the worker is also creating the pool “on the fly” with the wrong type, fix the worker command: - Docker worker image: install extras: pip install "prefect[docker]" - Start with explicit type: prefect worker start --pool fraio-docker-pool --type docker Once you hook the lifespan correctly and see “Starting up…”, if it still blocks, paste the added setup_prefect logs and we’ll zero in on where it’s waiting.