Michiel
07/10/2025, 7:40 AM
Marvin
07/10/2025, 7:40 AM
Marvin
07/10/2025, 7:41 AM
python
from prefect.deployments import run_deployment

# Submit the flow run; timeout=0 returns right away instead of polling for completion
flow_run = run_deployment(
    name="your-flow-name/your-deployment-name",
    parameters={"param1": "value1"},  # optional
    timeout=0,
)

# Get the flow run ID
flow_run_id = flow_run.id
With `timeout=0`, the `run_deployment` function submits the flow run and returns immediately with a `FlowRun` object, making it non-blocking. With the default `timeout=None`, it polls until the run finishes. You can use the returned ID to track or query the flow run's status later.
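A minimal sketch of tracking a run by its ID, assuming a reachable Prefect API. `read_state_type` and `wait_until_done` are hypothetical helper names, not Prefect functions; the client calls mirror the `get_client()` / `read_flow_run` usage that appears later in this thread, and the poll interval and limit are arbitrary choices.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# State type values that mean the run will not change any further.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


async def read_state_type(flow_run_id: Any) -> Optional[str]:
    """Fetch the current state type of a flow run from the Prefect API."""
    # Imported lazily so the sketch loads even where Prefect is not installed.
    from prefect import get_client

    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
    # A freshly created run may not have a state object yet.
    return flow_run.state.type.value if flow_run.state else None


async def wait_until_done(
    flow_run_id: Any,
    read_state: Callable[[Any], Awaitable[Optional[str]]] = read_state_type,
    poll_seconds: float = 2.0,
    max_polls: int = 30,
) -> Optional[str]:
    """Poll until a terminal state is seen or max_polls is used up.

    Returns the last state type that was read.
    """
    state = None
    for _ in range(max_polls):
        state = await read_state(flow_run_id)
        if state in TERMINAL_STATES:
            break
        await asyncio.sleep(poll_seconds)
    return state
```

Passing `read_state` as a parameter keeps the polling loop separate from the API call, so the loop can be exercised with a stub.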
If you need to wait for the flow run to complete, pass a positive `timeout` (or leave it unset to wait until the run finishes); in an async context, await the call:
python
import asyncio
from prefect.deployments import run_deployment

async def submit_and_wait():
    flow_run = await run_deployment(
        name="your-flow-name/your-deployment-name",
        timeout=300  # wait up to 300 seconds for the run to finish
    )
    return flow_run.id
You can find more details about flow run creation in the API documentation.
Michiel
07/10/2025, 7:41 AM
Marvin
07/10/2025, 7:42 AM
Marvin
07/10/2025, 7:42 AM
python
from prefect import flow
from prefect.flow_engine import run_flow_async

@flow
def my_flow(x: int):
    return x + 1

# Submit the flow run asynchronously
future = run_flow_async(my_flow, parameters={"x": 1})

# Get the flow run ID immediately
flow_run_id = future.run_id
If you're working in an async context, you can use it like this:
python
import asyncio
from prefect import flow
from prefect.flow_engine import run_flow_async

@flow
def my_flow(x: int):
    return x + 1

async def main():
    # Submit the flow run asynchronously
    future = await run_flow_async(my_flow, parameters={"x": 1})
    # Get the flow run ID
    flow_run_id = future.run_id
    # Optionally, if you want to wait for the result later:
    result = await future.wait()

# Run the async function
asyncio.run(main())
This approach gives you the flexibility to:
1. Get the flow run ID immediately
2. Optionally wait for the result later using await future.wait()
3. Run multiple flows concurrently
You can find more details about running flows concurrently in the documentation.
Michiel
07/10/2025, 7:47 AM
`asyncio` to achieve that?
Marvin
07/10/2025, 7:48 AM
Marvin
07/10/2025, 7:48 AM
python
from fastapi import FastAPI
from prefect import flow
from prefect.flow_engine import run_flow_async
import asyncio
from typing import Dict

app = FastAPI()

@flow
def my_processing_flow(data: Dict):
    # Your flow logic here
    return {"processed": data}

@app.post("/trigger-flow")
async def trigger_flow(data: Dict):
    # Create a task that runs the flow
    future = await run_flow_async(
        my_processing_flow,
        parameters={"data": data}
    )
    # Return immediately with the flow run ID
    return {
        "status": "Flow started",
        "flow_run_id": future.run_id
    }

# Optional endpoint to check flow status
@app.get("/flow-status/{flow_run_id}")
async def get_flow_status(flow_run_id: str):
    from prefect.client import get_client
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        return {"status": flow_run.state.type.value}
A few important points about this implementation:
1. The endpoint returns immediately with the flow run ID, allowing the client to track the flow's progress if needed.
2. You can optionally provide a status endpoint that clients can use to check the flow's progress.
3. If you need to run multiple flows in parallel or have more complex background task handling, you can use FastAPI's `BackgroundTasks`:
python
from fastapi import BackgroundTasks

@app.post("/trigger-multiple-flows")
async def trigger_multiple_flows(data: Dict, background_tasks: BackgroundTasks):
    async def run_flow_wrapper():
        await run_flow_async(my_processing_flow, parameters={"data": data})

    # Add the flow execution to background tasks
    background_tasks.add_task(run_flow_wrapper)
    return {"status": "Flow queued for execution"}
4. If you need to handle errors or add logging, you can wrap the flow execution:
python
@app.post("/trigger-flow-with-handling")
async def trigger_flow_with_handling(data: Dict):
    try:
        future = await run_flow_async(
            my_processing_flow,
            parameters={"data": data}
        )
        return {
            "status": "Flow started",
            "flow_run_id": future.run_id
        }
    except Exception as e:
        # Log the error appropriately
        return {
            "status": "error",
            "message": str(e)
        }
Remember that your FastAPI application needs to be run with an ASGI server that supports async operations, like Uvicorn:
bash
uvicorn app:app --workers 1
This setup allows you to handle HTTP requests quickly while offloading the potentially long-running flow execution to the background, providing a responsive API experience for your users.
Michiel
07/10/2025, 8:10 AM
`run_flow_async` doesn't work as you describe it. It still blocks. I could use background tasks but in that case I don't get back the flow run ID since it's fire and forget.
Marvin
07/10/2025, 8:10 AM
Marvin
07/10/2025, 8:11 AM
`run_flow_async` function does indeed block until the flow completes, despite being async. Here's a better solution that will actually work non-blocking while still getting the flow run ID:
python
from fastapi import FastAPI
from prefect import flow
from prefect.client import get_client
import asyncio
from typing import Dict

app = FastAPI()

@flow
def my_processing_flow(data: Dict):
    # Your flow logic here
    return {"processed": data}

@app.post("/trigger-flow")
async def trigger_flow(data: Dict):
    # Create the flow run first
    async with get_client() as client:
        flow_run = await client.create_flow_run(
            flow=my_processing_flow,
            parameters={"data": data},
        )
    # Start the flow run in the background
    asyncio.create_task(
        my_processing_flow._run(flow_run=flow_run)
    )
    return {
        "status": "Flow started",
        "flow_run_id": flow_run.id
    }

# Optional endpoint to check flow status
@app.get("/flow-status/{flow_run_id}")
async def get_flow_status(flow_run_id: str):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        return {"status": flow_run.state.type.value}
This approach:
1. Creates the flow run first using the Prefect client
2. Gets the flow run ID immediately
3. Uses `asyncio.create_task()` to start the flow execution in the background
4. Returns the flow run ID to the caller without blocking
The key differences from my previous incorrect response are:
- We're explicitly creating the flow run first
- We're using `asyncio.create_task()` to handle the actual execution
- We're using the flow's internal `_run` method with the pre-created flow run
This gives you true non-blocking behavior while still maintaining the ability to track the flow run through its ID.
Note: If you need to handle a large number of concurrent flows or want more control over the execution pool, you might want to consider using a proper task queue system like Celery or Redis Queue. However, for moderate loads, this solution should work well.
Michiel
07/10/2025, 8:15 AM
`_run` method on flow objects. That doesn't exist: AttributeError: 'Flow' object has no attribute '_run'
Marvin
07/10/2025, 8:15 AM
Marvin
07/10/2025, 8:16 AM
python
from fastapi import FastAPI
from prefect import flow
from prefect.client import get_client
import asyncio
from typing import Dict

app = FastAPI()

@flow
def my_processing_flow(data: Dict):
    # Your flow logic here
    return {"processed": data}

@app.post("/trigger-flow")
async def trigger_flow(data: Dict):
    # Create the flow run first
    async with get_client() as client:
        # Create a flow run
        flow_run = await client.create_flow_run(
            flow=my_processing_flow,
            parameters={"data": data},
        )
    # Start a background task to execute the flow
    asyncio.create_task(
        my_processing_flow._run(flow_run=flow_run)
    )
    return {
        "status": "Flow started",
        "flow_run_id": flow_run.id
    }

# Optional endpoint to check flow status
@app.get("/flow-status/{flow_run_id}")
async def get_flow_status(flow_run_id: str):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        return {"status": flow_run.state.type.value}
This approach:
1. Creates the flow run first using the Prefect client
2. Uses `asyncio.create_task()` to start the flow execution in the background
3. Returns the flow run ID immediately to the caller
4. Provides a way to check the status of the flow run later
A few important notes:
- This pattern works because we're properly using FastAPI's async capabilities along with Prefect's async client
- The flow run ID is available immediately because we create the run before starting execution
- The status endpoint allows clients to monitor the progress of their flow runs
- Make sure your FastAPI app is running with an ASGI server that supports async operations (like Uvicorn)
You can run this with:
bash
uvicorn app:app --workers 1
This is a more reliable pattern than using `run_flow_async` or FastAPI's background tasks, as it gives you both immediate response and proper flow run tracking.
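Since the `_run` call was reported earlier in the thread to raise `AttributeError`, one alternative that relies only on `run_deployment` is sketched below. It assumes the flow has already been deployed and that a worker or serving process is available to pick the run up, so execution happens outside the web process. `DEPLOYMENT_NAME`, `submit_deployment`, and the `submit` parameter are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical deployment name in "flow-name/deployment-name" form.
DEPLOYMENT_NAME = "my-processing-flow/my-deployment"


async def submit_deployment(name: str, parameters: Dict[str, Any]) -> Any:
    """Create a flow run for a deployment and return without waiting for it."""
    # Imported lazily so the sketch loads even where Prefect is not installed.
    from prefect.deployments import run_deployment

    # timeout=0 returns the FlowRun metadata as soon as the run is created.
    return await run_deployment(name=name, parameters=parameters, timeout=0)


async def trigger_flow(
    data: Dict[str, Any],
    submit: Callable[[str, Dict[str, Any]], Awaitable[Any]] = submit_deployment,
) -> Dict[str, str]:
    """Endpoint body: submit the run and hand its ID straight back."""
    flow_run = await submit(DEPLOYMENT_NAME, {"data": data})
    return {"status": "Flow started", "flow_run_id": str(flow_run.id)}
```

A handler like the `/trigger-flow` endpoint could call `await trigger_flow(data)` and return the result; the existing `/flow-status/{flow_run_id}` endpoint would then report on the run by that ID.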