Veeramanikandan Shanmugam
08/12/2025, 10:20 AMMarvin
08/12/2025, 10:20 AMMarvin
08/12/2025, 10:23 AM
{
"source_flow_run_gri": "{{ event.resource.id }}",
"source_flow_run_id": "{{ event.resource.id | replace('prefect.flow-run.', '') }}",
"source_flow_run_name": "{{ event.payload.flow_run_name }}",
"source_flow_name": "{{ event.payload.flow_name }}",
"source_state": "{{ event.payload.state }}",
"source_state_type": "{{ event.payload.state_type }}",
"source_tags": {{ event.payload.tags | tojson }}
}
Tip: If you don’t want to guess fields, just pass the whole event and parse it in your tracking flow:
{ "trigger_event": {{ event | tojson }} }
Then in your tracking flow:
from typing import Any, Dict

from prefect import flow

@flow
def track_scheduler(
    source_flow_run_id: str | None = None,
    source_flow_run_name: str | None = None,
    source_flow_name: str | None = None,
    source_state: str | None = None,
    source_state_type: str | None = None,
    source_tags: list[str] | None = None,
    trigger_event: Dict[str, Any] | None = None,
):
    # do your tracking here
    ...
Notes
- event.resource.id is the GRI (e.g., prefect.flow-run.<uuid>). Strip the prefix if you want just the UUID (as shown with replace).
- Use the Automation “Test”/“Preview” to verify the templated values.
- This assumes Prefect 3.x. If you’re on 2.x, the approach is the same, but you should really consider upgrading.
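The prefix stripping mentioned in the notes can also be done inside the tracking flow instead of in the template. A minimal sketch, assuming the resource id has the `prefect.flow-run.<uuid>` form described above; `flow_run_uuid_from_resource_id` is a hypothetical helper name, not a Prefect API:

```python
from uuid import UUID

FLOW_RUN_PREFIX = "prefect.flow-run."


def flow_run_uuid_from_resource_id(resource_id: str) -> UUID:
    """Return the UUID embedded in a resource id like 'prefect.flow-run.<uuid>'."""
    if not resource_id.startswith(FLOW_RUN_PREFIX):
        raise ValueError(f"not a flow-run resource id: {resource_id!r}")
    # Everything after the prefix should be the flow run's UUID;
    # UUID() raises ValueError if it is not well-formed.
    return UUID(resource_id[len(FLOW_RUN_PREFIX):])
```

Parsing into a `UUID` rather than keeping a bare string means a malformed id fails early, before it reaches any API call.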
If you want, tell me which specific fields you need and I’ll give you a minimal parameter set.
Veeramanikandan Shanmugam
08/12/2025, 10:35 AMVeeramanikandan Shanmugam
08/12/2025, 10:35 AMMarvin
08/12/2025, 10:35 AMMarvin
08/12/2025, 10:38 AMprefect.tags(...)) are applied at runtime, after the flow run is already created. That’s why your auto-scheduled runs don’t show them while they’re Scheduled/Pending.
To have tags on every flow run at creation time (including scheduled ones), set tags on the deployment or on the run request:
Options
- Deployment-level tags (recommended)
- In Python when creating the deployment:
from prefect import flow

@flow
def my_flow():
    ...

my_flow.from_source(
    "https://github.com/org/repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="my-pool",
    tags=["scheduler", "prod"],
)
- Or in prefect.yaml:
deployments:
  - name: prod
    flow_name: my_flow
    entrypoint: flows/my_flow.py:my_flow
    work_pool: { name: my-pool }
    tags: ["scheduler", "prod"]
- When triggering runs programmatically:
from prefect.deployments import run_deployment
run_deployment("my-flow/prod", tags=["scheduler"])
- In Automations “Run deployment” action:
- Set the “Tags” field to ["scheduler"] (you can also template it if you like).
Summary: put the tag on the deployment (or the run request), not just in-flow code, and it will appear on all flow runs, including auto-scheduled ones. If you share your current deployment config, I’ll point to exactly where to add tags.
Veeramanikandan Shanmugam
08/12/2025, 11:12 AMMarvin
08/12/2025, 11:12 AMMarvin
08/12/2025, 11:14 AMprefect.tag (ids like prefect.tag.scheduler). If you’re hitting the API directly, use those resource ids.
- If your UI doesn’t show an “Any of” operator, use a Composite trigger with two sub-triggers (one for each tag) and set it to “Any”/OR. Or, in a pinch, create two identical automations—one per tag.
- Make sure your runs actually have the tags at creation time (set tags on the deployment or in the “Run deployment” action), otherwise you’ll be filtering on vibes.
Veeramanikandan Shanmugam
08/13/2025, 5:28 AMMarvin
08/13/2025, 5:28 AMMarvin
08/13/2025, 5:34 AM
{
"name": "Run deployment when tag is scheduler or alerts",
"description": "Listens for any flow-run event that has tags scheduler or alerts and runs a helper deployment.",
"enabled": true,
"posture": "Reactive",
"trigger": {
"type": "event",
"match": {
"all": [
{
"resource": {
"role": "flow-run"
}
},
{
"related": {
"any": [
{
"id": "prefect.tag.scheduler"
},
{
"id": "prefect.tag.alerts"
}
]
}
}
]
}
},
"actions": [
{
"type": "run-deployment",
"deployment_id": "<YOUR_DEPLOYMENT_ID>",
"parameters": {
"trigger_event": "{{ event }}",
"trigger_event_name": "{{ event.event }}",
"triggered_flow_run_resource_id": "{{ event.resource.id }}"
}
}
]
}
Notes
- The trigger filters:
- resource.role = flow-run scopes this to flow-run events.
- related.any for prefect.tag.scheduler or prefect.tag.alerts matches runs that carry either tag.
- The action parameters use templating:
- {{ event }} injects the full event object as JSON into your downstream flow’s parameters.
- You can also cherry-pick fields, e.g. {{ event.resource.id }} or {{ event.event }}.
- If you want to restrict to specific event names (e.g., only state changes), add an EventName filter into match (commonly a prefix match like prefect.flow-run.)—but the above is enough for “any flow-run event with those tags”.
cURL request (Cloud example)
curl -X POST \
"https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/automations/" \
-H "Authorization: Bearer <PREFECT_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"name": "Run deployment when tag is scheduler or alerts",
"description": "Listens for any flow-run event that has tags scheduler or alerts and runs a helper deployment.",
"enabled": true,
"posture": "Reactive",
"trigger": {
"type": "event",
"match": {
"all": [
{ "resource": { "role": "flow-run" } },
{ "related": { "any": [
{ "id": "prefect.tag.scheduler" },
{ "id": "prefect.tag.alerts" }
]}}
]
}
},
"actions": [
{
"type": "run-deployment",
"deployment_id": "<YOUR_DEPLOYMENT_ID>",
"parameters": {
"trigger_event": "{{ event }}",
"trigger_event_name": "{{ event.event }}",
"triggered_flow_run_resource_id": "{{ event.resource.id }}"
}
}
]
}'
Downstream flow example (accept the event as a parameter)
from prefect import flow, get_run_logger

@flow
def handle_trigger(trigger_event: dict, trigger_event_name: str = "", triggered_flow_run_resource_id: str = ""):
    logger = get_run_logger()
    logger.info(f"Triggered by event: {trigger_event_name}")
    logger.info(f"Source resource: {triggered_flow_run_resource_id}")
    # Full event is available if you passed `trigger_event`
    logger.info(f"Full event keys: {list(trigger_event.keys())}")
Common variations
Marvin
08/13/2025, 5:34 AM"deployment_name": "flow_name/deployment_name" in place of deployment_id. If your API rejects that, stick to deployment_id.
- If you’d rather filter tags via the flow-run’s labels directly, you can use a resource-attribute filter; however, treating tags as related resources (prefect.tag.<name>) is the most robust.
Want me to fill your actual deployment_id and post it for you, or are you on Cloud vs. Server? I can tailor the exact request if you share those. I’ll try not to sound too excited.
Veeramanikandan Shanmugam
08/13/2025, 5:45 AMMarvin
08/13/2025, 5:46 AMMarvin
08/13/2025, 5:47 AM
{
"name": "<NAME>",
"description": "<DESCRIPTION>",
"enabled": true,
"posture": "Reactive",
"trigger": {
"type": "event",
"match": {
"prefect.resource.id": "prefect.flow-run.*"
},
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.alerts"
]
},
"expect": [
"prefect.flow-run.*"
],
"for_each": [
"prefect.resource.id"
],
"threshold": 1,
"within": 0
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "<DEPLOYMENT_ID>",
"parameters": {
"trigger_event_name": "{{ event.event }}",
"triggered_flow_run_id": "{{ event.resource.id }}",
"trigger_event_json": "{{ event }}"
},
"job_variables": null
}
]
}
Downstream flow options
- If you only need a few fields:
from prefect import flow, get_run_logger

@flow
def downstream(trigger_event_name: str, triggered_flow_run_id: str):
    logger = get_run_logger()
    logger.info(f"Triggered by: {trigger_event_name} from {triggered_flow_run_id}")
- If you want the full event:
import json
from prefect import flow, get_run_logger

@flow
def downstream(trigger_event_json: str):
    logger = get_run_logger()
    evt = json.loads(trigger_event_json)
    logger.info(f"Event name: {evt.get('event')}")
    logger.info(f"Source resource: {evt.get('resource', {}).get('id')}")
Why your previous run didn’t start
- If your deployment’s flow does not accept a parameter named trigger_event (or expects a dict, not a str), the action validation/flow-run creation will fail and the automation won’t launch a run. Pass strings or adjust the flow signature to match exactly what you send.
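One way to catch this kind of mismatch before the automation fires is to compare the parameter names you plan to send against the flow function's signature. A rough sketch using only the standard library; `check_parameters` is a hypothetical helper, and it checks names only, not types. With a Prefect flow you would likely pass the undecorated function (for example `downstream.fn`).

```python
import inspect
from typing import Any, Callable, Dict, List


def check_parameters(func: Callable[..., Any], parameters: Dict[str, Any]) -> List[str]:
    """Return name-level problems with calling func(**parameters); empty list means OK."""
    sig = inspect.signature(func)
    accepts_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
    )
    problems: List[str] = []
    # Names sent by the automation that the function does not declare
    for name in parameters:
        if name not in sig.parameters and not accepts_kwargs:
            problems.append(f"unexpected parameter: {name}")
    # Required names the automation forgot to send
    for name, p in sig.parameters.items():
        if p.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue
        if p.default is inspect.Parameter.empty and name not in parameters:
            problems.append(f"missing required parameter: {name}")
    return problems


# Stand-in for the flow function from the example above (no Prefect needed here)
def downstream(trigger_event_name: str, triggered_flow_run_id: str) -> None:
    ...
```

Running it against the exact `parameters` block from the automation payload shows at a glance which names the flow would reject.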
Sanity checks
- Put posture at the top-level (Reactive). The server expects posture on the automation, not inside the trigger.
- Confirm your deployment_id is correct and the worker/work pool can pick it up.
- Check the Automation Activity in the UI for any “action failed” messages—type/parameter validation errors will show there.
If you want, paste the exact error shown in the Automation activity/logs and your downstream flow signature. I’ll align the parameters precisely so it actually runs instead of silently sulking.
Veeramanikandan Shanmugam
08/13/2025, 5:50 AMMarvin
08/13/2025, 5:50 AMMarvin
08/13/2025, 5:51 AM
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "<DEPLOYMENT_ID>",
"parameters": {
"trigger_event": "{{ event }}"
}
}
]
Downstream flow
from typing import Any, Dict
from prefect import flow, get_run_logger

@flow
def downstream(trigger_event: Dict[str, Any]):
    logger = get_run_logger()
    logger.info(f"Event name: {trigger_event.get('event')}")
    logger.info(f"Source resource: {trigger_event.get('resource', {}).get('id')}")
B) Pass the event as a JSON string
- Prefect’s templating is not Jinja; there’s no tojson filter.
- If you absolutely need a string, accept a string in your flow and json.loads it. Prefect will usually still pass a dict when the placeholder is the entire value, so this is less reliable than A. The robust approach is to accept a dict and call json.dumps inside your flow if you need a string.
If you insist on a string param anyway:
"parameters": {
"trigger_event_json": "{{ event }}"
}
And in the flow:
import json
from typing import Any
from prefect import flow

@flow
def downstream(trigger_event_json: Any):
    # If a dict arrives (likely), turn it into a string yourself when needed
    if not isinstance(trigger_event_json, str):
        trigger_event_json = json.dumps(trigger_event_json)
    evt = json.loads(trigger_event_json)
    ...
Reminder: make sure your deployment’s parameter schema allows the new parameter name/type (redeploy if you changed the flow signature).
For your tag condition in match_related (OR for “scheduler” or “alerts”):
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.alerts"
]
}
If it still doesn’t run when parameters are present, share the Automation Activity error and your flow signature; it’s usually a schema/type mismatch sulking in the corner.
Veeramanikandan Shanmugam
08/13/2025, 6:03 AMMarvin
08/13/2025, 6:03 AMMarvin
08/13/2025, 6:04 AM{{ event }} is a Pydantic Event model. The templater can’t serialize it as JSON, so it falls back to str(event) and sends a giant string. Your flow expects a dict, so Pydantic complains.
Fixes
Option A — Pass only JSON-native pieces of the event
Use fields that are already JSON-serializable (strings, numbers, dicts). In particular:
- event.id, event.event, event.occurred, event.received are strings/ISO datetimes
- event.payload is a plain dict
- event.resource.root is a plain dict
- Avoid passing event or event.resource directly (they’re models → get stringified)
Automation payload snippet
"trigger": {
"type": "event",
"match": {"prefect.resource.id": "prefect.flow-run.*"},
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.alerts"
]
},
"expect": ["prefect.flow-run.*"],
"for_each": ["prefect.resource.id"],
"threshold": 1,
"within": 0
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "<DEPLOYMENT_ID>",
"parameters": {
"trigger_event": {
"id": "{{ event.id }}",
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"received": "{{ event.received }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
},
"triggered_flow_run_id": "{{ event.resource.id }}"
}
}
]
Downstream flow
from typing import Any, Dict
from prefect import flow, get_run_logger

@flow
def scheduler_sync_flow(trigger_event: Dict[str, Any], triggered_flow_run_id: str):
    logger = get_run_logger()
    logger.info(f"Event {trigger_event['id']} ({trigger_event['name']}) "
                f"for flow-run {triggered_flow_run_id}")
    # trigger_event["resource"] and trigger_event["payload"] are dicts
Option B — Pass only the event ID and fetch inside your flow
This avoids all templating serialization weirdness.
"parameters": {
"trigger_event_id": "{{ event.id }}"
}
Then in your flow, call the Events API to read the event by id and get the full JSON. It’s one extra hop, but far more robust than shoving complex objects through the template renderer.
Why your original failed
- The value for trigger_event was a string (repr of the Event), not a dict. Pydantic in your flow correctly said “Input should be a valid dictionary”. Using event.payload and event.resource.root keeps everything JSON-native so Prefect can pass it as a real dict.
Extra: your tag OR condition in match_related
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.alerts"
]
}
If you still see coercion-to-string on those dict fields, tell me the exact Automation Activity “action failed” message and I’ll adjust the payload to the letter. I live for this. Well, “live” is a strong word.
Veeramanikandan Shanmugam
08/13/2025, 6:07 AMMarvin
08/13/2025, 6:07 AMMarvin
08/13/2025, 6:09 AMMarvin
08/13/2025, 6:10 AM
"parameters": {
"trigger_event": {
"id": "{{ event.id }}",
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"received": "{{ event.received }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
}
}
Flow signature
from typing import Any, Dict
from prefect import flow

@flow
def scheduler_sync_flow(trigger_event: Dict[str, Any]):
    ...
Option 2 — Pass event.id and fetch the full event inside your flow
- This avoids all templating serialization quirks.
Automation
"parameters": {
"trigger_event_id": "{{ event.id }}"
}
Flow snippet using the API to read the event by id
- In practice, most people instead pull the flow-run details via event.resource.id (the flow run ID) because that endpoint is stable and first-class. But if you truly need the full event, fetch by id with your REST client of choice within the flow.
Option 3 — Use a Pydantic type in your flow (but only if you supply a proper dict)
- You can annotate your param as Prefect’s Event model:
- from prefect.events.schemas.events import Event
- @flow def f(trigger_event: Event): ...
- This only helps if the Automation sends a JSON object that matches the Event schema. {{ event }} will not do that; it sends a string. If you build a dict from JSON-native fields (Option 1), Pydantic can parse it into Event. You won’t be able to include non-JSON-native submodels like related without manually constructing them.
Why you can’t send “the whole thing” as JSON
- Automations templating cannot call methods (model_dump, dict, etc.) and has no JSON filter. Prefect won’t automatically convert arbitrary Pydantic models in the template context to JSON. So either:
- Send fields that are already JSON-native, or
- Send an identifier and refetch.
Reminder: OR tag filter for your trigger
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.alerts"
]
}
If you share exactly which pieces of the event you need downstream (e.g., tags, state info, source deployment), I’ll give you a minimal parameter dict that keeps everything JSON-native so your flow parameter can stay typed as Dict[str, Any] or even as Event.
Veeramanikandan Shanmugam
08/13/2025, 6:13 AMMarvin
08/13/2025, 6:13 AMMarvin
08/13/2025, 6:13 AMMarvin
08/13/2025, 6:14 AM
"parameters": {
"trigger_event": {
"id": "{{ event.id }}",
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"received": "{{ event.received }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
}
}
Flow signature
from typing import Any, Dict
from prefect import flow

@flow
def scheduler_sync_flow(trigger_event: Dict[str, Any]):
    ...
Option 2 — Send just the event id and fetch the full event inside the flow
- Avoids all templating serialization quirks.
"parameters": { "trigger_event_id": "{{ event.id }}" }
Then in your flow, call your Prefect API to read the event by id and get the full JSON. If all you need is flow-run context, you can also just pass "{{ event.resource.id }}" (the flow-run id) and query the flow run instead.
Option 3 — Use Prefect’s Event model type in your flow, but only if you send a proper dict
- You can annotate your parameter as Prefect’s Event:
from prefect import flow
from prefect.events.schemas.events import Event

@flow
def scheduler_sync_flow(trigger_event: Event):
    ...
- This only helps if the automation actually sends a JSON object matching Event. Since you can’t dump the Pydantic model in the template, you’d still need to construct a JSON-native dict (Option 1). You cannot make the templater call model_dump()/tojson.
Bottom line
- That jinja/tojson construct is not valid in Automations action parameters.
- Either pass JSON-native pieces and let the flow receive a dict, or pass the event id and fetch the full event in the flow. If you truly need every last field, fetching by id is the robust path.
Veeramanikandan Shanmugam
08/13/2025, 6:51 AMMarvin
08/13/2025, 6:51 AMMarvin
08/13/2025, 6:52 AM
"trigger": {
"type": "event",
"match": { "prefect.resource.id": "prefect.flow-run.*" },
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.auto-scheduled",
"prefect.tag.alerts"
]
},
"expect": ["prefect.flow-run.*"],
"for_each": ["prefect.resource.id"],
"threshold": 1,
"within": 0
},
"posture": "Reactive"
B) Or scope by deployment/flow instead of tags
If your goal is “whenever this deployment’s runs happen,” skip tags and match the related deployment. This avoids surprises from system tags.
"trigger": {
"type": "event",
"match": { "prefect.resource.id": "prefect.flow-run.*" },
"match_related": {
"prefect.resource.role": "deployment",
"prefect.resource.id": "prefect.deployment.<YOUR_DEPLOYMENT_ID>"
},
"expect": ["prefect.flow-run.*"],
"for_each": ["prefect.resource.id"],
"threshold": 1,
"within": 0
}
You can also combine both criteria by AND-ing more fields in match_related if you really want tags plus deployment.
Extra: if you want every run to carry the scheduler tag regardless of how it’s started, add it in your flow:
from prefect import flow, tags

@flow
def my_flow():
    with tags("scheduler"):
        ...
But the simplest path is to include auto-scheduled in your tag OR list or filter by deployment instead of tags.
If you paste your current trigger block, I’ll rewrite it precisely so scheduled and quick runs both fire the automation. I’ll even pretend to enjoy it.
Veeramanikandan Shanmugam
08/13/2025, 6:52 AMVeeramanikandan Shanmugam
08/13/2025, 6:52 AMMarvin
08/13/2025, 6:52 AMMarvin
08/13/2025, 6:54 AM
"posture": "Reactive",
"trigger": {
"type": "event",
"match": { "prefect.resource.id": "prefect.flow-run.*" },
"match_related": {
"any": [
{
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
}
]
},
"expect": ["prefect.flow-run.*"],
"for_each": ["prefect.resource.id"],
"threshold": 1,
"within": 0
}
This matches any flow-run event that has a related tag resource equal to prefect.tag.scheduler, even when there are additional tags like auto-scheduled.
If you truly want “scheduler OR alerts”, just add the second entry:
"match_related": {
"any": [
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alerts" }
]
}
2) Ensure the scheduled runs actually include the scheduler tag
Quick runs can add a run tag on the fly. Scheduled runs use the deployment’s configuration. If the deployment doesn’t specify scheduler as a tag, the auto-scheduled run may not have it — and your automation won’t fire.
- Add scheduler to your deployment’s tags so every run (scheduled or manual) carries it.
- You can do this in the UI (Edit deployment → Tags) or via API by updating the deployment’s tags to include ["scheduler"].
- Alternatively, add the tag in your flow code:
from prefect import flow, tags

@flow
def my_flow():
    with tags("scheduler"):
        ...
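That applies the tag on every run, but only at runtime: as noted earlier, it will not be visible while a run is still Scheduled or Pending, so the deployment-level tag is the safer choice for this filter.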
Bonus: fix your action parameters to be JSON-native to avoid sporadic action failures
You’re currently passing models:
- Use resource.root and payload (dict) instead of resource and related (models):
"parameters": {
"trigger_event": {
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"received": "{{ event.received }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
}
}
Passing event.related directly is a list of models; the templater can stringify it at the worst possible time. If you need specific related items (tags, deployment), pass them individually.
Sanity checks
- Put "posture": "Reactive" at the top level (not inside trigger).
- Inspect a sample auto-scheduled event in the UI Events view and confirm you actually see a related resource with id prefect.tag.scheduler. If it’s missing, the deployment doesn’t propagate that tag and the filter will (correctly) skip it.
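If you export a sample event as JSON, the same check can be scripted. A small sketch, assuming each entry in the event's `related` list is a dict keyed by `prefect.resource.id` and `prefect.resource.role` (the same labels used in `match_related` above); `related_tag_names` is a hypothetical helper and `sample_event` is made-up data:

```python
from typing import Any, Dict, List

TAG_PREFIX = "prefect.tag."


def related_tag_names(event: Dict[str, Any]) -> List[str]:
    """List the tag names found among an event's related resources."""
    names: List[str] = []
    for related in event.get("related", []):
        resource_id = related.get("prefect.resource.id", "")
        is_tag = related.get("prefect.resource.role") == "tag"
        if is_tag and resource_id.startswith(TAG_PREFIX):
            names.append(resource_id[len(TAG_PREFIX):])
    return names


# Made-up example: a scheduled run that never received the "scheduler" tag
sample_event = {
    "event": "prefect.flow-run.Scheduled",
    "resource": {"prefect.resource.id": "prefect.flow-run.<uuid>"},
    "related": [
        {"prefect.resource.id": "prefect.tag.auto-scheduled", "prefect.resource.role": "tag"},
        {"prefect.resource.id": "prefect.deployment.<id>", "prefect.resource.role": "deployment"},
    ],
}
```

If `"scheduler"` is missing from the returned list, the trigger is behaving correctly and the fix belongs on the deployment's tags.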
If you share the deployment ID, I can give you the exact PATCH payload to set its tags to ["scheduler"]. Yes, I know, thrilling.
Veeramanikandan Shanmugam
08/13/2025, 7:04 AMMarvin
08/13/2025, 7:04 AMMarvin
08/13/2025, 7:05 AM
"posture": "Reactive",
"trigger": {
"type": "event",
"match": { "prefect.resource.id": "prefect.flow-run.*" },
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
},
"expect": ["prefect.flow-run.*"],
"for_each": ["prefect.resource.id"],
"threshold": 1,
"within": 0
}
Full payload example (with safer, JSON-native params)
{
"name": "<NAME>",
"description": "<DESCRIPTION>",
"enabled": true,
"posture": "Reactive",
"trigger": {
"type": "event",
"match": {
"prefect.resource.id": "prefect.flow-run.*"
},
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
},
"expect": [
"prefect.flow-run.*"
],
"for_each": [
"prefect.resource.id"
],
"threshold": 1,
"within": 0
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "<DEPLOYMENT_ID>",
"parameters": {
"trigger_event": {
"id": "{{ event.id }}",
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
}
}
}
]
}
Notes
- Using a single string for prefect.resource.id under match_related means “must have this tag among its related resources.” Extra tags don’t matter.
- If you ever want OR for multiple tags, provide a list:
"prefect.resource.id": [
"prefect.tag.scheduler",
"prefect.tag.alerts"
]
- Don’t pass models in parameters. Use resource.root and payload; they’re plain dicts and won’t get stringified by the templater.
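The matching rule described in these notes can be pictured with a toy model. This is a simplified illustration only, not Prefect's implementation: it assumes a filter value may be a string or a list of strings (treated as OR), that every label in the filter must match on the same related resource (AND), and that a trailing `*` acts as a prefix wildcard as in `prefect.flow-run.*`. `matches_related` is a hypothetical name.

```python
from typing import Dict, List, Union

LabelFilter = Dict[str, Union[str, List[str]]]


def _value_matches(pattern: str, value: str) -> bool:
    # A trailing "*" is treated as a prefix wildcard; otherwise exact match
    if pattern.endswith("*"):
        return value.startswith(pattern[:-1])
    return pattern == value


def matches_related(label_filter: LabelFilter, related: List[Dict[str, str]]) -> bool:
    """True if at least one related resource satisfies every label in the filter."""
    for resource in related:
        satisfied = True
        for label, expected in label_filter.items():
            patterns = [expected] if isinstance(expected, str) else expected
            value = resource.get(label)
            if value is None or not any(_value_matches(p, value) for p in patterns):
                satisfied = False
                break
        if satisfied:
            return True
    return False
```

Under this model, extra tags on a run never prevent a match; only the absence of the requested tag does.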
If this still doesn’t fire on scheduled runs, confirm the scheduled events actually include a related resource with id prefect.tag.scheduler. If not, add the tag to the deployment so all runs inherit it.
Veeramanikandan Shanmugam
08/13/2025, 7:10 AMMarvin
08/13/2025, 7:10 AMMarvin
08/13/2025, 7:11 AM
"posture": "Reactive",
"trigger": {
"type": "event",
"match": { "prefect.resource.id": "prefect.flow-run.*" },
"match_related": {
"any": [
{
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
}
]
},
"expect": ["prefect.flow-run.*"],
"for_each": ["prefect.resource.id"],
"threshold": 1,
"within": 0
}
If you also want alerts to count, add another entry:
"match_related": {
"any": [
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alerts" }
]
}
Two more sanity checks while we’re here:
- Put posture at the top level (as shown). Some payloads put it inside trigger, which the API ignores politely.
- In your action parameters, avoid passing Pydantic models (e.g., event.resource, event.related). Use JSON-native fields instead:
"parameters": {
"trigger_event": {
"id": "{{ event.id }}",
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
}
}
If it still doesn’t fire on scheduled runs, open the event in the UI and confirm you actually see a related resource with id prefect.tag.scheduler. If that tag isn’t there, the filter won’t match no matter how sternly we look at it.
Veeramanikandan Shanmugam
08/13/2025, 7:29 AMMarvin
08/13/2025, 7:29 AMMarvin
08/13/2025, 7:31 AM
{
"name": "<NAME>",
"description": "<DESCRIPTION>",
"enabled": true,
"posture": "Reactive",
"trigger": {
"type": "event",
"match": {
"prefect.resource.id": "prefect.flow-run.*"
},
"match_related": {
"any": [
{
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
}
]
},
"expect": [
"prefect.flow-run.*"
],
"for_each": [
"prefect.resource.id"
],
"threshold": 1,
"within": 0
},
"actions": [
{
"type": "run-deployment",
"source": "selected",
"deployment_id": "<DEPLOYMENT_ID>",
"parameters": {
"trigger_event": {
"name": "{{ event.event }}",
"occurred": "{{ event.occurred }}",
"received": "{{ event.received }}",
"resource": "{{ event.resource.root }}",
"payload": "{{ event.payload }}"
},
"flow_run_id": "{{ event.resource.id }}"
}
}
]
}
Why this fixes it
- match_related.any ensures the condition is true if any related resource matches (i.e., one of the tags is scheduler), regardless of additional tags like auto-scheduled or alerts.
- event.resource.root and event.payload are JSON-native. Passing event.resource or event.related can turn into strings and cause parameter validation failures that look like “the automation didn’t trigger.”
If your scheduled runs sometimes don’t include the scheduler tag at all, add it to the deployment’s tags so every run inherits it, or broaden the filter:
"match_related": {
"any": [
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.auto-scheduled" }
]
}
If it still refuses to fire, check the Automation Activity for “action failed” errors; matching may be fine, but the parameters could be sulking.
Veeramanikandan Shanmugam
08/13/2025, 7:32 AMMarvin
08/13/2025, 7:32 AMMarvin
08/13/2025, 7:33 AM
"match_related": {
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
}
- Explicit OR form (future‑proof, and what I’d use)
"match_related": {
"any": [
{
"prefect.resource.role": "tag",
"prefect.resource.id": "prefect.tag.scheduler"
}
]
}
- Multiple tags OR in one go (scheduler OR alerts)
"match_related": {
"any": [
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
{ "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alerts" }
]
}
Note: Some servers will also accept a list for a field value (e.g., "prefect.resource.id": ["prefect.tag.scheduler", "prefect.tag.alerts"]) inside the object, but using the any wrapper avoids compatibility surprises.
And, as ever, keep posture at the top level:
"posture": "Reactive"
Veeramanikandan Shanmugam
08/25/2025, 9:23 AMMarvin
08/25/2025, 9:23 AMMarvin
08/25/2025, 9:28 AM
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@flow
def child_flow(interaction_id: str) -> str:
    # do work for this id
    return f"done: {interaction_id}"

@task
def run_child(interaction_id: str) -> str:
    # Calling a flow from inside a task creates a subflow run
    return child_flow(interaction_id)

@flow(task_runner=ConcurrentTaskRunner())
def scheduler_flow(ids: list[str]):
    # fan out
    futures = [run_child.submit(i) for i in ids]
    # wait for all to finish and collect results
    results = [f.result() for f in futures]
    # continue after all subflows are complete
    return results

if __name__ == "__main__":
    scheduler_flow([f"id-{i}" for i in range(10)])
Notes:
- This keeps everything in the same process by default; for higher parallelism you can switch to ProcessPoolTaskRunner or DaskTaskRunner.
- There’s no .submit on flows directly, so the task wrapper is the idiomatic way.
Option B: Run deployed child flows concurrently (scales across workers)
- If your child flow has a deployment, kick off one run per ID with run_deployment and then wait on them with wait_for_flow_run.
- This lets your worker(s) pick up child flow runs in parallel.
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def scheduler_flow(ids: list[str]):
    # Start all child flow runs via their deployment
    # Name format: "<flow-name>/<deployment-name>"
    runs = [
        run_deployment(
            name="child-flow/production",
            parameters={"interaction_id": i},
            timeout=0,        # return immediately; the default blocks until each run finishes
            as_subflow=True,  # default True; links runs under this parent in the UI
        )
        for i in ids
    ]
    # Wait for completion
    # (if wait_for_flow_run is a coroutine in your Prefect version, await it from an async flow)
    finished = [wait_for_flow_run(r.id, log_states=True) for r in runs]
    # Optional: materialize return values of child flows
    results = [get_state_result(fr.state) for fr in finished]
    # Continue after all are done
    return results
Notes:
- Ensure a worker is running for the work pool that serves the child-flow/production deployment.
- as_subflow=True (the default) keeps the runs linked to the parent in the UI.
- This pattern scales out across machines and is the usual choice when you want isolation or more horsepower.
Bonus tips
- Stream results as they finish: use prefect.futures.as_completed (Option A) or call wait_for_flow_run in a loop as they complete (Option B).
- Limit concurrency if needed: either chunk your list or set concurrency limits on your work queue in Prefect 2.x.
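Chunking the list, as suggested above, needs nothing Prefect-specific. A minimal sketch; `chunked` is a hypothetical helper, and the batch size is whatever cap you choose:

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

In Option A that would mean submitting one chunk, collecting its results, and only then moving on to the next chunk, so at most `size` child runs are in flight at a time.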
Docs for more context:
- Flows and subflows (Prefect 2.x)
- Deployments (Prefect 2.x)
Quick check: do you want these child flows to run inline with the parent (same machine/process), or as separate deployed runs picked up by workers? And do you need to cap concurrency (e.g., only run N at a time)? I can tailor the example accordingly.
Veeramanikandan Shanmugam
08/25/2025, 9:29 AMMarvin
08/25/2025, 9:29 AMMarvin
08/25/2025, 9:32 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow
def child_flow(interaction_id: str) -> str:
    # do work
    return f"done: {interaction_id}"

@task
def run_child(interaction_id: str) -> str:
    # calling a flow inside a task creates a subflow run
    return child_flow(interaction_id)

@flow(task_runner=ThreadPoolTaskRunner())
def scheduler_flow(ids: list[str]):
    futures = [run_child.submit(i) for i in ids]  # fan out
    results = [f.result() for f in futures]       # wait + collect results
    # continue after all subflows complete
    return results
When to use: you’re fine running subflows on the same machine/process (or multiple processes if you switch to ProcessPoolTaskRunner).
Option B — Run deployed child flows concurrently (scale out via workers)
- If your child flow has a deployment, kick off one run per ID with run_deployment and wait with wait_for_flow_run.
- This runs them wherever your workers are (K8s, ECS, VM, etc.).
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def scheduler_flow(ids: list[str]):
    runs = [
        run_deployment(
            name="child-flow/production",  # "<flow-name>/<deployment-name>"
            parameters={"interaction_id": i},
            timeout=0,        # return immediately; the default blocks until each run finishes
            as_subflow=True,  # keeps UI linkage under the parent
        )
        for i in ids
    ]
    # (if wait_for_flow_run is a coroutine in your Prefect version, await it from an async flow)
    finished = [wait_for_flow_run(r.id, log_states=True) for r in runs]
    results = [get_state_result(fr.state) for fr in finished]
    # continue after all are done
    return results
When to use: you want parallelism across your Prefect workers and infrastructure isolation per child run.
Option C — Use the Runner to submit flows concurrently
- In 3.x, submit_to_runner can submit many parameter sets for a flow at once; wait_for_submitted_runs waits for them to finish.
- Useful if you prefer runner-driven execution instead of building a deployment first.
from prefect import flow
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result
from prefect import get_client

@flow
def child_flow(interaction_id: str) -> str:
    return f"done: {interaction_id}"

@flow
def scheduler_flow(ids: list[str]):
    # Submit one run per id
    flow_runs = submit_to_runner(
        child_flow,
        parameters=[{"interaction_id": i} for i in ids],
    )
    # Block until all submitted runs are finished
    wait_for_submitted_runs()
    # Optional: fetch return values
    # (submit_to_runner returns FlowRun objects; fetch their final states to get results)
    results = []
    # get_client() returns an async client by default; request the sync one here
    with get_client(sync_client=True) as client:
        for fr in flow_runs:
            fr_full = client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))
    return results
Docs:
- Runner submit API (submit_to_runner, wait_for_submitted_runs)
- Prefect 3 GA overview
Quick questions to tailor this:
- Do you want the child flows to run on the same machine as the scheduler (Option A), or scale out on workers (Option B/C)?
- Do you already have a deployment for the child flow, or would you rather submit it directly via the Runner?
Veeramanikandan Shanmugam
08/25/2025, 10:37 AM
Marvin
08/25/2025, 10:37 AM
Marvin
08/25/2025, 10:38 AM
If you call scheduler_flow(ids) directly inside a FastAPI request handler and wait for results as in your snippet, that request will block until the flow finishes. Whether other API requests are blocked depends on how your FastAPI server is run:
- Single async worker, async endpoint calling blocking code: you’ll block that worker’s event loop and starve other requests on that worker.
- Sync endpoint (or you offload to a thread), single worker process: you won’t block the event loop, but you will tie up that worker until the flow completes. Other requests can still be served if there are free threads/other workers.
- Multiple worker processes: other workers can still handle requests, but the request that started the flow remains busy until the flow completes.
Why: your code runs the flow synchronously and waits on f.result() for all subflow tasks. That’s a long-running operation inside your web process. The ThreadPoolTaskRunner only affects concurrency inside the flow; it doesn’t make your HTTP handler return sooner.
Recommendations to avoid blocking the web server
- Preferred: decouple execution from the API process and return immediately with a run id.
- Use a deployment and workers:
from fastapi import FastAPI
from prefect.deployments import run_deployment

app = FastAPI()

@app.post("/run")
def trigger(ids: list[str]):
    fr = run_deployment(
        name="scheduler-flow/prod",
        parameters={"ids": ids},
        timeout=0,  # return as soon as the run is created; the default waits for completion
    )
    return {"flow_run_id": str(fr.id)}
Your workers pick up and run the flow; the API returns 202-style quickly.
- Or submit directly via the Runner (no deployment required):
from fastapi import FastAPI
from prefect.runner import submit_to_runner

app = FastAPI()

@app.post("/run")
def trigger(ids: list[str]):
    fr = submit_to_runner(scheduler_flow, parameters={"ids": ids})
    return {"flow_run_id": str(fr.id)}
You can poll the run status later or subscribe to events.
- If you must run it inside the API and still keep the event loop responsive, offload the blocking call to a thread (still ties up the worker until done):
from fastapi import FastAPI
import anyio

app = FastAPI()

@app.post("/run-blocking")
async def run_blocking(ids: list[str]):
    result = await anyio.to_thread.run_sync(lambda: scheduler_flow(ids))
    return {"result": result}
This prevents blocking the asyncio event loop, but it still consumes the process’s CPU/memory for the duration. If the work is CPU-bound, consider ProcessPoolTaskRunner in the flow or, better, offload to workers via a deployment/runner as above.
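The difference between blocking the loop and offloading to a thread can be observed with a small standard-library experiment. This is only a sketch: `long_running_flow` is a hypothetical stand-in for a synchronous flow call, and `asyncio.to_thread` plays the role that `anyio.to_thread.run_sync` plays in the endpoint above.

```python
import asyncio
import time


def long_running_flow(seconds: float) -> str:
    """Stand-in for a blocking call such as a synchronous flow run."""
    time.sleep(seconds)
    return "finished"


async def heartbeat(ticks: list[float], interval: float = 0.01) -> None:
    """Record a timestamp every `interval` seconds while the loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run_with_heartbeat(offload: bool, seconds: float = 0.2) -> int:
    """Return how many heartbeats ran while the long call was in progress."""
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start
    if offload:
        await asyncio.to_thread(long_running_flow, seconds)  # loop stays responsive
    else:
        long_running_flow(seconds)  # blocks the event loop for the whole call
    beat.cancel()
    return len(ticks)


blocked_ticks = asyncio.run(run_with_heartbeat(offload=False))
offloaded_ticks = asyncio.run(run_with_heartbeat(offload=True))
```

The heartbeat stands in for other requests on the same worker: it barely runs in the blocking case and keeps ticking in the offloaded case, although the thread is still occupied until the work ends.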
Notes
- The ThreadPoolTaskRunner provides concurrency within the flow only; it does not make the HTTP call non-blocking.
- For high fan-out or CPU-bound work, scale via Prefect workers and deployments, not inside your web server.
- Docs: Runner submit API and Deploy flows in Prefect 3.
If you share how you’re running FastAPI (number of workers, async vs sync endpoints), I can suggest the safest integration pattern for your setup.
Marvin
08/26/2025, 4:09 AM
Marvin
08/26/2025, 4:10 AM
from prefect.deployments import run_deployment
# timeout=0 returns as soon as the run is created instead of waiting for it to finish
fr = run_deployment(name="scheduler-flow/prod", parameters={"ids": ids}, timeout=0)
- Runner pattern:
from prefect.runner import submit_to_runner
fr = submit_to_runner(scheduler_flow, parameters={"ids": ids})
- You can poll for status later or subscribe to events. Docs: Runner submit API
B) If you must run the flow inside a FastAPI request, isolate it from the event loop
- Make your flows/tasks sync (def, not async def), keep ThreadPoolTaskRunner, and offload the entire flow call to a thread so the FastAPI event loop is not used by Prefect:
@app.post("/run")
async def run(ids: list[str]):
    from anyio import to_thread
    result = await to_thread.run_sync(lambda: scheduler_flow(ids))
    return {"result": result}
- Ensure code inside your tasks/flows does not use asyncio or event loops. Use blocking I/O libraries here.
C) Or: Make everything async and avoid ThreadPoolTaskRunner
- If your child code must be async (e.g., aiohttp, async DB clients), convert to async tasks/flows and do not use ThreadPoolTaskRunner; let Prefect’s async engine manage the loop. Use Prefect futures to wait for completion without threads:
from prefect import flow, task
from prefect.futures import wait, resolve_futures_to_results

@flow
async def child_flow(interaction_id: str) -> str:
    # async work here
    return f"done: {interaction_id}"

@task
async def run_child(interaction_id: str) -> str:
    return await child_flow(interaction_id)

@flow
async def scheduler_flow(ids: list[str]):
    futures = [run_child.submit(i) for i in ids]
    wait(futures)  # prefect.futures.wait is a regular function, so it is not awaited
    results = resolve_futures_to_results(futures)
    return results
- Key point: don’t mix a thread pool task runner with async child code.
Common gotchas that trigger this error
- Grabbing an event loop outside an async context (e.g., loop = asyncio.get_event_loop() at import time) then reusing it inside tasks
- Creating asyncio.Future() yourself; let libraries or Prefect manage futures
- Mixing async code in your task/flow with ThreadPoolTaskRunner
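The error itself is easy to reproduce with plain asyncio, which may help when reading a traceback. The sketch below is not Prefect code; it creates a Future on one event loop and awaits it from a coroutine running on another, which is the situation the gotchas above lead to. The function names are hypothetical.

```python
import asyncio


async def await_future(future: asyncio.Future) -> None:
    """Await a future from whatever loop is running this coroutine."""
    await future


def demo() -> str:
    """Await a future owned by one loop from a different loop; return the error text."""
    other_loop = asyncio.new_event_loop()
    foreign_future = other_loop.create_future()  # bound to other_loop
    try:
        # asyncio.run starts a brand-new loop, so the future is foreign to it
        asyncio.run(await_future(foreign_future))
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return "no error"


result_message = demo()
```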
If you can share:
- Your FastAPI endpoint code (sync vs async)
- Whether `child_flow`/`run_child` are async and any async libs used inside
- Full traceback
…I can pinpoint the exact fix for your setup.
Veeramanikandan Shanmugam
08/26/2025, 4:14 AM
Marvin
08/26/2025, 4:14 AM
Marvin
08/26/2025, 4:15 AM
from prefect import flow, get_client
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result

@flow
async def subflow(params):
    # ... async work ...
    return {"ok": True, "params": params}

@flow
async def main_flow(interactions_tasks_params: list[dict]):
    # submit one run per params
    flow_runs = submit_to_runner(
        subflow,
        parameters=[{"params": p} for p in interactions_tasks_params],
    )
    # wait for all submitted runs to complete
    wait_for_submitted_runs()
    # collect return values (the default client is async, so use async with / await)
    results = []
    async with get_client() as client:
        for fr in flow_runs:
            fr_full = await client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))
    return results
Docs: Runner submit API
2) If you want isolation/scale-out, call the subflow via a deployment from the task (no asyncio await)
- Run each child as a deployed run and wait synchronously for it from the task. This avoids awaiting an async flow from the task loop.
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.futures import wait, resolve_futures_to_results
from prefect.states import get_state_result

@flow
async def main_flow(interactions_tasks_params: list[dict]):
    futures = [interaction_process_task.submit(p) for p in interactions_tasks_params]
    wait(futures)  # prefect.futures.wait is a regular function, so it is not awaited
    return resolve_futures_to_results(futures)

@task
def interaction_process_task(params):
    # start the child run as a subflow; called synchronously with the default
    # timeout=None, run_deployment blocks until the run reaches a final state
    fr = run_deployment(
        name="subflow/prod",
        parameters={"params": params},
        as_subflow=True,  # keeps linkage under the parent in the UI
    )
    return get_state_result(fr.state)
3) Keep everything async, but do not call flows inside tasks
- Move the “subflow” logic into the task (make it an async task) and submit tasks; or keep subflow, but call it from the parent using Option 1.
- If you stick with mapping tasks, call wait() on the futures in your flow and then resolve results (wait is a regular function, so it is not awaited):
from prefect import flow, task
from prefect.futures import wait, resolve_futures_to_results

@task
async def interaction_process_task(params):
    # ... do the async work here (no subflow call) ...
    return {"ok": True}

@flow
async def main_flow(interactions_tasks_params: list[dict]):
    futures = interaction_process_task.map(interactions_tasks_params)
    wait(futures)  # blocks until every mapped task run has finished
    results = resolve_futures_to_results(futures)
    return results
Why your current code errors
- You’re awaiting an async subflow from within an async mapped task. Under load, that ends up creating/awaiting Futures tied to a different event loop than the one running the task coroutine. Python 3.12 is stricter about loop ownership, so you see the RuntimeError.
Marvin
08/26/2025, 4:15 AM
- In the parent flow, call wait(...) and then resolve the futures to results; prefect.futures.wait is a regular function, so it is called without await.
If you can share:
- Whether you also run this from FastAPI or any other async framework
- Any async libraries used inside subflow (e.g., aiohttp, async DB client) and whether their sessions are global/singletons
…I can pinpoint which of the three options fits best and help adapt your code.
Veeramanikandan Shanmugam
08/26/2025, 4:29 AM
interaction_process_scheduler can be scheduled. When it is running, I will iterate through interactions and call the process_interaction flow for each interaction.
I want to run these subflows concurrently and wait until all interactions complete before moving on to AFTER INTERACTION PROCESS CODE HERE.
How can I achieve this in Prefect 3.x? Give me some working ways to achieve this.
Marvin
08/26/2025, 4:29 AM
Marvin
08/26/2025, 4:31 AM
- Submit one process_interaction run per interaction using submit_to_runner
- Block until all submitted runs finish with wait_for_submitted_runs
- Fetch each child’s final state to extract return values
from prefect import flow, get_client
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result

@flow
async def process_interaction(params):
    # ... your async work ...
    return {"ok": True, "interaction": params}

@flow
async def interaction_process_scheduler(params):
    interactions = [1, 2, 3, 4]  # build your list
    # submit one child run per interaction
    runs = submit_to_runner(
        process_interaction,
        parameters=[{"params": i} for i in interactions],
    )
    # wait for all submitted runs to complete
    wait_for_submitted_runs()
    # collect return values
    results = []
    async with get_client() as client:
        for fr in runs:
            fr_full = await client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))
    # AFTER INTERACTION PROCESS CODE HERE...
    # (results now contains each child flow's return value)
    return results
Notes:
- This keeps orchestration at the flow level (no flows inside tasks), which avoids the “Future attached to a different loop” error.
- If you already have a deployment for process_interaction, you can also use Approach 2 below.
- Docs: Runner submit API (submit_to_runner, wait_for_submitted_runs)
Approach 2 — Run child flows via deployment, wait concurrently in the parent
- Kick off a deployment run per interaction with run_deployment
- Wait for each run concurrently from the parent flow
- Extract results from final states
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def interaction_process_scheduler(params):
    interactions = [1, 2, 3, 4]
    # start one deployed run per interaction
    runs = [
        await run_deployment(
            name="process-interaction/prod",  # "<flow-name>/<deployment-name>"
            parameters={"params": i},
            as_subflow=True,  # keep linked under this parent in the UI
            timeout=0,  # return once the run is created; the default waits for it to finish
        )
        for i in interactions
    ]
    # wait for all runs concurrently (wait_for_flow_run is a coroutine function)
    finished = await asyncio.gather(
        *[wait_for_flow_run(fr.id, log_states=True) for fr in runs]
    )
    results = [get_state_result(fr.state) for fr in finished]
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Why not “call a flow from inside an async task”?
- Calling/awaiting a flow inside an async task that’s running under a task runner mixes event loops and commonly triggers:
RuntimeError: got Future attached to a different loop
- The two approaches above keep flow orchestration in the flow (not inside tasks), which avoids that pitfall and scales cleanly.
If you don’t need subflow semantics
- Move the child logic into an async task and submit/map tasks instead. But since you asked specifically for subflow concurrency, the two patterns above are the most reliable.
If you want, share whether process_interaction is CPU-bound or IO-bound and whether you already have a deployment for it. I can tailor which approach will perform best for your setup.
Veeramanikandan Shanmugam
08/26/2025, 4:34 AM
app/src/redflare_v2_core_api/scheduler/main.py:195: RuntimeWarning: coroutine 'sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call' was never awaited
  submit_to_runner(
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/app/src/redflare_v2_core_api/scheduler/main.py:210: RuntimeWarning: coroutine 'sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call' was never awaited
  wait_for_submitted_runs()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Marvin
08/26/2025, 4:34 AM
Marvin
08/26/2025, 4:34 AM
from prefect import flow, get_client
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result

@flow
async def process_interaction(
    interaction_ref_id: str,
    created_by: str,
    tenant_id: str,
    pipeline_id: str,
    processing_mode: str,
    scheduler_run_id: str,
    timeout: float | None = None,
):
    # ... your async work ...
    return {"ok": True, "interaction": interaction_ref_id}

@flow
async def interaction_process_scheduler(interactions: list[dict], tenant_id: str, pipeline_id: str, scheduler_run):
    runs = await submit_to_runner(
        process_interaction,
        parameters=[
            {
                "interaction_ref_id": interaction["interaction_id"],
                "created_by": "redflare@stratforge.com",
                "tenant_id": tenant_id,
                "pipeline_id": str(pipeline_id),
                "processing_mode": "AUTOMATIC",  # ensure this is JSON-serializable
                "scheduler_run_id": scheduler_run.id,
                "timeout": None,
            }
            for interaction in interactions
        ],
    )
    # Wait for all submitted runs to finish
    await wait_for_submitted_runs()
    # Collect child flow return values
    results = []
    async with get_client() as client:
        for fr in runs:
            fr_full = await client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Notes:
- If you’re in a sync flow/function instead, drop the awaits:
runs = submit_to_runner(...)
wait_for_submitted_runs()
- Ensure any enums (e.g., InteractionProcessingMode) are passed as plain strings or serializable values.
- If you’re calling this from FastAPI (async endpoint), you must use the awaited version above.
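The serialization note can be checked before any run is submitted. In the sketch below the enum definition is hypothetical (the real InteractionProcessingMode was not shown in the thread) and `to_plain_parameters` is an illustrative helper, not a Prefect function.

```python
import json
from enum import Enum


class InteractionProcessingMode(Enum):
    """Hypothetical enum; the real definition was not shown in the thread."""
    AUTOMATIC = "AUTOMATIC"
    MANUAL = "MANUAL"


def to_plain_parameters(params: dict) -> dict:
    """Replace Enum members with their values so the dict is JSON-serializable."""
    return {
        key: value.value if isinstance(value, Enum) else value
        for key, value in params.items()
    }


raw = {
    "interaction_ref_id": "abc-123",
    "processing_mode": InteractionProcessingMode.AUTOMATIC,
}
plain = to_plain_parameters(raw)
encoded = json.dumps(plain)  # json.dumps(raw) would raise TypeError instead
```

Running `json.dumps` over the parameter dict in a unit test is a cheap way to catch non-serializable values early.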
Docs: Runner submit API (submit_to_runner, wait_for_submitted_runs)
If you still see warnings after adding awaits, share the surrounding function (is it async?) and I’ll help pinpoint the exact spot to await or offload.
Veeramanikandan Shanmugam
08/26/2025, 4:37 AM
Runner webserver. Ensure that the server is running and reachable. You can run the webserver either by starting your serve process with webserver=True, or by setting PREFECT_RUNNER_SERVER_ENABLE=True.`
`043618.633 | ERROR | logger - SCHEDULER ERROR - Failed to connect to the Runner webserver. Ensure that the server is running and reachable. You can run the webserver either by starting your serve process with webserver=True, or by setting PREFECT_RUNNER_SERVER_ENABLE=True.`
2025-08-26 04:36:18,633 - logger - ERROR - Failed while processing scheduler - 047b60bd-da67-48a6-abc0-ebc3dff7a631
It should be running via Threadpool executor right??
Marvin
08/26/2025, 4:37 AM
Marvin
08/26/2025, 4:40 AM
- submit_to_runner talks to a local Runner webserver. If that webserver is not running, you’ll get:
“Failed to connect to the Runner webserver...”
- The ThreadPoolTaskRunner is unrelated. It only controls task concurrency inside a single flow run; it does not start the Runner webserver.
How to fix it (pick one)
Option A — Start the Runner webserver (so submit_to_runner can connect)
- Start it at your app/process startup:
# Run once at process startup (e.g., FastAPI startup event)
from prefect.runner import Runner
from prefect.runner.server import start_webserver
runner = Runner(webserver=True)
start_webserver(runner)
- Or set the environment variable before starting your process (as the error suggests):
- PREFECT_RUNNER_SERVER_ENABLE=true
- After the webserver is up, your awaited calls will work:
- runs = await submit_to_runner(...); await wait_for_submitted_runs()
Docs:
- Runner submit API
- Runner server API
Option B — Skip Runner entirely and use deployments (recommended for production scale-out)
- Kick off one deployed child flow per interaction and wait for all to finish from the parent flow. No Runner server required; just your Prefect API + workers.
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def interaction_process_scheduler(params):
    interactions = [...]  # your list
    # start one deployed run per interaction
    runs = [
        await run_deployment(
            name="process-interaction/prod",  # "<flow-name>/<deployment-name>"
            parameters={"params": i},
            as_subflow=True,
            timeout=0,  # return once the run is created; the default waits for it to finish
        )
        for i in interactions
    ]
    # wait for all child runs concurrently (wait_for_flow_run is a coroutine function)
    finished = await asyncio.gather(
        *[wait_for_flow_run(fr.id, log_states=True) for fr in runs]
    )
    results = [get_state_result(fr.state) for fr in finished]
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Option C — If subflow semantics aren’t required, keep everything inside one flow
- Submit tasks (not flows) concurrently and await them in the parent flow. This avoids both Runner and deployments, but you lose subflow isolation/lineage.
Quick sanity checks
- If you stay with Option A, ensure you actually start the Runner webserver in the same environment where you call submit_to_runner. In a FastAPI app, put the start code in a startup hook so the server is up before the first submission.
- Make sure parameters you pass to flows are JSON-serializable (e.g., convert enums to strings).
Which path do you prefer: keep using Runner submission (and start the webserver), or switch to deployments? I can provide a snippet tailored to your FastAPI startup or your deployment names.
Veeramanikandan Shanmugam
08/26/2025, 4:40 AM
Marvin
08/26/2025, 4:40 AM
Marvin
08/26/2025, 4:40 AM
process_interaction and interaction_process_scheduler, and patterns like calling a subflow per interaction.
- You attempted:
- Mapping async tasks that call an async subflow
- Using `submit_to_runner`/`wait_for_submitted_runs` (Prefect 3.x Runner)
- Using ThreadPoolTaskRunner
- Errors you encountered:
- “got Future attached to a different loop” RuntimeError (Python 3.12 paths shown)
- “Failed to connect to the Runner webserver” when using the Runner API
- Environment hints from your logs/snippets:
- Python 3.12 (path: /usr/local/lib/python3.12/site-packages/...)
- Likely running in a container (paths like /app/src/...)
- FastAPI is in the stack (discussed and implied by prior questions)
- Parameters/identifiers mentioned in code (no actual secret values were shared):
- tenant_id, pipeline_id, scheduler_run_id, interaction_id
- processing_mode (e.g., “AUTOMATIC”)
- An email string was included in a code snippet: redflare@stratforge.com (company domain visible: stratforge.com)
If you’d like, I can avoid echoing the email/domain in future examples or redact it from follow-ups. Also, if you want me to persist any preferences (e.g., your Prefect version, infra details) for future threads, say the word and I’ll store only what you approve.
Veeramanikandan Shanmugam
08/26/2025, 4:41 AM
Veeramanikandan Shanmugam
08/26/2025, 4:42 AM
Marvin
08/26/2025, 4:42 AM
Marvin
08/26/2025, 4:42 AM
Veeramanikandan Shanmugam
08/26/2025, 4:42 AM
Marvin
08/26/2025, 4:42 AM
Marvin
08/26/2025, 4:43 AM
Veeramanikandan Shanmugam
08/26/2025, 4:44 AM
Marvin
08/26/2025, 4:44 AM
Marvin
08/26/2025, 4:45 AM
Veeramanikandan Shanmugam
08/26/2025, 5:20 AM
Marvin
08/26/2025, 5:20 AM
Marvin
08/26/2025, 5:21 AM
import asyncio
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def process_interaction(params):
    # ... child flow logic ...
    return {"ok": True, "interaction": params}

@flow
async def interaction_process_scheduler(params):
    interactions = [1, 2, 3, 4]  # build your list
    # Start one deployed child run per interaction
    # Name format: "<flow-name>/<deployment-name>"
    submitted = [
        await run_deployment(
            name="process-interaction/prod",
            parameters={"params": i},
            as_subflow=True,  # keeps UI linkage under this parent
            timeout=0,  # return once the run is created; the default waits for it to finish
        )
        for i in interactions
    ]
    # wait_for_flow_run is a coroutine function; gather runs the waits concurrently
    finished = await asyncio.gather(
        *[wait_for_flow_run(fr.id, log_states=True) for fr in submitted]
    )
    # Optional: get return values from the child flows
    # (the child flow must persist its result for it to be readable from the parent)
    results = [get_state_result(fr.state) for fr in finished]
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Why this is a good fit for K8s
- Each child flow run is picked up by your Kubernetes work pool and runs as its own Job/Pod, giving you true parallelism across the cluster.
- The parent flow waits until all children complete, then continues.
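The "wait for every child, then continue" step can be pictured with plain asyncio. This is a toy model under stated assumptions: `wait_until_done` is a hypothetical poller in which `asyncio.sleep` stands in for a status check against the API, and no Prefect calls are involved.

```python
import asyncio


async def wait_until_done(run_id: int, polls_needed: int, poll_interval: float = 0.01) -> str:
    """Poll a pretend run until it reports completion."""
    for _ in range(polls_needed):
        await asyncio.sleep(poll_interval)  # stand-in for an API status check
    return f"run {run_id} completed"


async def parent(run_ids: list[int]) -> list[str]:
    """Wait on every pretend run concurrently, then continue."""
    # gather lets all waits make progress together and returns results in input order
    results = await asyncio.gather(
        *(wait_until_done(run_id, polls_needed=run_id) for run_id in run_ids)
    )
    # code placed here runs only after every child has finished
    return list(results)


finished = asyncio.run(parent([3, 1, 2]))
```

Total wall time is roughly that of the slowest child rather than the sum, which is the point of waiting concurrently.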
Operational notes
- Ensure you have a Kubernetes work pool configured and a worker/provisioner set so each flow run creates a K8s Job/Pod.
- You can cap concurrency by setting a concurrency limit on the work pool so only N child pods run at once.
- Make sure the child flow has a deployment (e.g., process-interaction/prod). The parent flow can also be deployed to the same or a different work pool.
Alternative: inline concurrency in the same pod
- If you do not need a pod per subflow, you can keep all work within a single pod by submitting tasks concurrently and awaiting them.
- Avoid calling flows from inside tasks (it often leads to event loop issues). Either:
- Move the “child” logic into an async task and submit/map tasks, or
- Use the deployment pattern above for true subflow runs.
Runner-based submission (optional)
- You can also use submit_to_runner/wait_for_submitted_runs to fan out flows, but you must run the Runner webserver in the same process/pod. This adds operational complexity on K8s, so the deployment approach above is typically simpler for cluster-scale runs.
- Runner docs: Runner submit API and Runner server API
If you share:
- Your Kubernetes work pool name
- The intended deployment name for process_interaction
- Whether you want to limit concurrent child pods
…I can tailor the run_deployment snippet and add a quick deploy example for your setup.
Veeramanikandan Shanmugam
08/26/2025, 5:23 AM
Marvin
08/26/2025, 5:23 AM
Marvin
08/26/2025, 5:24 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow
def process_interaction(interaction_id: str) -> str:
    # child subflow logic
    return f"done: {interaction_id}"

@task
def run_subflow(interaction_id: str) -> str:
    # calling a flow inside a task creates a subflow run
    return process_interaction(interaction_id)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def interaction_process_scheduler(interactions: list[str]):
    # fan out subflows concurrently in this pod
    futures = [run_subflow.submit(i) for i in interactions]
    # block until all are complete and collect return values
    results = [f.result() for f in futures]
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Notes
- This keeps everything in one K8s pod; the thread pool only affects concurrency inside that pod.
- If your work is CPU-bound, consider ProcessPoolTaskRunner(max_workers=...) instead of threads.
Option 2: Async parent flow, but keep the task and subflow sync
- If your main flow must be async, you can still use the thread pool; just wait on the futures using Prefect’s helpers (wait is a regular function, so it is not awaited).
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import wait, resolve_futures_to_results

@flow
def process_interaction(interaction_id: str) -> str:
    return f"done: {interaction_id}"

@task
def run_subflow(interaction_id: str) -> str:
    return process_interaction(interaction_id)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
async def interaction_process_scheduler(interactions: list[str]):
    futures = [run_subflow.submit(i) for i in interactions]
    wait(futures)  # wait for completion
    results = resolve_futures_to_results(futures)  # collect results
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Important do’s and don’ts
- Do NOT make process_interaction or run_subflow async if you’re using a ThreadPoolTaskRunner to call a subflow from inside a task. That pattern commonly triggers “Future attached to a different loop” errors.
- Keep the subflow and task sync, even if the parent flow is async.
- If your subflow code relies on async libraries, wrap that call inside the sync subflow (e.g., use anyio.run to execute a small async helper), but keep the subflow signature itself sync to avoid loop mixing.
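The "sync wrapper around async code" advice can be sketched with the standard library. Note the swap: this uses `asyncio.run` rather than `anyio.run`, and a plain `ThreadPoolExecutor` rather than a Prefect task runner; `fetch_async` and `process_sync` are hypothetical names.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def fetch_async(interaction_id: str) -> str:
    """Stand-in for work done with an async library."""
    await asyncio.sleep(0.01)
    return f"done: {interaction_id}"


def process_sync(interaction_id: str) -> str:
    """Sync entry point: each call runs the coroutine on its own fresh event loop."""
    return asyncio.run(fetch_async(interaction_id))


def run_all(ids: list[str], max_workers: int = 4) -> list[str]:
    """Call the sync wrapper from several threads; no loop is shared between them."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_sync, ids))


outputs = run_all(["a", "b", "c"])
```

Because every call creates and closes its own loop, objects bound to a loop (sessions, connection pools) should be created inside the coroutine rather than shared globally.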
Scaling and limits
- This approach gives you concurrency within one pod. If you need multiple pods (true cluster fan-out), you’d use deployments instead.
- Control concurrency with max_workers on the task runner and make sure any libraries you call are thread-safe.
If you share whether your work is IO-bound vs CPU-bound and your target concurrency, I can suggest ThreadPool vs ProcessPool and an appropriate max_workers setting.
Veeramanikandan Shanmugam
10/26/2025, 8:44 AM
Marvin
10/26/2025, 8:44 AM
Marvin
10/26/2025, 8:54 AM
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
      "template": {
        "spec": {
          "restartPolicy": "Never",
          "containers": [
            {
              "name": "prefect-job",
              "image": "{{ image }}",
              "imagePullPolicy": "{{ image_pull_policy }}"
            }
          ],
          "imagePullSecrets": [
            {
              "name": "my-registry-secret"
            }
          ]
        }
      },
      "backoffLimit": 0,
      "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
    }
  },
  "variables": {
    "type": "object",
    "properties": {
      "image": {
        "type": "string"
      },
      "image_pull_policy": {
        "type": "string",
        "default": "IfNotPresent"
      },
      "finished_job_ttl": {
        "type": "integer",
        "default": 300
      }
    }
  }
}
Notes:
- The secret must already exist in the target namespace:
kubectl create secret docker-registry my-registry-secret \
--docker-server=REGISTRY_URL \
--docker-username=USERNAME \
--docker-password=PASSWORD \
--namespace YOUR_NAMESPACE
- If you want to make image pull secrets configurable per-deployment, you can parameterize it:
- In the template, add a variable and reference it as a quoted placeholder. The template has to remain valid JSON, and a value that consists only of a placeholder is replaced by the variable's value, so the array type is preserved:
    "imagePullSecrets": "{{ image_pull_secrets }}"
- In the variables schema:
    "image_pull_secrets": {
      "type": "array",
      "items": { "type": "object", "properties": { "name": { "type": "string" } } }
    }
- Then set job_variables on the deployment to:
    {
      "image_pull_secrets": [
        {
          "name": "my-registry-secret"
        }
      ]
    }
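If you would rather patch an exported base job template with a script than edit the JSON by hand, something along these lines may help. It is a sketch only: `add_image_pull_secret` is a hypothetical helper, the `base` dict is a trimmed-down template for illustration, and nothing here calls the Prefect API.

```python
import copy
import json


def add_image_pull_secret(template: dict, secret_name: str) -> dict:
    """Return a copy of a base job template with `secret_name` in imagePullSecrets."""
    updated = copy.deepcopy(template)
    pod_spec = updated["job_configuration"]["spec"]["template"]["spec"]
    secrets = pod_spec.setdefault("imagePullSecrets", [])
    if {"name": secret_name} not in secrets:  # avoid duplicate entries
        secrets.append({"name": secret_name})
    return updated


# Trimmed-down template used only for this illustration
base = {
    "job_configuration": {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [{"name": "prefect-job", "image": "{{ image }}"}],
                }
            }
        },
    },
    "variables": {"type": "object", "properties": {"image": {"type": "string"}}},
}

patched = add_image_pull_secret(base, "my-registry-secret")
print(json.dumps(patched, indent=2))
```

The printed JSON can then be pasted into the work pool's base job template editor; the original dict is left unmodified.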
Docs for reference:
- Kubernetes worker guide: Prefect 3: Kubernetes worker
- Managing work pools: Manage work pools
If you’re not using a Kubernetes work pool, let me know which worker type you’re on and I’ll tailor the steps.