Kurt Sys (Vintecc)
09/15/2025, 8:23 PM
WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"

_logger = logging.getLogger(__name__)
_logger.info("Module loaded, setting up work pool and deployment...")

async def ensure_work_pool():
    _logger.info("Getting Prefect client...")
    async with get_client() as client:
        _logger.info(f"client: {client}")
        pools = await client.read_work_pools()
        _logger.info(f"Existing work pools: {pools}")
        if not any(p.name == WORK_POOL_NAME for p in pools):
            _logger.info(f"Creating work pool {WORK_POOL_NAME!r}")
            await client.create_work_pool(
                work_pool=WorkPoolCreate(
                    name=WORK_POOL_NAME,
                    type="docker",
                    base_job_template={},
                )
            )
            _logger.info(f"Created work pool {WORK_POOL_NAME!r}")
        else:
            _logger.info(f"Work pool {WORK_POOL_NAME!r} already exists")

async def ensure_deployment():
    deployment = await trigger_flow.deploy(
        name=DEPLOYMENT_NAME,
        work_pool_name=WORK_POOL_NAME,
        image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
        build=False,
        push=False,
    )  # pyright: ignore[reportGeneralTypeIssues]
    _logger.info(f"Deployment: {deployment}")
    return deployment

async def run_flow():
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"trigger-flow/{DEPLOYMENT_NAME}")
        flow_run = await client.create_flow_run_from_deployment(dep.id)
        _logger.info(f"Created flow run: {flow_run.id}")
        return flow_run.id

_setup_done = asyncio.Event()

async def setup_once():
    _logger.info("Setting up work pool...")
    await ensure_work_pool()
    _logger.info("Setting up deployment...")
    await ensure_deployment()
    _setup_done.set()

setup_flow = asyncio.create_task(setup_once())
setup_flow.add_done_callback(lambda _: _logger.info("Setup completed"))

async def run_trigger_flow():
    await _setup_done.wait()
    await trigger_flow()
    return 5
Marvin
09/15/2025, 8:23 PM
Marvin
09/15/2025, 8:27 PM
- Docker/Kubernetes workers are extras: pip install "prefect[docker]" and/or pip install "prefect[kubernetes]". The worker classes are in prefect_docker and prefect_kubernetes respectively.
- When creating a work pool programmatically, pass the worker’s default base job template. Using {} will lead to odd behavior later (worker config mismatches).
- You still need a worker process running against the pool for “prefect”/“k8s” modes. You can run workers via CLI or as sidecars/containers in your infra.
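If you prefer to launch that worker from Python itself (for example as a sidecar process in the same container), a minimal sketch is to shell out to the same CLI command used later in this thread; the pool name matches the one above:
```
import subprocess

# Start a Docker-type worker against the pool; equivalent to running
# `prefect worker start --pool fraio-docker-pool --type docker` by hand.
# Note: this call blocks for the lifetime of the worker process.
subprocess.run(
    ["prefect", "worker", "start", "--pool", "fraio-docker-pool", "--type", "docker"],
    check=True,  # raise if the worker exits with a non-zero status
)
```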
Can you share the exact error you’re seeing? In the meantime, here’s a cleaned-up, code-only pattern that should work.
Complete example with env-based switching
```
import os
import asyncio
import logging
from uuid import UUID

from prefect import flow, get_client

# Extras: pip install "prefect[docker]" and/or "prefect[kubernetes]"
try:
    from prefect_docker.workers import DockerWorker  # for "prefect" mode
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker  # for "k8s" mode
except Exception:
    KubernetesWorker = None

_logger = logging.getLogger(__name__)

WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"

@flow(name="trigger-flow")
def trigger_flow():
    _logger.info("Hello from trigger_flow")
    return 5

async def ensure_work_pool(mode: str, pool_name: str):
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            _logger.info(f"Work pool {pool_name!r} already exists")
            return
        if mode == "prefect":
            if not DockerWorker:
                raise RuntimeError("prefect[docker] not installed but MODE='prefect'")
            base_template = DockerWorker.get_default_base_job_template()
            worker_type = DockerWorker.type  # usually "docker"
        elif mode == "k8s":
            if not KubernetesWorker:
                raise RuntimeError("prefect[kubernetes] not installed but MODE='k8s'")
            base_template = KubernetesWorker.get_default_base_job_template()
            worker_type = KubernetesWorker.type  # usually "kubernetes"
        else:
            # local mode does not need a work pool
            return
        from prefect.client.schemas.actions import WorkPoolCreate
        await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name=pool_name,
                type=worker_type,
                base_job_template=base_template,
            )
        )
        _logger.info(f"Created work pool {pool_name!r} of type {worker_type!r}")

def ensure_deployment(mode: str) -> UUID | None:
    if mode == "local":
        # No deployment required for local inline runs
        return None
    if mode == "prefect":
        # For Docker workers, you can set image directly. Set build/push as needed.
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
            build=False,
            push=False,
            # job_variables={}  # add extra worker overrides if needed
        )
    if mode == "k8s":
        # For Kubernetes, you can pass the worker job variables explicitly.
        # If you're not using Prefect to build/push an image, skip the image= arg
        # and define it in job_variables:
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
```
Marvin
09/15/2025, 8:27 PM
- Docker: DockerWorker.get_default_base_job_template()
- Kubernetes: KubernetesWorker.get_default_base_job_template()
- Use job_variables to pass worker-specific fields (e.g., image, namespace for k8s). Passing an empty base_job_template={} at pool creation can cause misconfigurations later.
- If you keep build=False, push=False, make sure your worker can pull the specified image (already present in a registry or locally).
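Since the code block above was cut off right at the k8s branch, here is a minimal sketch of the deploy call it was building toward; the image and namespace values are placeholders, not the ones from your setup:
```
# Register a deployment for a Kubernetes-type work pool, passing the image via
# job_variables instead of the image= argument (placeholders throughout).
trigger_flow.deploy(
    name=DEPLOYMENT_NAME,
    work_pool_name=WORK_POOL_NAME,
    job_variables={
        "image": "your.registry/fraio_api:local",  # the worker pulls this image
        # "namespace": "default",
        # "service_account_name": "prefect-worker",
    },
    build=False,  # Prefect does not build the image...
    push=False,   # ...or push it; it must already exist in the registry
)
```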
Running locally without queueing
- Just call the flow function directly (trigger_flow()); this bypasses deployments/workers entirely. If you truly don’t want the run to be visible in Prefect at all, unset PREFECT_API_URL (or run with a profile that points to None).
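A minimal sketch of that local path, assuming the trigger_flow defined above; clearing PREFECT_API_URL is most reliable when done before the process starts (e.g., in your compose file), the in-process pop shown here is just illustrative:
```
import os

# Optional: keep the run off any Prefect API entirely.
os.environ.pop("PREFECT_API_URL", None)

# Run the flow inline: no work pool, deployment, or worker involved.
result = trigger_flow()  # returns 5, per the flow definition above
print(result)
```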
For “prefect” (Docker) and “k8s” modes
- Ensure a worker is running against the target pool so scheduled runs get picked up. Typically:
- Docker worker docs: https://prefecthq.github.io/prefect-docker/
- Kubernetes worker docs: https://prefecthq.github.io/prefect-kubernetes/
- You can run workers in containers/Pods as part of your stack; the important bit is they are alive and pointed at WORK_POOL_NAME.
Docs and references
- Deployments in Prefect: https://docs.prefect.io/latest/concepts/deployments/
- Work pools/workers overview: https://docs.prefect.io/latest/concepts/work-pools/
- Docker collection: https://prefecthq.github.io/prefect-docker/
- Kubernetes collection: https://prefecthq.github.io/prefect-kubernetes/
If you can share the actual error you’re running into (stack trace or message), I’ll pinpoint the exact fix. My hunches are:
- awaiting Flow.deploy
- creating the pool with an empty base_job_template
- image not available to the worker
- reading the deployment with an incorrect flow/deployment name pair
- creating asyncio tasks at import-time without an event loop running
Kurt Sys (Vintecc)
09/16/2025, 8:47 AM
Marvin
09/16/2025, 8:52 AM
Marvin
09/16/2025, 8:53 AM
- Flow.deploy(...) is synchronous — do not await it.
- Docker/Kubernetes workers are extras: install with pip install "prefect[docker]" and/or pip install "prefect[kubernetes]".
- If you have the extras installed, you can provide the default base job template for the work pool; otherwise you can omit it and let the server default (or keep your current approach if it works).
Code you can drop into your module
```
import os
import asyncio
import logging
from uuid import UUID

from prefect import flow, get_client

# Optional extras (install as needed):
try:
    from prefect_docker.workers import DockerWorker  # MODE="prefect"
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker  # MODE="k8s"
except Exception:
    KubernetesWorker = None

from prefect.client.schemas.actions import WorkPoolCreate

_logger = logging.getLogger(__name__)

WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"

@flow(name="trigger-flow")
def trigger_flow():
    _logger.info("Hello from trigger_flow")
    return 5

# Internal guards so setup runs once
_setup_lock = asyncio.Lock()
_setup_done = False

async def _ensure_work_pool(mode: str, pool_name: str):
    if mode == "local":
        return  # no pool/worker needed
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            _logger.info(f"Work pool {pool_name!r} already exists")
            return
        # Pick worker type + default base job template if available
        kwargs = {"name": pool_name}
        if mode == "prefect":
            worker_type = "docker"
            kwargs["type"] = worker_type
            if DockerWorker:
                kwargs["base_job_template"] = DockerWorker.get_default_base_job_template()
        elif mode == "k8s":
            worker_type = "kubernetes"
            kwargs["type"] = worker_type
            if KubernetesWorker:
                kwargs["base_job_template"] = KubernetesWorker.get_default_base_job_template()
        else:
            raise ValueError(f"Unknown mode: {mode}")
        await client.create_work_pool(WorkPoolCreate(**kwargs))
        _logger.info(f"Created work pool {pool_name!r} of type {kwargs['type']!r}")

def _ensure_deployment(mode: str) -> UUID | None:
    if mode == "local":
        return None  # no deployment
    if mode == "prefect":
        # For Docker workers, let the worker pull this image
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
            build=False,
            push=False,
        )
    if mode == "k8s":
        # For Kubernetes workers, set job variables. If you don't use Prefect to build/push,
        # specify the image here so the worker knows what to run.
        return trigger_flow.deploy(
            name=DEPLOYMENT_NAME,
            work_pool_name=WORK_POOL_NAME,
            job_variables={
                "image": "your.registry/fraio_api:local",
                # add other k8s settings if needed:
                # "namespace": "default",
                # "service_account_name": "prefect-worker",
                # "labels": {"app": "prefect"},
```
Marvin
09/16/2025, 8:53 AM
- Call await setup_prefect() once during application startup (e.g., FastAPI on_event("startup"), Django async ready hook, etc.).
- Then call await run_trigger_flow() wherever you need to trigger it.
- In MODE="local", run_trigger_flow() returns the flow’s result. In MODE="prefect"/"k8s", it returns a flow run ID (since it runs remotely via a worker).
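A minimal usage sketch of that startup/trigger split; the module path fraio_prefect is a placeholder for wherever the code above lives:
```
import asyncio

from fraio_prefect import setup_prefect, run_trigger_flow  # hypothetical module name

async def main():
    await setup_prefect()              # once, at application startup
    result = await run_trigger_flow()  # flow result in "local", flow run ID in "prefect"/"k8s"
    print(result)

asyncio.run(main())
```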
Why avoid tasks at import time
- asyncio.create_task(...) at import time will fail if there’s no running loop and can create hard-to-debug race conditions. Deferring setup to an explicit setup_prefect() and lazily calling it from run_trigger_flow() avoids this.
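A tiny standalone illustration of that import-time failure mode (plain asyncio, no Prefect needed):
```
import asyncio

async def setup_once():
    ...

# Outside a running event loop (e.g., at module import time) this raises
# "RuntimeError: no running event loop"; the coroutine is also never awaited,
# which is exactly the RuntimeWarning seen later in this thread.
try:
    asyncio.create_task(setup_once())
except RuntimeError as exc:
    print(f"create_task failed: {exc}")
```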
Docs and references
- Deployments: https://docs.prefect.io/latest/concepts/deployments/
- Work pools and workers: https://docs.prefect.io/latest/concepts/work-pools/
- Docker worker collection: https://prefecthq.github.io/prefect-docker/
- Kubernetes worker collection: https://prefecthq.github.io/prefect-kubernetes/
If you share the specific error you hit before, I can tailor this further (e.g., name mismatches in read_deployment_by_name
, missing extras, or base job template issues).Kurt Sys (Vintecc)
09/16/2025, 9:10 AMimport asyncio
import logging
import os
from uuid import UUID
from fraio_orchestrator.fraio_trigger_flow import trigger_flow
from prefect import get_client
try:
from prefect_docker.workers import DockerWorker
except Exception:
DockerWorker = None
try:
from prefect_kubernetes.workers import KubernetesWorker
except Exception:
KubernetesWorker = None
from prefect.client.schemas.actions import WorkPoolCreate
_logger = logging.getLogger(__name__)
WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower() # "local" | "prefect" | "k8s"
_setup_lock = asyncio.Lock()
_setup_done = False
async def _ensure_work_pool(mode: str, pool_name: str):
async with get_client() as client:
pools = await client.read_work_pools()
if any(p.name == pool_name for p in pools):
<http://_logger.info|_logger.info>(f"Work pool {pool_name!r} already exists")
return
match mode:
case "prefect":
if not DockerWorker:
raise RuntimeError("prefect[docker] not installed but MODE='prefect'")
base_template = DockerWorker.get_default_base_job_template()
worker_type = DockerWorker.type # usually "docker"
case "k8s":
if not KubernetesWorker:
raise RuntimeError("prefect[kubernetes] not installed but MODE='k8s'")
base_template = KubernetesWorker.get_default_base_job_template()
worker_type = KubernetesWorker.type # usually "kubernetes"
case _:
return
await client.create_work_pool(
work_pool=WorkPoolCreate(
name=pool_name,
type=worker_type,
base_job_template=base_template,
)
)
<http://_logger.info|_logger.info>(f"Created work pool {pool_name!r} of type {worker_type!r}")
def _ensure_deployment(mode: str) -> UUID | None:
match mode:
case "local":
return None
case "prefect":
return trigger_flow.deploy(
name=DEPLOYMENT_NAME,
work_pool_name=WORK_POOL_NAME,
image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
build=False,
push=False,
) # pyright: ignore[reportReturnType]
case "k8s":
# For Kubernetes, you can pass the worker job variables explicitly.
# If you're not using Prefect to build/push an image, skip the image= arg
# and define it in job_variables:
return trigger_flow.deploy(
name=DEPLOYMENT_NAME,
work_pool_name=WORK_POOL_NAME,
job_variables={
"image": "harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
# add k8s specifics if needed:
# "namespace": "default",
# "service_account_name": "prefect-worker",
# "env": [{"name": "FOO", "value": "bar"}],
# "labels": {"app": "prefect"},
# "image_pull_secrets": ["your-secret"],
},
build=False, # set True if you want Prefect to build via DockerImage
push=False,
) # pyright: ignore[reportReturnType]
raise ValueError(f"Unknown mode: {mode}")
async def _run_via_deployment(flow_name: str, deployment_name: str) -> UUID:
async with get_client() as client:
dep = await client.read_deployment_by_name(f"{flow_name}/{deployment_name}")
fr = await client.create_flow_run_from_deployment(dep.id)
<http://_logger.info|_logger.info>(f"Created flow run: {fr.id}")
return fr.id
async def setup_prefect():
global _setup_done
if _setup_done:
return
async with _setup_lock:
if _setup_done:
return
await _ensure_work_pool(MODE, WORK_POOL_NAME)
_ensure_deployment(MODE)
_setup_done = True
<http://_logger.info|_logger.info>("Prefect setup completed")
async def run_trigger_flow():
if MODE == "local":
return trigger_flow()
await setup_prefect()
return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
setup_prefect_sync = asyncio.run(setup_prefect())
<http://_logger.info|_logger.info>("Prefect setup (sync) completed: {setup_prefect_sync}")
I get errors:
fraio-api | /usr/local/lib/python3.10/site-packages/uvicorn/lifespan/on.py:-1: RuntimeWarning: coroutine 'setup_prefect' was never awaited
fraio-api | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
fraio-worker | Work pool 'fraio-docker-pool' does not exist and no worker type was provided.
fraio-worker | Starting a process worker...
fraio-worker | 09:04:38.673 | INFO | prefect.workers.process.processworker 2d3bd320-917d-472c-b1f3-9f28889e3f66 - Work pool 'fraio-docker-pool' created.
fraio-worker | Worker 'ProcessWorker 2d3bd320-917d-472c-b1f3-9f28889e3f66' started!
fraio-api | 09:05:05.046 | ERROR | uvicorn.error - Exception in ASGI application
...
fraio-api | File "/app/fraio_api/fraio_api/operation_handlers/optimizer/staging_horizon/trigger.py", line 5, in <module>
fraio-api | from fraio_orchestrator.fraio_trigger import run_trigger_flow
fraio-api | File "/app/fraio_orchestrator/src/fraio_orchestrator/fraio_trigger.py", line 124, in <module>
fraio-api | setup_prefect_sync = asyncio.run(setup_prefect())
fraio-api | File "/usr/local/lib/python3.10/asyncio/runners.py", line 33, in run
fraio-api | raise RuntimeError(
fraio-api | RuntimeError: asyncio.run() cannot be called from a running event loop
Marvin
09/16/2025, 9:10 AM
Marvin
09/16/2025, 9:10 AM
Kurt Sys (Vintecc)
09/16/2025, 9:11 AM
fraio-api | /usr/local/lib/python3.10/site-packages/uvicorn/lifespan/on.py:-1: RuntimeWarning: coroutine 'setup_prefect' was never awaited
fraio-api | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
fraio-worker | Work pool 'fraio-docker-pool' does not exist and no worker type was provided.
fraio-worker | Starting a process worker...
fraio-worker | 09:04:38.673 | INFO | prefect.workers.process.processworker 2d3bd320-917d-472c-b1f3-9f28889e3f66 - Work pool 'fraio-docker-pool' created.
fraio-worker | Worker 'ProcessWorker 2d3bd320-917d-472c-b1f3-9f28889e3f66' started!
fraio-api | 09:05:05.046 | ERROR | uvicorn.error - Exception in ASGI application
...
fraio-api | from fraio_orchestrator.fraio_trigger import run_trigger_flow
fraio-api | File "/app/fraio_orchestrator/src/fraio_orchestrator/fraio_trigger.py", line 124, in <module>
fraio-api | setup_prefect_sync = asyncio.run(setup_prefect())
fraio-api | File "/usr/local/lib/python3.10/asyncio/runners.py", line 33, in run
fraio-api | raise RuntimeError(
fraio-api | RuntimeError: asyncio.run() cannot be called from a running event loop
fraio-api | /usr/local/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py:-1: RuntimeWarning: coroutine 'setup_prefect' was never awaited
fraio-api | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
The api is the one calling run_trigger_flow; in my docker compose, I added a 'fraio-worker' (the one to be used by docker workers) and I set up the prefect module inside the module itself:
setup_prefect_sync = asyncio.run(setup_prefect())
_logger.info("Prefect setup (sync) completed: {setup_prefect_sync}")
Marvin
09/16/2025, 9:12 AM
Marvin
09/16/2025, 9:13 AM
1) asyncio.run(...) at import time will fail.
- Fix: remove this line from your module:
setup_prefect_sync = asyncio.run(setup_prefect())
Instead, either:
- Await setup in your app’s startup hook (recommended), or
- Lazily await setup inside your request handler before triggering the flow.
For FastAPI examples:
app = FastAPI()
@app.on_event("startup")
async def startup():
    await setup_prefect()
or with lifespan:
@asynccontextmanager
async def lifespan(app):
    await setup_prefect()
    yield

app = FastAPI(lifespan=lifespan)
If you don’t have startup hooks available, just rely on your run_trigger_flow() to call await setup_prefect() internally (your earlier version already did that).
2) Your worker started the wrong type and created a process pool
- Log: “Work pool 'fraio-docker-pool' does not exist and no worker type was provided. Starting a process worker... Work pool 'fraio-docker-pool' created.”
- Cause: the worker was started without --type docker, so it created a process-type pool.
- Fix: either pre-create the pool as Docker in your setup code, or start the worker with an explicit type so it creates the right pool if missing.
Recommended options
- Pre-create via code (ensure extras installed):
- Install Docker extra in the worker image: pip install "prefect[docker]"
- Use the worker’s default base job template when creating the pool:
from prefect.client.schemas.actions import WorkPoolCreate
from prefect_docker.workers import DockerWorker

async with get_client() as client:
    pools = await client.read_work_pools()
    if not any(p.name == WORK_POOL_NAME for p in pools):
        await client.create_work_pool(
            WorkPoolCreate(
                name=WORK_POOL_NAME,
                type=DockerWorker.type,  # "docker"
                base_job_template=DockerWorker.get_default_base_job_template(),
            )
        )
- Or start the worker with an explicit type so it creates the right pool automatically if it’s missing:
prefect worker start --pool fraio-docker-pool --type docker
Make sure the container has the docker extra installed and Docker access.
Minimal fixes to your module
- Do not call asyncio.run or create tasks at import time.
- Let run_trigger_flow() do lazy setup and then run:

async def setup_prefect():
    # create docker/k8s pool if MODE != local
    # deploy flow if MODE != local
    ...

async def run_trigger_flow():
    # In local mode, run inline; otherwise, ensure setup and trigger a deployment run
    if MODE == "local":
        return trigger_flow()
    await setup_prefect()
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
Compose/worker tips
- Start your docker worker with the docker extra and explicit type:
  - Install: pip install "prefect[docker]"
  - Command: prefect worker start --pool fraio-docker-pool --type docker
- If you’re using Kubernetes instead, install prefect[kubernetes] and use --type kubernetes.
Why this resolves your errors
- No async at import-time: avoids “never awaited” and “cannot be called from a running event loop”
- Explicit worker type: prevents accidental creation of a process pool
- Using the worker’s default base job template: avoids “empty base_job_template” mismatches
Docs for reference
- Work pools and workers: https://docs.prefect.io/latest/concepts/work-pools/
- Deployments: https://docs.prefect.io/latest/concepts/deployments/
- Docker worker: https://prefecthq.github.io/prefect-docker/
- Kubernetes worker: https://prefecthq.github.io/prefect-kubernetes/
If you can share:
- Are you using FastAPI (or another ASGI framework)?
Marvin
09/16/2025, 9:13 AM
Kurt Sys (Vintecc)
09/16/2025, 9:16 AM
app = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
how to add a hook here (for the setup)?
Marvin
09/16/2025, 9:16 AM
Marvin
09/16/2025, 9:17 AM
- Grab the underlying ASGI app via .app
- Register a Starlette startup event that awaits your async setup
- Point Uvicorn at the ASGI app (not the Connexion wrapper)
Example:
import connexion
from connexion.resolver import MethodResolver
from your_module import setup_prefect  # your async setup function

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api("openapi.yaml")

# Expose the underlying ASGI app for Uvicorn
app = cnx.app

@app.on_event("startup")
async def startup():
    await setup_prefect()
Run Uvicorn against the ASGI app:
- If your module is main.py and contains the code above, start with:
uvicorn main:app
Notes:
- Don’t call asyncio.run(setup_prefect()) at import time; the startup hook will run it inside Uvicorn’s event loop.
- If you previously pointed Uvicorn at the Connexion wrapper (e.g., uvicorn main:cnx), switch to the underlying ASGI app (main:app) so the startup hook executes.
Kurt Sys (Vintecc)
09/16/2025, 9:26 AM
cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
options = SwaggerUIOptions(spec_path="/openapi.yaml", swagger_ui=True, swagger_ui_path="/docs")
cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    # validate_responses=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)
app = cnx.app
gives:
Cannot access attribute "app" for class "AsyncApp"
Attribute "app" is unknownPylancereportAttributeAccessIssue
(function) app: Unknown
Marvin
09/16/2025, 9:26 AM
Marvin
09/16/2025, 9:27 AM
- The underlying ASGI app is available as .app at runtime. Pylance doesn’t know about it, so you’ll see a type warning you can safely ignore/suppress.
- Add your async setup to a Starlette “startup” event so it runs inside Uvicorn’s event loop (no asyncio.run at import time).
Example:
import connexion
from connexion.resolver import MethodResolver
from connexion.options import SwaggerUIOptions
from your_module import setup_prefect  # your async setup function

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
options = SwaggerUIOptions(spec_path="/openapi.yaml", swagger_ui=True, swagger_ui_path="/docs")
cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

# Underlying Starlette app (Pylance doesn't know this attribute, but it exists at runtime)
app = cnx.app  # pyright: ignore[reportAttributeAccessIssue]

@app.on_event("startup")
async def _startup():
    await setup_prefect()
Run Uvicorn against the ASGI app:
uvicorn your_module:app
If you prefer not to use the decorator, you can register the handler explicitly:
app.add_event_handler("startup", _startup)
And make sure to remove any asyncio.run(setup_prefect()) or background tasks created at import time from your module — the startup hook will handle it.
Kurt Sys (Vintecc)
09/16/2025, 9:29 AM
fraio-api | File "/app/fraio_api/fraio_api/app.py", line 36, in <module>
fraio-api | app = cnx.app  # pyright: ignore[reportAttributeAccessIssue]
fraio-api | AttributeError: 'AsyncApp' object has no attribute 'app'
Marvin
09/16/2025, 9:29 AM
Marvin
09/16/2025, 9:30 AM
Your Connexion version doesn’t expose .app on AsyncApp. No problem; two working patterns:
Option A: Register a startup handler directly on the AsyncApp (if it exposes Starlette-style APIs)
Try this first — many Connexion versions forward Starlette APIs to the AsyncApp instance:
cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

async def _startup():
    await setup_prefect()

# If available on your version:
cnx.add_event_handler("startup", _startup)  # or: cnx.on_event("startup")(_startup)

# This is your ASGI app for uvicorn
app = cnx
Option B (version-agnostic): Wrap with a Starlette app that defines a lifespan
If A doesn’t work, mount the Connexion app into a small Starlette app that runs your setup during startup:
from contextlib import asynccontextmanager
from starlette.applications import Starlette

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api(
    "openapi.yaml",
    strict_validation=True,
    pythonic_params=True,
    swagger_ui_options=options,
    security_handlers={"apiKeyAuth": api_key_auth, "jwtAuth": bearer_auth},
)

@asynccontextmanager
async def lifespan(_):
    await setup_prefect()
    yield

star = Starlette(lifespan=lifespan)
# Mount the Connexion ASGI app at the root
star.mount("/", cnx)

# This is your ASGI app for uvicorn
app = star
Then run:
uvicorn your_module:app
Important reminders
- Remove any asyncio.run(setup_prefect()) or background tasks at import time.
- Start your Prefect worker with the correct type so the pool is the right kind:
- Docker: prefect worker start --pool fraio-docker-pool --type docker
- Kubernetes: prefect worker start --pool fraio-k8s-pool --type kubernetes
Kurt Sys (Vintecc)
09/16/2025, 9:45 AM
fraio-worker | Work pool 'fraio-docker-pool' does not exist and no worker type was provided.
fraio-worker | Starting a process worker...
fraio-worker | 09:43:20.707 | INFO | prefect.workers.process.processworker a6f64a89-e59f-40a0-bc43-9d989d8a3acf - Work pool 'fraio-docker-pool' created.
fraio-worker | Worker 'ProcessWorker a6f64a89-e59f-40a0-bc43-9d989d8a3acf' started!
fraio-api | /app/fraio_api/fraio_api/operation_handlers/optimizer/staging_horizon/trigger.py:9: RuntimeWarning: coroutine 'run_flow_async' was never awaited
fraio-api | await run_trigger_flow()
fraio-api | RuntimeWarning: Enable tracemalloc to get the object allocation traceback
This is really weird, because there is no run_flow_async in my code anywhere (used grep).
Marvin
09/16/2025, 9:45 AM
Marvin
09/16/2025, 9:47 AM
import inspect
async def run_trigger_flow():
    if MODE == "local":
        # Run inline (no queues/workers)
        if inspect.iscoroutinefunction(trigger_flow.fn):
            return await trigger_flow()  # async flow
        else:
            return trigger_flow()  # sync flow
    await setup_prefect()
    # Requires a running worker
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
Why this helps
- If your @flow is defined as async def trigger_flow(...): ..., calling trigger_flow() returns a coroutine that Prefect’s engine will run via an internal coroutine (run_flow_async). If you don’t await it, Python warns that the coroutine was never awaited.
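A tiny side-by-side sketch of the two cases, independent of the code above:
```
from prefect import flow

@flow
async def async_trigger():   # calling this returns a coroutine; it must be awaited
    return 5

@flow
def sync_trigger():          # calling this runs the flow and returns the result directly
    return 5

async def caller():
    a = await async_trigger()  # correct for an async flow
    b = sync_trigger()         # correct for a sync flow
    return a, b
```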
Double-check your worker startup too
Your logs show a process worker created your pool:
- “Work pool 'fraio-docker-pool' does not exist and no worker type was provided. Starting a process worker...”
Start the worker with an explicit type so the correct pool type is used:
prefect worker start --pool fraio-docker-pool --type docker
(and ensure the docker extra is installed in that container: pip install "prefect[docker]")
If you still hit the warning after the patch, please share:
- How trigger_flow is defined (async def vs def)
- The exact run_trigger_flow function body you’re using now
- Where you call await run_trigger_flow() from (confirm it’s inside an async function)
Refs
- Flows: https://docs.prefect.io/latest/concepts/flows/
- Work pools/workers: https://docs.prefect.io/latest/concepts/work-pools/
Kurt Sys (Vintecc)
09/16/2025, 9:58 AM
I'm now running in prefect mode. I'll just post you the code, so you're up to date, but you don't need to react to it yet. I'll add the result of running it afterwards:
import asyncio
import logging
import os
from uuid import UUID

from fraio_orchestrator.fraio_trigger_flow import trigger_flow
from prefect import get_client

try:
    from prefect_docker.workers import DockerWorker
except Exception:
    DockerWorker = None
try:
    from prefect_kubernetes.workers import KubernetesWorker
except Exception:
    KubernetesWorker = None

from prefect.client.schemas.actions import WorkPoolCreate

_logger = logging.getLogger(__name__)

WORK_POOL_NAME = "fraio-docker-pool"
DEPLOYMENT_NAME = "fraio-trigger-flow"
MODE = os.getenv("RUNTIME_MODE", "local").lower()  # "local" | "prefect" | "k8s"

_setup_lock = asyncio.Lock()
_setup_done = False

async def _ensure_work_pool(mode: str, pool_name: str):
    _logger.info(f"Ensuring work pool {pool_name!r} for mode {mode!r}")
    async with get_client() as client:
        pools = await client.read_work_pools()
        if any(p.name == pool_name for p in pools):
            _logger.info(f"Work pool {pool_name!r} already exists")
            return
        match mode:
            case "prefect":
                if not DockerWorker:
                    raise RuntimeError("prefect[docker] not installed but MODE='prefect'")
                base_template = DockerWorker.get_default_base_job_template()
                worker_type = DockerWorker.type  # usually "docker"
            case "k8s":
                if not KubernetesWorker:
                    raise RuntimeError("prefect[kubernetes] not installed but MODE='k8s'")
                base_template = KubernetesWorker.get_default_base_job_template()
                worker_type = KubernetesWorker.type  # usually "kubernetes"
            case _:
                return
        await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name=pool_name,
                type=worker_type,
                base_job_template=base_template,
            )
        )
        _logger.info(f"Created work pool {pool_name!r} of type {worker_type!r}")

def _ensure_deployment(mode: str) -> UUID | None:
    _logger.info(f"Ensuring deployment {DEPLOYMENT_NAME!r} for mode {mode!r}")
    match mode:
        case "local":
            return None
        case "prefect":
            return trigger_flow.deploy(
                name=DEPLOYMENT_NAME,
                work_pool_name=WORK_POOL_NAME,
                image="harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
                build=False,
                push=False,
            )  # pyright: ignore[reportReturnType]
        case "k8s":
            # For Kubernetes, you can pass the worker job variables explicitly.
            # If you're not using Prefect to build/push an image, skip the image= arg
            # and define it in job_variables:
            return trigger_flow.deploy(
                name=DEPLOYMENT_NAME,
                work_pool_name=WORK_POOL_NAME,
                job_variables={
                    "image": "harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local",
                    # add k8s specifics if needed:
                    # "namespace": "default",
                    # "service_account_name": "prefect-worker",
                    # "env": [{"name": "FOO", "value": "bar"}],
                    # "labels": {"app": "prefect"},
                    # "image_pull_secrets": ["your-secret"],
                },
                build=False,  # set True if you want Prefect to build via DockerImage
                push=False,
            )  # pyright: ignore[reportReturnType]
    raise ValueError(f"Unknown mode: {mode}")

async def _run_via_deployment(flow_name: str, deployment_name: str) -> UUID:
    async with get_client() as client:
        dep = await client.read_deployment_by_name(f"{flow_name}/{deployment_name}")
        fr = await client.create_flow_run_from_deployment(dep.id)
        _logger.info(f"Created flow run: {fr.id}")
        return fr.id

async def setup_prefect():
    _logger.info(f"Running Prefect in '{MODE}' mode")
    global _setup_done
    if _setup_done:
        return
    async with _setup_lock:
        if _setup_done:
            return
        await _ensure_work_pool(MODE, WORK_POOL_NAME)
        _ensure_deployment(MODE)
        _setup_done = True
        _logger.info("Prefect setup completed")

async def run_trigger_flow():
    if MODE == "local":
        return await trigger_flow()
    await setup_prefect()
    return await _run_via_deployment("trigger-flow", DEPLOYMENT_NAME)
Marvin
09/16/2025, 9:58 AM
Marvin
09/16/2025, 9:58 AM
- Install the Docker extra in the worker image: pip install "prefect[docker]"
- Start with: prefect worker start --pool fraio-docker-pool --type docker
- Docs: https://prefecthq.github.io/prefect-docker/
- Ensure API connectivity from both containers
  - PREFECT_API_URL and (if Cloud) PREFECT_API_KEY set for both the API container and the worker container
- Image accessibility
  - Your worker must be able to pull harbor.mgmt.vintecc.cloud/vintecc-ags-hbm/fraio_api:local (registry auth, network)
- Flow name vs deployment lookup
  - You’re using read_deployment_by_name("trigger-flow/fraio-trigger-flow") — confirm your @flow(name="trigger-flow") matches the prefix
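A hedged debugging sketch for that last point: list the deployments the server knows about and print them as "flow-name/deployment-name" pairs so you can verify the string passed to read_deployment_by_name:
```
import asyncio

from prefect import get_client

async def list_deployments():
    async with get_client() as client:
        for dep in await client.read_deployments():
            fl = await client.read_flow(dep.flow_id)
            # This is the exact pair expected by read_deployment_by_name
            print(f"{fl.name}/{dep.name}")

asyncio.run(list_deployments())
```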
All set — send the logs/results when ready and we’ll iterate.
Kurt Sys (Vintecc)
09/16/2025, 10:01 AM
@asynccontextmanager
async def lifespan(app: connexion.AsyncApp):
    print("Starting up…")
    logging.getLogger("fraio_orchestrator.fraio_trigger").info("Starting up…")
    await setup_prefect()
    yield
    print("Cleaning up…")
Marvin
09/16/2025, 10:02 AM
Marvin
09/16/2025, 10:02 AM
from contextlib import asynccontextmanager
import connexion
from connexion.resolver import MethodResolver
@asynccontextmanager
async def lifespan(app: connexion.AsyncApp):
    print("Starting up…")
    logging.getLogger("fraio_orchestrator.fraio_trigger").info("Starting up…")
    await setup_prefect()
    yield
    print("Cleaning up…")

cnx = connexion.AsyncApp(
    __name__,
    specification_dir=".",
    resolver=MethodResolver("operation_handlers"),
    lifespan=lifespan,  # <-- attach here
)
cnx.add_api("openapi.yaml", ...)
app = cnx  # point uvicorn at this
- Alternatively, use the Starlette wrapper we discussed earlier:
from starlette.applications import Starlette

@asynccontextmanager
async def lifespan(_):
    print("Starting up…")
    await setup_prefect()
    yield
    print("Cleaning up…")

cnx = connexion.AsyncApp(__name__, specification_dir=".", resolver=MethodResolver("operation_handlers"))
cnx.add_api("openapi.yaml", ...)

star = Starlette(lifespan=lifespan)
star.mount("/", cnx)
app = star
Ensure Uvicorn runs the lifespan hook
- Run Uvicorn against app and, if needed, force lifespan on:
uvicorn your_module:app --lifespan on
By default it’s “auto”; flipping it to “on” helps if auto-detection misses your implementation.
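If you launch Uvicorn from Python rather than the CLI, the same switch can be passed there; a sketch, where "your_module:app" and the port are placeholders:
```
import uvicorn

# Equivalent of: uvicorn your_module:app --lifespan on
uvicorn.run("your_module:app", host="0.0.0.0", port=8000, lifespan="on")
```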
If “Starting up…” prints but it hangs at await setup_prefect()
- Add granular logs inside setup_prefect to pinpoint the wait:
async def setup_prefect():
    _logger.info("setup_prefect: start")
    async with _setup_lock:
        _logger.info("setup_prefect: acquired lock")
        await _ensure_work_pool(MODE, WORK_POOL_NAME)
        _logger.info("setup_prefect: ensured work pool")
        _ensure_deployment(MODE)
        _logger.info("setup_prefect: ensured deployment")
    _logger.info("setup_prefect: done")
- The most common hang is API connectivity. From the API container, verify (see the connectivity sketch after this list):
  - PREFECT_API_URL (and PREFECT_API_KEY if using Cloud) are set
  - You can reach the endpoint (e.g., curl http://prefect-server:4200/api/health)
- If the worker is also creating the pool “on the fly” with the wrong type, fix the worker command:
- Docker worker image: install extras: pip install "prefect[docker]"
- Start with explicit type: prefect worker start --pool fraio-docker-pool --type docker
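One quick way to test that connectivity from inside the API container is the client's own health check; a sketch, assuming PREFECT_API_URL is already set in that container's environment:
```
import asyncio

from prefect import get_client

async def check_api():
    async with get_client() as client:
        # api_healthcheck() returns None when the API is reachable and healthy,
        # otherwise it returns the exception that was raised while connecting.
        print(await client.api_healthcheck())

asyncio.run(check_api())
```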
Once you hook the lifespan correctly and see “Starting up…”, if it still blocks, paste the added setup_prefect logs and we’ll zero in on where it’s waiting.