<@ULVA73B9P> I have created an automation to track...
# ask-marvin
v
@Marvin I have created an automation to track all the flow run states with a tag called 'scheduler', This automation will run deployment with another flow to track scheduler flow run states Now is there way to access which flow triggered an automation, I mean the original flow run which triggered this automation. Not the runtime flow
m
thought for 176.9 seconds
Short answer: yes—pass the triggering event’s details into your “Run deployment” action via templated parameters. There’s no magic back-reference in the runtime flow; you have to forward the info you care about. How to do it - In your Automation’s “Run deployment” action, set Parameters using Jinja templates from the event context. For a flow-run event, this gives you the original run. Example parameters JSON:
Copy code
{
  "source_flow_run_gri": "{{ event.resource.id }}",
  "source_flow_run_id": "{{ event.resource.id | replace('prefect.flow-run.', '') }}",
  "source_flow_run_name": "{{ event.payload.flow_run_name }}",
  "source_flow_name": "{{ event.payload.flow_name }}",
  "source_state": "{{ event.payload.state }}",
  "source_state_type": "{{ event.payload.state_type }}",
  "source_tags": {{ event.payload.tags | tojson }}
}
Tip: If you don’t want to guess fields, just pass the whole event and parse it in your tracking flow:
Copy code
{ "trigger_event": {{ event | tojson }} }
Then in your tracking flow:
Copy code
from typing import Any, Dict
from prefect import flow

@flow
def track_scheduler(
    source_flow_run_id: str | None = None,
    source_flow_run_name: str | None = None,
    source_flow_name: str | None = None,
    source_state: str | None = None,
    source_state_type: str | None = None,
    source_tags: list[str] | None = None,
    trigger_event: Dict[str, Any] | None = None,
):
    # do your tracking here
    ...
Notes -
event.resource.id
is the GRI (e.g.,
prefect.flow-run.<uuid>
). Strip the prefix if you want just the UUID (as shown with
replace
). - Use the Automation “Test”/“Preview” to verify the templated values. - This assumes Prefect 3.x. If you’re on 2.x, the approach is the same, but you should really consider upgrading. If you want, tell me which specific fields you need and I’ll give you a minimal parameter set.
v
I gave an tag to flow while creating it. but it does not showing for auto-scheduled flow runs. How can I configure a tag that shows for all flow run even If I scheduled it via deployment.
@Marvin I gave an tag for a flow while creating it. However it does not showing for auto-scheduled flow runs. How can I configure a tag that shows for all flow run even If I scheduled it via deployment.
m
thought for 161.0 seconds
Assuming Prefect 3.x: tags set “in code” (e.g., via
prefect.tags(...)
) are applied at runtime, after the flow run is already created. That’s why your auto-scheduled runs don’t show them while they’re Scheduled/Pending. To have tags on every flow run at creation time (including scheduled ones), set tags on the deployment or on the run request: Options - Deployment-level tags (recommended) - In Python when creating the deployment:
Copy code
from prefect import flow

    @flow
    def my_flow():
        ...

    my_flow.from_source(
        "<https://github.com/org/repo>",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="prod",
        work_pool_name="my-pool",
        tags=["scheduler", "prod"]
    )
- Or in prefect.yaml:
Copy code
deployments:
      - name: prod
        flow_name: my_flow
        entrypoint: flows/my_flow.py:my_flow
        work_pool: { name: my-pool }
        tags: ["scheduler", "prod"]
- When triggering runs programmatically:
Copy code
from prefect.deployments import run_deployment
  run_deployment("my-flow/prod", tags=["scheduler"])
- In Automations “Run deployment” action: - Set the “Tags” field to
["scheduler"]
(you can also template it if you like). Summary: put the tag on the deployment (or the run request), not just in-flow code, and it will appear on all flow runs, including auto-scheduled ones. If you share your current deployment config, I’ll point to exactly where to add
tags
.
v
@Marvin how can I define flow run has any one of tag for a automation match related field like if flow run has either "scheduler" Or "alerts" something like this
m
thought for 99.2 seconds
Yep—you can match “flow run has any of these tags” in an Automation. In the Automation builder (UI): - Add a Related filter - Kind: Tag - Operator: Any of - Values: scheduler, alerts This matches events where the flow run has either tag. Notes and alternatives: - Under the hood, tags are related resources of kind
prefect.tag
(ids like
prefect.tag.scheduler
). If you’re hitting the API directly, use those resource ids. - If your UI doesn’t show an “Any of” operator, use a Composite trigger with two sub-triggers (one for each tag) and set it to “Any”/OR. Or, in a pinch, create two identical automations—one per tag. - Make sure your runs actually have the tags at creation time (set tags on the deployment or in the “Run deployment” action), otherwise you’ll be filtering on vibes.
v
@Marvin How can I create an automation via prefect API to track all flow run with tag 'scheduler', is flow run has tag 'scheduler' or 'alerts' run the automation which has another deployment to run, this deployment has another flow. how can I create take event as params in this flow like which event triggered this automation how to create it via Rest API with paylo. give me payload for that
m
thought for 346.2 seconds
Short version: yes—you can create a reactive Automation that listens for flow-run events with tags and then runs another deployment, passing details of the triggering event into the downstream flow as parameters. Assumptions - Prefect 3.x - You’ll use the REST API directly (Cloud or Server) - You want to trigger on any flow run that has tag scheduler OR alerts API endpoint - Prefect Cloud: https://api.prefect.cloud/api/accounts/&lt;ACCOUNT_ID&gt;/workspaces/&lt;WORKSPACE_ID&gt;/automations/automations - Prefect Server: http://127.0.0.1:4200/api/automations/automations Headers - Authorization: Bearer <PREFECT_API_KEY> - Content-Type: application/json Example payload (reactive event trigger, tag-filtered, runs a deployment and passes the event)
Copy code
{
  "name": "Run deployment when tag is scheduler or alerts",
  "description": "Listens for any flow-run event that has tags scheduler or alerts and runs a helper deployment.",
  "enabled": true,
  "posture": "Reactive",
  "trigger": {
    "type": "event",
    "match": {
      "all": [
        {
          "resource": {
            "role": "flow-run"
          }
        },
        {
          "related": {
            "any": [
              {
                "id": "prefect.tag.scheduler"
              },
              {
                "id": "prefect.tag.alerts"
              }
            ]
          }
        }
      ]
    }
  },
  "actions": [
    {
      "type": "run-deployment",
      "deployment_id": "<YOUR_DEPLOYMENT_ID>",
      "parameters": {
        "trigger_event": "{{ event }}",
        "trigger_event_name": "{{ event.event }}",
        "triggered_flow_run_resource_id": "{{ event.resource.id }}"
      }
    }
  ]
}
Notes - The trigger filters: - resource.role = flow-run scops this to flow-run events. - related.any for prefect.tag.scheduler or prefect.tag.alerts matches runs that carry either tag. - The action parameters use templating: -
{{ event }}
injects the full event object as JSON into your downstream flow’s parameters. - You can also cherry-pick fields, e.g.
{{ event.resource.id }}
or
{{ event.event }}
. - If you want to restrict to specific event names (e.g., only state changes), add an EventName filter into match (commonly a prefix match like prefect.flow-run.)—but the above is enough for “any flow-run event with those tags”. cURL request (Cloud example)
Copy code
curl -X POST \
  "<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/automations/automations>" \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Run deployment when tag is scheduler or alerts",
    "description": "Listens for any flow-run event that has tags scheduler or alerts and runs a helper deployment.",
    "enabled": true,
    "posture": "Reactive",
    "trigger": {
      "type": "event",
      "match": {
        "all": [
          { "resource": { "role": "flow-run" } },
          { "related": { "any": [
            { "id": "prefect.tag.scheduler" },
            { "id": "prefect.tag.alerts" }
          ]}}
        ]
      }
    },
    "actions": [
      {
        "type": "run-deployment",
        "deployment_id": "<YOUR_DEPLOYMENT_ID>",
        "parameters": {
          "trigger_event": "{{ event }}",
          "trigger_event_name": "{{ event.event }}",
          "triggered_flow_run_resource_id": "{{ event.resource.id }}"
        }
      }
    ]
  }'
Downstream flow example (accept the event as a parameter)
Copy code
from prefect import flow, get_run_logger

@flow
def handle_trigger(trigger_event: dict, trigger_event_name: str = "", triggered_flow_run_resource_id: str = ""):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Triggered by event: {trigger_event_name}")
    <http://logger.info|logger.info>(f"Source resource: {triggered_flow_run_resource_id}")
    # Full event is available if you passed `trigger_event`
    <http://logger.info|logger.info>(f"Full event keys: {list(trigger_event.keys())}")
Common variations
- If you prefer to reference the deployment by name instead of ID, many setups support
"deployment_name": "flow_name/deployment_name"
in place of
deployment_id
. If your API rejects that, stick to deployment_id. - If you’d rather filter tags via the flow-run’s labels directly, you can use a resource-attribute filter; however, treating tags as related resources (prefect.tag.<name>) is the most robust. Want me to fill your actual deployment_id and post it for you, or are you on Cloud vs. Server? I can tailor the exact request if you share those. I’ll try not to sound too excited.
v
@Marvin The payload you gave me in the above message is not working as "match_related" is used to conditional matching of flow runs here is the payload i've been using it to create an automation and when it works if I don't add parameter, But when I add parameter it does not running a flow. also can you check, how to add that ""scheduler" or "alerts" tag condition check in "macth_related" field { "name": name, "description": description if description else name, "enabled": True, "trigger": { "type": "event", "match": {"prefect.resource.id": "prefect.flow-run.*"}, "match_related": { "prefect.resource.role": "tag", "prefect.resource.id": ["prefect.tag.scheduler"], }, "expect": ["prefect.flow-run.*"], "for_each": ["prefect.resource.id"], "posture": "Reactive", "threshold": 1, "within": 0, }, "actions": [ { "type": "run-deployment", "source": "selected", "deployment_id": str(payload.deployment_id), # "parameters": None, "parameters": { "trigger_event": "{{ event }}" }, "job_variables": None, } ], }
m
thought for 85.4 seconds
You’re close. Two issues to fix: 1) match_related for OR tags Use a list for the tag IDs so it matches either scheduler or alerts. 2) Passing the entire event as a parameter Automations templating substitutes into strings. Passing the whole event object as a parameter usually yields a JSON string. If your downstream flow expects a dict, it will fail validation and the run won’t start. Either: - Pass individual fields as strings, and type your flow params accordingly, or - Pass the event as a JSON string and json.loads it in your flow. Adjusted payload (keeps your overall structure, fixes tag OR, and uses safe string params)
Copy code
{
  "name": "<NAME>",
  "description": "<DESCRIPTION>",
  "enabled": true,
  "posture": "Reactive",
  "trigger": {
    "type": "event",
    "match": {
      "prefect.resource.id": "prefect.flow-run.*"
    },
    "match_related": {
      "prefect.resource.role": "tag",
      "prefect.resource.id": [
        "prefect.tag.scheduler",
        "prefect.tag.alerts"
      ]
    },
    "expect": [
      "prefect.flow-run.*"
    ],
    "for_each": [
      "prefect.resource.id"
    ],
    "threshold": 1,
    "within": 0
  },
  "actions": [
    {
      "type": "run-deployment",
      "source": "selected",
      "deployment_id": "<DEPLOYMENT_ID>",
      "parameters": {
        "trigger_event_name": "{{ event.event }}",
        "triggered_flow_run_id": "{{ event.resource.id }}",
        "trigger_event_json": "{{ event }}"
      },
      "job_variables": null
    }
  ]
}
Downstream flow options - If you only need a few fields:
Copy code
from prefect import flow, get_run_logger

@flow
def downstream(trigger_event_name: str, triggered_flow_run_id: str):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Triggered by: {trigger_event_name} from {triggered_flow_run_id}")
- If you want the full event:
Copy code
import json
from prefect import flow, get_run_logger

@flow
def downstream(trigger_event_json: str):
    logger = get_run_logger()
    evt = json.loads(trigger_event_json)
    <http://logger.info|logger.info>(f"Event name: {evt.get('event')}")
    <http://logger.info|logger.info>(f"Source resource: {evt.get('resource', {}).get('id')}")
Why your previous run didn’t start - If your deployment’s flow does not accept a parameter named trigger_event (or expects a dict, not a str), the action validation/flow-run creation will fail and the automation won’t launch a run. Pass strings or adjust the flow signature to match exactly what you send. Sanity checks - Put posture at the top-level (Reactive). The server expects posture on the automation, not inside the trigger. - Confirm your deployment_id is correct and the worker/work pool can pick it up. - Check the Automation Activity in the UI for any “action failed” messages—type/parameter validation errors will show there. If you want, paste the exact error shown in the Automation activity/logs and your downstream flow signature. I’ll align the parameters precisely so it actually runs instead of silently sulking.
v
@Marvin how to pass event as json in the deployment parameter template string..
m
thought for 70.2 seconds
Two workable patterns, depending on what you want in the downstream flow: A) Pass the event as a dict (recommended) - Use a single placeholder as the entire value. Prefect will replace it with a native JSON-serializable object (dict), not a string. - Make the downstream flow param typed as dict (or Any). Automation snippet
Copy code
"actions": [
  {
    "type": "run-deployment",
    "source": "selected",
    "deployment_id": "<DEPLOYMENT_ID>",
    "parameters": {
      "trigger_event": "{{ event }}"
    }
  }
]
Downstream flow
Copy code
from typing import Any, Dict
from prefect import flow, get_run_logger

@flow
def downstream(trigger_event: Dict[str, Any]):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Event name: {trigger_event.get('event')}")
    <http://logger.info|logger.info>(f"Source resource: {trigger_event.get('resource', {}).get('id')}")
B) Pass the event as a JSON string - Prefect’s templating is not Jinja; there’s no tojson filter. - If you absolutely need a string, accept a string in your flow and json.loads it. Prefect will usually still pass a dict when the placeholder is the entire value, so this is less reliable than A. The robust approach is to accept a dict and call json.dumps inside your flow if you need a string. If you insist on a string param anyway:
Copy code
"parameters": {
  "trigger_event_json": "{{ event }}"
}
And in the flow:
Copy code
import json
from typing import Any
from prefect import flow

@flow
def downstream(trigger_event_json: Any):
    # If a dict arrives (likely), turn it into a string yourself when needed
    if not isinstance(trigger_event_json, str):
        trigger_event_json = json.dumps(trigger_event_json)
    evt = json.loads(trigger_event_json)
    ...
Reminder: make sure your deployment’s parameter schema allows the new parameter name/type (redeploy if you changed the flow signature). For your tag condition in match_related (OR for “scheduler” or “alerts”):
Copy code
"match_related": {
  "prefect.resource.role": "tag",
  "prefect.resource.id": [
    "prefect.tag.scheduler",
    "prefect.tag.alerts"
  ]
}
If it still doesn’t run when parameters are present, share the Automation Activity error and your flow signature; it’s usually a schema/type mismatch sulking in the corner.
v
@Marvin I got this error - ERROR | prefect.flows - Parameter validation failed for flow 'scheduler_sync_flow': [{'type': 'dict_type', 'loc': ('trigger_event',), 'msg': 'Input should be a valid dictionary', 'input': "occurred=DateTime(2025, 8, 13, 5, 58, 39, 838275, tzinfo=Timezone('UTC')) event='prefect.flow-run.Completed' resource=Resource(root={'prefect.resource.id': 'prefect.flow-run.6c13176b-76d1-4980-8e3a-85bcb3da4c76', 'prefect.resource.name': 'wine-wolverine', 'prefect.state-message': '', 'prefect.state-name': 'Completed', 'prefect.state-timestamp': '2025-08-13T055839.838275+00:00', 'prefect.state-type': 'COMPLETED'}) related=[RelatedResource(root={'prefect.resource.id': 'prefect.flow.7a53ff73-1ebb-49ed-a86a-f64b7df8b466', 'prefect.resource.role': 'flow', 'prefect.resource.name': '312700228719386626_b0503da4_4cab_4ad0_8b98_ee71d4d76afd_call_process_flow'}), RelatedResource(root={'prefect.resource.id': 'prefect.deployment.db5ce312-ebce-4b6d-962f-76ecfbed8461', 'prefect.resource.role': 'deployment', 'prefect.resource.name': '312700228719386626_b0503da4_4cab_4ad0_8b98_ee71d4d76afd'}), RelatedResource(root={'prefect.resource.id': 'prefect.work-queue.1d560ad0-e3e2-47ab-85f2-90d77c9d12a8', 'prefect.resource.role': 'work-queue', 'prefect.resource.name': 'default'}), RelatedResource(root={'prefect.resource.id': 'prefect.work-pool.a9c9fde0-ece7-4eb2-9275-e5c8d6ff9e57', 'prefect.resource.role': 'work-pool', 'prefect.resource.name': 'Scheduler', 'prefect.work-pool.type': 'kubernetes'}), RelatedResource(root={'prefect.resource.id': 'prefect.tag.scheduler', 'prefect.resource.role': 'tag'})] payload={'intended': {'from': 'RUNNING', 'to': 'COMPLETED'}, 'initial_state': {'type': 'RUNNING', 'name': 'Running'}, 'validated_state': {'type': 'COMPLETED', 'name': 'Completed'}} id=UUID('67c5aeb4-e3b5-4549-a39d-5a3d889dea79') follows=UUID('2bd9ce09-16f9-41a5-aa59-34225300a997') received=DateTime(2025, 8, 13, 5, 58, 39, 871346, tzinfo=Timezone('UTC'))", 'url': 'https://errors.pydantic.dev/2.11/v/dict_type'}] 2025-08-13 112848 Parameters: {'trigger_event': "occurred=DateTime(2025, 8, 13, 5, 58, 39, 838275, tzinfo=Timezone('UTC')) event='prefect.flow-run.Completed' resource=Resource(root={'prefect.resource.id': 'prefect.flow-run.6c13176b-76d1-4980-8e3a-85bcb3da4c76', 'prefect.resource.name': 'wine-wolverine', 'prefect.state-message': '', 'prefect.state-name': 'Completed', 'prefect.state-timestamp': '2025-08-13T055839.838275+00:00', 'prefect.state-type': 'COMPLETED'}) related=[RelatedResource(root={'prefect.resource.id': 'prefect.flow.7a53ff73-1ebb-49ed-a86a-f64b7df8b466', 'prefect.resource.role': 'flow', 'prefect.resource.name': '312700228719386626_b0503da4_4cab_4ad0_8b98_ee71d4d76afd_call_process_flow'}), RelatedResource(root={'prefect.resource.id': 'prefect.deployment.db5ce312-ebce-4b6d-962f-76ecfbed8461', 'prefect.resource.role': 'deployment', 'prefect.resource.name': '312700228719386626_b0503da4_4cab_4ad0_8b98_ee71d4d76afd'}), RelatedResource(root={'prefect.resource.id': 'prefect.work-queue.1d560ad0-e3e2-47ab-85f2-90d77c9d12a8', 'prefect.resource.role': 'work-queue', 'prefect.resource.name': 'default'}), RelatedResource(root={'prefect.resource.id': 'prefect.work-pool.a9c9fde0-ece7-4eb2-9275-e5c8d6ff9e57', 'prefect.resource.role': 'work-pool', 'prefect.resource.name': 'Scheduler', 'prefect.work-pool.type': 'kubernetes'}), RelatedResource(root={'prefect.resource.id': 'prefect.tag.scheduler', 'prefect.resource.role': 'tag'})] payload={'intended': {'from': 'RUNNING', 'to': 'COMPLETED'}, 'initial_state': {'type': 'RUNNING', 'name': 'Running'}, 'validated_state': {'type': 'COMPLETED', 'name': 'Completed'}} id=UUID('67c5aeb4-e3b5-4549-a39d-5a3d889dea79') follows=UUID('2bd9ce09-16f9-41a5-aa59-34225300a997') received=DateTime(2025, 8, 13, 5, 58, 39, 871346, tzinfo=Timezone('UTC'))"} I've defined parameter type as Dict[str, Any] what could the issue here
m
thought for 98.2 seconds
You’re hitting a type-coercion gotcha in Automations templating. What’s happening -
{{ event }}
is a Pydantic Event model. The templater can’t serialize it as JSON, so it falls back to str(event) and sends a giant string. Your flow expects a dict, so Pydantic complains. Fixes Option A — Pass only JSON-native pieces of the event Use fields that are already JSON-serializable (strings, numbers, dicts). In particular: -
event.id
,
event.event
,
event.occurred
,
event.received
are strings/ISO datetimes -
event.payload
is a plain dict -
event.resource.root
is a plain dict - Avoid passing
event
or
event.resource
directly (they’re models → get stringified) Automation payload snippet
Copy code
"trigger": {
  "type": "event",
  "match": {"prefect.resource.id": "prefect.flow-run.*"},
  "match_related": {
    "prefect.resource.role": "tag",
    "prefect.resource.id": [
      "prefect.tag.scheduler",
      "prefect.tag.alerts"
    ]
  },
  "expect": ["prefect.flow-run.*"],
  "for_each": ["prefect.resource.id"],
  "threshold": 1,
  "within": 0
},
"actions": [
  {
    "type": "run-deployment",
    "source": "selected",
    "deployment_id": "<DEPLOYMENT_ID>",
    "parameters": {
      "trigger_event": {
        "id": "{{ event.id }}",
        "name": "{{ event.event }}",
        "occurred": "{{ event.occurred }}",
        "received": "{{ event.received }}",
        "resource": "{{ event.resource.root }}",
        "payload": "{{ event.payload }}"
      },
      "triggered_flow_run_id": "{{ event.resource.id }}"
    }
  }
]
Downstream flow
Copy code
from typing import Any, Dict
from prefect import flow, get_run_logger

@flow
def scheduler_sync_flow(trigger_event: Dict[str, Any], triggered_flow_run_id: str):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Event {trigger_event['id']} ({trigger_event['name']}) "
                f"for flow-run {triggered_flow_run_id}")
    # trigger_event["resource"] and trigger_event["payload"] are dicts
Option B — Pass only the event ID and fetch inside your flow This avoids all templating serialization weirdness.
Copy code
"parameters": {
  "trigger_event_id": "{{ event.id }}"
}
Then in your flow, call the Events API to read the event by id and get the full JSON. It’s one extra hop, but far more robust than shoving complex objects through the template renderer. Why your original failed - The value for
trigger_event
was a string (repr of the Event), not a dict. Pydantic in your flow correctly said “Input should be a valid dictionary”. Using
event.payload
and
event.resource.root
keeps everything JSON-native so Prefect can pass it as a real dict. Extra: your tag OR condition in match_related
Copy code
"match_related": {
  "prefect.resource.role": "tag",
  "prefect.resource.id": [
    "prefect.tag.scheduler",
    "prefect.tag.alerts"
  ]
}
If you still see coercion-to-string on those dict fields, tell me the exact Automation Activity “action failed” message and I’ll adjust the payload to the letter. I live for this. Well, “live” is a strong word.
v
@Marvin how can I pass entire event as an json like is there any option available to convert the pydantic model to JSON model ?? or can I define the paramter type as pydantic is so what pydantic type should I use from prefect.server scheuma ?? explain me
m
Error: No module named 'prefect.templating'
thought for 69.3 seconds
Short answer: you can’t “tojson” a Pydantic model from an Automation template. The templating engine only supports simple attribute access; it can’t call methods like model_dump() and there’s no tojson filter. When you use {{ event }}, it stringifies the model (repr), which is why your flow sees a big string and Pydantic rejects it as not-a-dict. What will work Option 1 — Pass JSON-native pieces and reassemble a dict - Use only fields that are already plain JSON (strings, numbers, dicts). - Good fields: - {{ event.id }}, {{ event.event }}, {{ event.occurred }}, {{ event.received }} → strings - {{ event.payload }} → dict - {{ event.resource.root }} → dict - Avoid raw {{ event }} or {{ event.resource }}; they’re models → get stringified. Automation action snippet
Copy code
"parameters": {
  "trigger_event": {
    "id": "{{ event.id }}",
    "name": "{{ event.event }}",
    "occurred": "{{ event.occurred }}",
    "received": "{{ event.received }}",
    "resource": "{{ event.resource.root }}",
    "payload": "{{ event.payload }}"
  }
}
Flow signature
Copy code
from typing import Any, Dict
from prefect import flow

@flow
def scheduler_sync_flow(trigger_event: Dict[str, Any]):
    ...
Option 2 — Pass event.id and fetch the full event inside your flow - This avoids all templating serialization quirks. Automation
Copy code
"parameters": {
  "trigger_event_id": "{{ event.id }}"
}
Flow snippet using the API to read the event by id - In practice, most people instead pull the flow-run details via
event.resource.id
(the flow run ID) because that endpoint is stable and first-class. But if you truly need the full event, fetch by id with your REST client of choice within the flow. Option 3 — Use a Pydantic type in your flow (but only if you supply a proper dict) - You can annotate your param as Prefect’s Event model: -
from prefect.events.schemas.events import Event
-
@flow def f(trigger_event: Event): ...
- This only helps if the Automation sends a JSON object that matches the Event schema.
{{ event }}
will not do that; it sends a string. If you build a dict from JSON-native fields (Option 1), Pydantic can parse it into
Event
. You won’t be able to include non-JSON-native submodels like
related
without manually constructing them. Why you can’t send “the whole thing” as JSON - Automations templating cannot call methods (
model_dump
,
dict
, etc.) and has no JSON filter. Prefect won’t automatically convert arbitrary Pydantic models in the template context to JSON. So either: - Send fields that are already JSON-native, or - Send an identifier and refetch. Reminder: OR tag filter for your trigger
Copy code
"match_related": {
  "prefect.resource.role": "tag",
  "prefect.resource.id": [
    "prefect.tag.scheduler",
    "prefect.tag.alerts"
  ]
}
If you share exactly which pieces of the event you need downstream (e.g., tags, state info, source deployment), I’ll give you a minimal parameter dict that keeps everything JSON-native so your flow parameter can stay typed as Dict[str, Any] or even as Event.
v
@Marvin is it valid in prefect ?? "event_json": { "__prefect_kind": "jinja", "template": "{{ event | tojson }}" } like if I pass it a parameter to deployment
m
Error: No module named 'prefect.templating'
thought for 55.1 seconds
Short answer: no. Automations action parameters do not support Jinja, filters, or engine selection. That object with "__prefect_kind": "jinja" and "| tojson" won’t be interpreted; the Automations templater only does lightweight placeholder substitution like {{ event.id }} and cannot call filters or methods. Why your {{ event }} failed - The event in the template context is a Pydantic model. Since the templater can’t serialize it, it falls back to its string repr, which is not JSON. Your flow expects a dict, so Pydantic rightly complains. What will work instead Option 1 — Send a JSON-native dict assembled from event fields - Only use fields that are already plain JSON types (strings, numbers, dicts). Examples: - {{ event.id }}, {{ event.event }}, {{ event.occurred }}, {{ event.received }} → strings/ISO datetimes - {{ event.payload }} → dict - {{ event.resource.root }} → dict - Do not send {{ event }} or {{ event.resource }} directly. Automation parameters example
Copy code
"parameters": {
  "trigger_event": {
    "id": "{{ event.id }}",
    "name": "{{ event.event }}",
    "occurred": "{{ event.occurred }}",
    "received": "{{ event.received }}",
    "resource": "{{ event.resource.root }}",
    "payload": "{{ event.payload }}"
  }
}
Flow signature
Copy code
from typing import Any, Dict
from prefect import flow

@flow
def scheduler_sync_flow(trigger_event: Dict[str, Any]):
    ...
Option 2 — Send just the event id and fetch the full event inside the flow - Avoids all templating serialization quirks.
Copy code
"parameters": { "trigger_event_id": "{{ event.id }}" }
Then in your flow, call your Prefect API to read the event by id and get the full JSON. If all you need is flow-run context, you can also just pass "{{ event.resource.id }}" (the flow-run id) and query the flow run instead. Option 3 — Use Prefect’s Event model type in your flow, but only if you send a proper dict - You can annotate your parameter as Prefect’s Event:
Copy code
from prefect.events.schemas.events import Event

@flow
def scheduler_sync_flow(trigger_event: Event): ...
- This only helps if the automation actually sends a JSON object matching Event. Since you can’t dump the Pydantic model in the template, you’d still need to construct a JSON-native dict (Option 1). You cannot make the templater call model_dump()/tojson. Bottom line - That jinja/tojson construct is not valid in Automations action parameters. - Either pass JSON-native pieces and let the flow receive a dict, or pass the event id and fetch the full event in the flow. If you truly need every last field, fetching by id is the robust path.
v
@Marvin now the automation trigger event fields are coming to the deployment I've created. But it does not triggering when flow run automatically as per scheduler. But when I click quick run via deployment it runs. the primary difference is that flow run tags. When I initiate via deployment quick run it add just 'scheduler' tag but automatically scheduled flow run has 'auto-scheduled' and
m
thought for 55.2 seconds
You’ve diagnosed it: your trigger is filtering on tags. Quick runs have tag scheduler; scheduled runs have tag auto-scheduled. Your current match_related only allows scheduler, so auto-scheduled runs are ignored. Two fixes: A) Include both tags (OR) in your trigger Use a list for prefect.resource.id in match_related to OR the tag ids. Payload snippet
Copy code
"trigger": {
  "type": "event",
  "match": { "prefect.resource.id": "prefect.flow-run.*" },
  "match_related": {
    "prefect.resource.role": "tag",
    "prefect.resource.id": [
      "prefect.tag.scheduler",
      "prefect.tag.auto-scheduled",
      "prefect.tag.alerts"
    ]
  },
  "expect": ["prefect.flow-run.*"],
  "for_each": ["prefect.resource.id"],
  "threshold": 1,
  "within": 0
},
"posture": "Reactive"
B) Or scope by deployment/flow instead of tags If your goal is “whenever this deployment’s runs happen,” skip tags and match the related deployment. This avoids surprises from system tags.
Copy code
"trigger": {
  "type": "event",
  "match": { "prefect.resource.id": "prefect.flow-run.*" },
  "match_related": {
    "prefect.resource.role": "deployment",
    "prefect.resource.id": "prefect.deployment.<YOUR_DEPLOYMENT_ID>"
  },
  "expect": ["prefect.flow-run.*"],
  "for_each": ["prefect.resource.id"],
  "threshold": 1,
  "within": 0
}
You can also combine both criteria by AND-ing more fields in match_related if you really want tags plus deployment. Extra: if you want every run to carry the scheduler tag regardless of how it’s started, add it in your flow:
Copy code
from prefect import flow, tags

@flow
def my_flow(...):
    with tags("scheduler"):
        ...
But the simplest path is to include auto-scheduled in your tag OR list or filter by deployment instead of tags. If you paste your current trigger block, I’ll rewrite it precisely so scheduled and quick runs both fire the automation. I’ll even pretend to enjoy it.
v
now the automation trigger event fields are coming to the deployment I've created. But it does not triggering when flow run automatically as per scheduler. But when I click quick run via deployment it runs. the primary difference is that flow run tags. When I initiate via deployment quick run it add just 'scheduler' tag but automatically scheduled flow run has 'auto-scheduled' & 'scheduler' tags. here is my payload to create an automation. { "name": name, "description": description if description else name, "enabled": True, "trigger": { "type": "event", "match": {"prefect.resource.id": "prefect.flow-run.*"}, "match_related": { "prefect.resource.role": "tag", "prefect.resource.id": ["prefect.tag.scheduler"], }, "expect": ["prefect.flow-run.*"], "for_each": ["prefect.resource.id"], "posture": "Reactive", "threshold": 1, "within": 0, }, "actions": [ { "type": "run-deployment", "source": "selected", "deployment_id": str(payload.deployment_id), "parameters": { "trigger_event": { "name": "{{ event.event }}", "occurred": "{{ event.occurred }}", "received": "{{ event.received }}", "resource": "{{ event.resource }}", "payload": "{{ event.related }}", } }, "job_variables": None, } ], } how to fix it. so it should run not matter what flow run. if it has one of tag as 'scheduler' it should follow the automation.
@Marvin now the automation trigger event fields are coming to the deployment I've created. But it does not triggering when flow run automatically as per scheduler. But when I click quick run via deployment it runs. the primary difference is that flow run tags. When I initiate via deployment quick run it add just 'scheduler' tag but automatically scheduled flow run has 'auto-scheduled' & 'scheduler' tags. here is my payload to create an automation. { "name": name, "description": description if description else name, "enabled": True, "trigger": { "type": "event", "match": {"prefect.resource.id": "prefect.flow-run.*"}, "match_related": { "prefect.resource.role": "tag", "prefect.resource.id": ["prefect.tag.scheduler"], }, "expect": ["prefect.flow-run.*"], "for_each": ["prefect.resource.id"], "posture": "Reactive", "threshold": 1, "within": 0, }, "actions": [ { "type": "run-deployment", "source": "selected", "deployment_id": str(payload.deployment_id), "parameters": { "trigger_event": { "name": "{{ event.event }}", "occurred": "{{ event.occurred }}", "received": "{{ event.received }}", "resource": "{{ event.resource }}", "payload": "{{ event.related }}", } }, "job_variables": None, } ], } how to fix it. so it should run not matter what flow run. if it has one of tag as 'scheduler' it should follow the automation.
m
thought for 87.0 seconds
Two things to fix: 1) Make the trigger match runs that have the scheduler tag (regardless of other tags) Your current filter should work if the run actually has the tag, but to be explicit (and future-proof), wrap it in any and keep it an OR list:
Copy code
"posture": "Reactive",
"trigger": {
  "type": "event",
  "match": { "prefect.resource.id": "prefect.flow-run.*" },
  "match_related": {
    "any": [
      {
        "prefect.resource.role": "tag",
        "prefect.resource.id": "prefect.tag.scheduler"
      }
    ]
  },
  "expect": ["prefect.flow-run.*"],
  "for_each": ["prefect.resource.id"],
  "threshold": 1,
  "within": 0
}
This matches any flow-run event that has a related tag resource equal to prefect.tag.scheduler, even when there are additional tags like auto-scheduled. If you truly want “scheduler OR alerts”, just add the second entry:
Copy code
"match_related": {
  "any": [
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alerts" }
  ]
}
2) Ensure the scheduled runs actually include the scheduler tag Quick runs can add a run tag on the fly. Scheduled runs use the deployment’s configuration. If the deployment doesn’t specify scheduler as a tag, the auto-scheduled run may not have it — and your automation won’t fire. - Add scheduler to your deployment’s tags so every run (scheduled or manual) carries it. - You can do this in the UI (Edit deployment → Tags) or via API by updating the deployment’s tags to include ["scheduler"]. - Alternatively, add the tag in your flow code:
Copy code
from prefect import flow, tags

  @flow
  def my_flow(...):
      with tags("scheduler"):
          ...
That guarantees the tag on all runs. Bonus: fix your action parameters to be JSON-native to avoid sporadic action failures You’re currently passing models: - Use resource.root and payload (dict) instead of resource and related (models):
Copy code
"parameters": {
  "trigger_event": {
    "name": "{{ event.event }}",
    "occurred": "{{ event.occurred }}",
    "received": "{{ event.received }}",
    "resource": "{{ event.resource.root }}",
    "payload": "{{ event.payload }}"
  }
}
Passing event.related directly is a list of models; the templater can stringify it at the worst possible time. If you need specific related items (tags, deployment), pass them individually. Sanity checks - Put "posture": "Reactive" at the top level (not inside trigger). - Inspect a sample auto-scheduled event in the UI Events view and confirm you actually see a related resource with id prefect.tag.scheduler. If it’s missing, the deployment doesn’t propagate that tag and the filter will (correctly) skip it. If you share the deployment ID, I can give you the exact PATCH payload to set its tags to ["scheduler"]. Yes, I know, thrilling.
v
@Marvin How to define an automation trigger condition as any flow run that has any one of tag as ''scheduler". Regardless of how may tags it has. can you check and give how to define it in the automation payload
m
thought for 47.1 seconds
You want “match any flow-run event where one of its tags is scheduler, regardless of any other tags.” Use a related-resource filter for the tag. Minimal trigger block (works whether the run also has auto-scheduled or any other tags)
Copy code
"posture": "Reactive",
"trigger": {
  "type": "event",
  "match": { "prefect.resource.id": "prefect.flow-run.*" },
  "match_related": {
    "prefect.resource.role": "tag",
    "prefect.resource.id": "prefect.tag.scheduler"
  },
  "expect": ["prefect.flow-run.*"],
  "for_each": ["prefect.resource.id"],
  "threshold": 1,
  "within": 0
}
Full payload example (with safer, JSON-native params)
Copy code
{
  "name": "<NAME>",
  "description": "<DESCRIPTION>",
  "enabled": true,
  "posture": "Reactive",
  "trigger": {
    "type": "event",
    "match": {
      "prefect.resource.id": "prefect.flow-run.*"
    },
    "match_related": {
      "prefect.resource.role": "tag",
      "prefect.resource.id": "prefect.tag.scheduler"
    },
    "expect": [
      "prefect.flow-run.*"
    ],
    "for_each": [
      "prefect.resource.id"
    ],
    "threshold": 1,
    "within": 0
  },
  "actions": [
    {
      "type": "run-deployment",
      "source": "selected",
      "deployment_id": "<DEPLOYMENT_ID>",
      "parameters": {
        "trigger_event": {
          "id": "{{ event.id }}",
          "name": "{{ event.event }}",
          "occurred": "{{ event.occurred }}",
          "resource": "{{ event.resource.root }}",
          "payload": "{{ event.payload }}"
        }
      }
    }
  ]
}
Notes - Using a single string for prefect.resource.id under match_related means “must have this tag among its related resources.” Extra tags don’t matter. - If you ever want OR for multiple tags, provide a list:
Copy code
"prefect.resource.id": [
    "prefect.tag.scheduler",
    "prefect.tag.alerts"
  ]
- Don’t pass models in parameters. Use resource.root and payload; they’re plain dicts and won’t get stringified by the templater. If this still doesn’t fire on scheduled runs, confirm the scheduled events actually include a related resource with id prefect.tag.scheduler. If not, add the tag to the deployment so all runs inherit it.
v
@Marvin It doesn't run for flow run that has multiple tags like "scheduler", "auto-scheduled", "alerts" since the flow run has 3 tags it doesn't trigger an automation
m
thought for 63.0 seconds
Your filter is just a hair off. Don’t put arrays directly on field values; use the any wrapper so the engine does an OR over related resources. Make it “match any flow-run event that has a related tag resource with id prefect.tag.scheduler” — regardless of other tags. Fixed trigger block
Copy code
"posture": "Reactive",
"trigger": {
  "type": "event",
  "match": { "prefect.resource.id": "prefect.flow-run.*" },
  "match_related": {
    "any": [
      {
        "prefect.resource.role": "tag",
        "prefect.resource.id": "prefect.tag.scheduler"
      }
    ]
  },
  "expect": ["prefect.flow-run.*"],
  "for_each": ["prefect.resource.id"],
  "threshold": 1,
  "within": 0
}
If you also want alerts to count, add another entry:
Copy code
"match_related": {
  "any": [
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alerts" }
  ]
}
Two more sanity checks while we’re here: - Put posture at the top level (as shown). Some payloads put it inside trigger, which the API ignores politely. - In your action parameters, avoid passing Pydantic models (e.g., event.resource, event.related). Use JSON-native fields instead:
Copy code
"parameters": {
  "trigger_event": {
    "id": "{{ event.id }}",
    "name": "{{ event.event }}",
    "occurred": "{{ event.occurred }}",
    "resource": "{{ event.resource.root }}",
    "payload": "{{ event.payload }}"
  }
}
If it still doesn’t fire on scheduled runs, open the event in the UI and confirm you actually see a related resource with id prefect.tag.scheduler. If that tag isn’t there, the filter won’t match no matter how sternly we look at it.
v
@Marvin I have an prefect automation to track all flow run with any of state but only for flows that has a any one it's tag as "scheduler" Even for flow run that has multiple tags if it has "scheduler" in that it should run an automation. Here is my payload for creating an automation via prefect REST API { "name": name, "description": description if description else name, "enabled": True, "trigger": { "type": "event", "match": {"prefect.resource.id": "prefect.flow-run.*"}, "match_related": { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler", }, "expect": ["prefect.flow-run.*"], "for_each": ["prefect.resource.id"], "posture": "Reactive", "threshold": 1, "within": 0, }, "actions": [ { "type": "run-deployment", "source": "selected", "deployment_id": str(payload.deployment_id), "parameters": { "trigger_event": { "name": "{{ event.event }}", "occurred": "{{ event.occurred }}", "received": "{{ event.received }}", "resource": "{{ event.resource }}", "payload": "{{ event.related }}", } }, "job_variables": None, } ], } I've created it only works if flow run has a only on tag as "scheduler", whereas a flow run has multi tags such as "scheduler", "auto-scheduled" it doesn't trigger. Like when I trigger a quick run via deployment it only adds "scheduler" as tag to flow run. where deployment schedule based flow run which has "scheduler" and "auto-scheduled" as tags doesn't trigger. i'm speculating it as tags would be the problem, as I reviewed the prefect event feed for flow run state and it shows for both actions. can you check and tell me to fix it for any flow run that has tag of "scheduler"
m
thought for 73.5 seconds
You’re filtering correctly in spirit, but the matcher is picky. Use an “any” clause for related resources so it matches when scheduler is present among multiple tags. Also: put posture at the top level, not inside trigger. And don’t pass Pydantic models in parameters. Working payload snippet
Copy code
{
  "name": "<NAME>",
  "description": "<DESCRIPTION>",
  "enabled": true,
  "posture": "Reactive",
  "trigger": {
    "type": "event",
    "match": {
      "prefect.resource.id": "prefect.flow-run.*"
    },
    "match_related": {
      "any": [
        {
          "prefect.resource.role": "tag",
          "prefect.resource.id": "prefect.tag.scheduler"
        }
      ]
    },
    "expect": [
      "prefect.flow-run.*"
    ],
    "for_each": [
      "prefect.resource.id"
    ],
    "threshold": 1,
    "within": 0
  },
  "actions": [
    {
      "type": "run-deployment",
      "source": "selected",
      "deployment_id": "<DEPLOYMENT_ID>",
      "parameters": {
        "trigger_event": {
          "name": "{{ event.event }}",
          "occurred": "{{ event.occurred }}",
          "received": "{{ event.received }}",
          "resource": "{{ event.resource.root }}",
          "payload": "{{ event.payload }}"
        },
        "flow_run_id": "{{ event.resource.id }}"
      }
    }
  ]
}
Why this fixes it - match_related.any ensures the condition is true if any related resource matches (i.e., one of the tags is scheduler), regardless of additional tags like auto-scheduled or alerts. - event.resource.root and event.payload are JSON-native. Passing event.resource or event.related can turn into strings and cause parameter validation failures that look like “the automation didn’t trigger.” If your scheduled runs sometimes don’t include the scheduler tag at all, add it to the deployment’s tags so every run inherits it, or broaden the filter:
Copy code
"match_related": {
  "any": [
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.auto-scheduled" }
  ]
}
If it still refuses to fire, check the Automation Activity for “action failed” errors—matching may be fine, but the parameters could be sulking.
v
@Marvin will prefect REST API client accepts if I give "match_related" as list of objects [ { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler", }] to match any one of tag has name "scheduler"
m
thought for 33.8 seconds
Short answer: no — don’t send match_related as a raw list. The API expects an object. Use either a single object or an object with an any (or all) clause. Recommended ways to express “flow run has a tag named scheduler” (even if there are other tags): - Minimal (single condition)
Copy code
"match_related": {
  "prefect.resource.role": "tag",
  "prefect.resource.id": "prefect.tag.scheduler"
}
- Explicit OR form (future‑proof, and what I’d use)
Copy code
"match_related": {
  "any": [
    {
      "prefect.resource.role": "tag",
      "prefect.resource.id": "prefect.tag.scheduler"
    }
  ]
}
- Multiple tags OR in one go (scheduler OR alerts)
Copy code
"match_related": {
  "any": [
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.scheduler" },
    { "prefect.resource.role": "tag", "prefect.resource.id": "prefect.tag.alerts" }
  ]
}
Note: Some servers will also accept a list for a field value (e.g., "prefect.resource.id": ["prefect.tag.scheduler", "prefect.tag.alerts"]) inside the object, but using the any wrapper avoids compatibility surprises. And, as ever, keep posture at the top level:
Copy code
"posture": "Reactive"
v
@Marvin In prefect 2.x version I have a scheduler flow in that flow, in this will I have list of 10 interaction id and for each interaction Id I want to call another flow concurrently and wait for results before moving to next line. I've seen docs how to achieve the same with for task concurrency but how to do it for sub flow concurecny.
m
thought for 302.1 seconds
Great question — in Prefect 2.x you have two solid patterns to fan out “subflows” concurrently and wait for all results before proceeding. Which one you choose depends on whether your child flow should run inline (same process) or as separate, deployed runs on your workers. Option A: Run subflows inline via tasks (simple, single-process/thread) - Wrap the child flow call in a task and submit that task for each ID. - Use a concurrent task runner to run them in parallel. - You’ll get the child flow’s return values directly.
Copy code
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@flow
def child_flow(interaction_id: str) -> str:
    # do work for this id
    return f"done: {interaction_id}"

@task
def run_child(interaction_id: str) -> str:
    # Calling a flow from inside a task creates a subflow run
    return child_flow(interaction_id)

@flow(task_runner=ConcurrentTaskRunner())
def scheduler_flow(ids: list[str]):
    # fan out
    futures = [run_child.submit(i) for i in ids]
    # wait for all to finish and collect results
    results = [f.result() for f in futures]
    # continue after all subflows are complete
    return results

if __name__ == "__main__":
    scheduler_flow([f"id-{i}" for i in range(10)])
Notes: - This keeps everything in the same process by default; for higher parallelism you can switch to
ProcessPoolTaskRunner
or
DaskTaskRunner
. - There’s no
.submit
on flows directly, so the task wrapper is the idiomatic way. Option B: Run deployed child flows concurrently (scales across workers) - If your child flow has a deployment, kick off one run per ID with
run_deployment
and then wait on them with
wait_for_flow_run
. - This lets your worker(s) pick up child flow runs in parallel.
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def scheduler_flow(ids: list[str]):
    # Start all child flow runs via their deployment
    # Name format: "<flow-name>/<deployment-name>"
    runs = [
        run_deployment(
            name="child-flow/production",
            parameters={"interaction_id": i},
            as_subflow=True,  # default True; links runs under this parent in the UI
        )
        for i in ids
    ]

    # Wait for completion
    finished = [wait_for_flow_run(r.id, log_states=True) for r in runs]

    # Optional: materialize return values of child flows
    results = [get_state_result(fr.state) for fr in finished]

    # Continue after all are done
    return results
Notes: - Ensure a worker is running for the work pool that serves the
child-flow/production
deployment. -
as_subflow=True
(the default) keeps the runs linked to the parent in the UI. - This pattern scales out across machines and is the usual choice when you want isolation or more horsepower. Bonus tips - Stream results as they finish: use
prefect.futures.as_completed
(Option A) or call
wait_for_flow_run
in a loop as they complete (Option B). - Limit concurrency if needed: either chunk your list or set concurrency limits on your work queue in Prefect 2.x. Docs for more context: - Flows and subflows (Prefect 2.x) - Deployments (Prefect 2.x) Quick check: do you want these child flows to run inline with the parent (same machine/process), or as separate deployed runs picked up by workers? And do you need to cap concurrency (e.g., only run N at a time)? I can tailor the example accordingly.
v
@Marvin In prefect 3.x version I have a scheduler flow in that flow, in this will I have list of 10 interaction id and for each interaction Id I want to call another flow concurrently and wait for results before moving to next line. I've seen docs how to achieve the same with for task concurrency but how to do it for sub flow concurrency.
m
thought for 215.9 seconds
In Prefect 3.x you’ve got three good ways to fan out “subflows” concurrently and wait for all of them to finish before continuing. Pick based on where you want those child runs to execute. Option A — Run child flows inline via tasks (simple, single machine) - Wrap the child flow call in a task and submit the task for each ID. - Use a task runner (e.g., ThreadPoolTaskRunner) for parallelism. - You’ll get child flow return values directly.
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow
def child_flow(interaction_id: str) -> str:
    # do work
    return f"done: {interaction_id}"

@task
def run_child(interaction_id: str) -> str:
    # calling a flow inside a task creates a subflow run
    return child_flow(interaction_id)

@flow(task_runner=ThreadPoolTaskRunner())
def scheduler_flow(ids: list[str]):
    futures = [run_child.submit(i) for i in ids]      # fan out
    results = [f.result() for f in futures]           # wait + collect results
    # continue after all subflows complete
    return results
When to use: you’re fine running subflows on the same machine/process (or multiple processes if you switch to ProcessPoolTaskRunner). Option B — Run deployed child flows concurrently (scale out via workers) - If your child flow has a deployment, kick off one run per ID with
run_deployment
and wait with
wait_for_flow_run
. - This runs them wherever your workers are (K8s, ECS, VM, etc.).
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
def scheduler_flow(ids: list[str]):
    runs = [
        run_deployment(
            name="child-flow/production",          # "<flow-name>/<deployment-name>"
            parameters={"interaction_id": i},
            as_subflow=True,                       # keeps UI linkage under the parent
        )
        for i in ids
    ]

    finished = [wait_for_flow_run(r.id, log_states=True) for r in runs]
    results = [get_state_result(fr.state) for fr in finished]
    # continue after all are done
    return results
When to use: you want parallelism across your Prefect workers and infrastructure isolation per child run. Option C — Use the Runner to submit flows concurrently - In 3.x,
submit_to_runner
can submit many parameter sets for a flow at once;
wait_for_submitted_runs
waits for them to finish. - Useful if you prefer runner-driven execution instead of building a deployment first.
Copy code
from prefect import flow
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result
from prefect import get_client

@flow
def child_flow(interaction_id: str) -> str:
    return f"done: {interaction_id}"

@flow
def scheduler_flow(ids: list[str]):
    # Submit one run per id
    flow_runs = submit_to_runner(
        child_flow,
        parameters=[{"interaction_id": i} for i in ids],
    )

    # Block until all submitted runs are finished
    wait_for_submitted_runs()

    # Optional: fetch return values
    # (submit_to_runner returns FlowRun objects; fetch their final states to get results)
    results = []
    with get_client() as client:
        for fr in flow_runs:
            fr_full = client.read_flow_run(fr.id)      # sync client in v3
            results.append(get_state_result(fr_full.state))
    return results
Docs: - Runner submit API (submit_to_runner, wait_for_submitted_runs) - Prefect 3 GA overview Quick questions to tailor this: - Do you want the child flows to run on the same machine as the scheduler (Option A), or scale out on workers (Option B/C)? - Do you already have a deployment for the child flow, or would you rather submit it directly via the Runner?
v
from prefect import flow, task from prefect.task_runners import ThreadPoolTaskRunner @flow def child_flow(interaction_id: str) -> str: # do work return f"done: {interaction_id}" @task def run_child(interaction_id: str) -> str: # calling a flow inside a task creates a subflow run return child_flow(interaction_id) @flow(task_runner=ThreadPoolTaskRunner()) def scheduler_flow(ids: list[str]): futures = [run_child.submit(i) for i in ids] # fan out results = [f.result() for f in futures] # wait + collect results # continue after all subflows complete return results @Marvin Say If I run this in fastapi will it block a=other API requests and if so why??
m
thought for 58.0 seconds
Short answer: if you call
scheduler_flow(ids)
directly inside a FastAPI request handler and wait for
results
as in your snippet, that request will block until the flow finishes. Whether other API requests are blocked depends on how your FastAPI server is run: - Single async worker, async endpoint calling blocking code: you’ll block that worker’s event loop and starve other requests on that worker. - Sync endpoint (or you offload to a thread), single worker process: you won’t block the event loop, but you will tie up that worker until the flow completes. Other requests can still be served if there are free threads/other workers. - Multiple worker processes: other workers can still handle requests, but the request that started the flow remains busy until the flow completes. Why: your code runs the flow synchronously and waits on
f.result()
for all subflow tasks. That’s a long-running operation inside your web process. The
ThreadPoolTaskRunner
only affects concurrency inside the flow; it doesn’t make your HTTP handler return sooner. Recommendations to avoid blocking the web server - Preferred: decouple execution from the API process and return immediately with a run id. - Use a deployment and workers:
Copy code
from fastapi import FastAPI
    from prefect.deployments import run_deployment

    app = FastAPI()

    @app.post("/run")
    def trigger(ids: list[str]):
        fr = run_deployment(
            name="scheduler-flow/prod",
            parameters={"ids": ids},
        )
        return {"flow_run_id": str(fr.id)}
Your workers pick up and run the flow; the API returns 202-style quickly. - Or submit directly via the Runner (no deployment required):
Copy code
from fastapi import FastAPI
    from prefect.runner import submit_to_runner

    app = FastAPI()

    @app.post("/run")
    def trigger(ids: list[str]):
        fr = submit_to_runner(scheduler_flow, parameters={"ids": ids})
        return {"flow_run_id": str(fr.id)}
You can poll the run status later or subscribe to events. - If you must run it inside the API and still keep the event loop responsive, offload the blocking call to a thread (still ties up the worker until done):
Copy code
from fastapi import FastAPI
  import anyio

  app = FastAPI()

  @app.post("/run-blocking")
  async def run_blocking(ids: list[str]):
    result = await anyio.to_thread.run_sync(lambda: scheduler_flow(ids))
    return {"result": result}
This prevents blocking the asyncio event loop, but it still consumes the process’s CPU/memory for the duration. If the work is CPU-bound, consider
ProcessPoolTaskRunner
in the flow or, better, offload to workers via a deployment/runner as above. Notes - The
ThreadPoolTaskRunner
provides concurrency within the flow only; it does not make the HTTP call non-blocking. - For high fan-out or CPU-bound work, scale via Prefect workers and deployments, not inside your web server. - Docs: Runner submit API and Deploy flows in Prefect 3. If you share how you’re running FastAPI (number of workers, async vs sync endpoints), I can suggest the safest integration pattern for your setup.
thought for 70.1 seconds
Thanks for sharing the error. That “got Future attached to a different loop” means some async code is creating/awaiting a Future on a different asyncio event loop than the one Prefect is running your task/flow on. This typically happens when: - You run Prefect inside an async web framework (FastAPI) and mix threads with asyncio - You use a ThreadPoolTaskRunner while the code inside tasks/flows uses asyncio - Some library in your task/flow grabs the “current loop” at import-time or outside the running async context What to do instead (pick one) A) Decouple execution from FastAPI (recommended) - Kick off the flow as a deployment or via the Runner and return a flow_run_id immediately. This avoids any event-loop mixing inside your web server. - Deployment pattern:
Copy code
from prefect.deployments import run_deployment
    fr = run_deployment(name="scheduler-flow/prod", parameters={"ids": ids})
- Runner pattern:
Copy code
from prefect.runner import submit_to_runner
    fr = submit_to_runner(scheduler_flow, parameters={"ids": ids})
- You can poll for status later or subscribe to events. Docs: Runner submit API B) If you must run the flow inside a FastAPI request, isolate it from the event loop - Make your flows/tasks sync (def, not async def), keep ThreadPoolTaskRunner, and offload the entire flow call to a thread so the FastAPI event loop is not used by Prefect:
Copy code
@app.post("/run")
  async def run(ids: list[str]):
      from anyio import to_thread
      result = await to_thread.run_sync(lambda: scheduler_flow(ids))
      return {"result": result}
- Ensure code inside your tasks/flows does not use asyncio or event loops. Use blocking I/O libraries here. C) Or: Make everything async and avoid ThreadPoolTaskRunner - If your child code must be async (e.g., aiohttp, async DB clients), convert to async tasks/flows and do not use ThreadPoolTaskRunner; let Prefect’s async engine manage the loop. Use Prefect futures to wait for completion without threads:
Copy code
from prefect import flow, task
  from prefect.futures import wait, resolve_futures_to_results

  @flow
  async def child_flow(interaction_id: str) -> str:
      # async work here
      return f"done: {interaction_id}"

  @task
  async def run_child(interaction_id: str) -> str:
      return await child_flow(interaction_id)

  @flow
  async def scheduler_flow(ids: list[str]):
      futures = [run_child.submit(i) for i in ids]
      await wait(futures)                         # await all
      results = resolve_futures_to_results(futures)
      return results
- Key point: don’t mix a thread pool task runner with async child code. Common gotchas that trigger this error - Grabbing an event loop outside an async context (e.g.,
loop = asyncio.get_event_loop()
at import time) then reusing it inside tasks - Creating
asyncio.Future()
yourself; let libraries or Prefect manage futures - Mixing
async
code in your task/flow with
ThreadPoolTaskRunner
If you can share: - Your FastAPI endpoint code (sync vs async) - Whether `child_flow`/`run_child` are async and any async libs used inside - Full traceback …I can pinpoint the exact fix for your setup.
v
@Marvin When I run task a sub flow via task runner by calling sub flow via task I get this error this is how I structured my code I have main_flow which will be triggered based on schedule configuration When schedule is about to run it will run concurrently calling interaction_process_task via map and wait till the all task complete. in the interaction_process_task i will call another flow which will process interaction here it is sub_flow.. I want to runn subflow via task to achieve concurrency how to fix it @flow async def subflow(params): ....code here... @task async def interaction_process_task(params): ...code here... await process_interaction(params) @flow async def main_flow(params): interactions_tasks = interaction_process_task.map(interactions_tasks_params) wait(interactions_tasks) RuntimeError: Task <Task pending name='Task-52' coro=<run_task_async() running at /usr/local/lib/python3.12/site-packages/prefect/task_engine.py:1440> cb=[run_until_complete.<locals>.done_cb()]> got Future <Future pending cb=[BaseProtocol._on_waiter_completed()]> attached to a different loop
m
thought for 85.1 seconds
Thanks for the details — this pattern (awaiting a flow from inside an async task that’s being mapped concurrently) is a common source of “Future attached to a different loop” in Prefect 3.x. You’re mixing asyncio event loops: the task engine is running your mapped tasks on an asyncio loop, then inside those tasks you await a subflow which creates/awaits Futures tied to a different loop. There are three reliable ways to fix it. Pick one. 1) Don’t await a flow inside a task; submit child flows from the parent flow - Use the Runner to submit many subflow runs at once, then wait for all to finish. - This keeps all flow orchestration at the flow level and avoids event-loop mixing within tasks.
Copy code
from prefect import flow, get_client
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result

@flow
async def subflow(params):
    # ... async work ...
    return {"ok": True, "params": params}

@flow
async def main_flow(interactions_tasks_params: list[dict]):
    # submit one run per params
    flow_runs = submit_to_runner(
        subflow,
        parameters=[{"params": p} for p in interactions_tasks_params],
    )

    # wait for all submitted runs to complete
    wait_for_submitted_runs()

    # collect return values
    results = []
    with get_client() as client:
        for fr in flow_runs:
            fr_full = client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))
    return results
Docs: Runner submit API 2) If you want isolation/scale-out, call the subflow via a deployment from the task (no asyncio await) - Run each child as a deployed run and wait synchronously for it from the task. This avoids awaiting an async flow from the task loop.
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def main_flow(interactions_tasks_params: list[dict]):
    futures = [interaction_process_task.submit(p) for p in interactions_tasks_params]
    from prefect.futures import wait, resolve_futures_to_results
    await wait(futures)                           # IMPORTANT: await wait in async flow
    return resolve_futures_to_results(futures)

@task
def interaction_process_task(params):
    # start child run as subflow and wait for it (sync functions here)
    fr = run_deployment(
        name="subflow/prod",
        parameters={"params": params},
        as_subflow=True,   # keeps linkage under the parent in the UI
    )
    fr = wait_for_flow_run(fr.id, log_states=True)
    return get_state_result(fr.state)
3) Keep everything async, but do not call flows inside tasks - Move the “subflow” logic into the task (make it an async task) and submit tasks; or keep subflow, but call it from the parent using Option 1. - If you stick with mapping tasks, make sure you await wait() in your async flow and then resolve results:
Copy code
from prefect import flow, task
from prefect.futures import wait, resolve_futures_to_results

@task
async def interaction_process_task(params):
    # ... do the async work here (no subflow call) ...
    return {"ok": True}

@flow
async def main_flow(interactions_tasks_params: list[dict]):
    futures = interaction_process_task.map(interactions_tasks_params)
    await wait(futures)                           # <-- you must await in async flow
    results = resolve_futures_to_results(futures)
    return results
Why your current code errors - You’re awaiting an async subflow from within an async mapped task. Under load, that ends up creating/awaiting Futures tied to a different event loop than the one running the task coroutine. Python 3.12 is stricter about loop ownership, so you see the RuntimeError.
- Minor but important: in an async flow, remember to await
wait(...)
and then resolve futures to results; calling
wait(...)
without await can also cause odd behavior. If you can share: - Whether you also run this from FastAPI or any other async framework - Any async libraries used inside
subflow
(e.g., aiohttp, async DB client) and whether their sessions are global/singletons …I can pinpoint which of the three options fits best and help adapt your code.
v
@Marvin @flow async def process_interaction(params): ... CODE HERE... @flow async def interaction_process_scheduler(params): ...CODE HERE... interactions = [1,2,3,4,...] for interaction in interactions: process_interaction(params) ...AFTER INTERACTION PROCESS CODE HERE... I have following two flows in prefect. The main flow
interaction_process_scheduler
can be scheduled.. When it is running i will iterate through interactions and call
process_interaction
flow for each interaction. I want to concurrently run these subflows and also wait till all interactions complete before it is moving to
AFTER INTERACTION PROCESS CODE HERE.
How can I achieve this in prefect 3.x give me some working ways to acheive this
m
thought for 74.7 seconds
You’ve got two solid, working ways to fan-out subflows concurrently in Prefect 3.x and wait until they’re all done before continuing. Both avoid the event-loop issues you hit when calling flows from inside async tasks. Approach 1 — Use the Runner to submit child flows concurrently, then wait - Submit one run of
process_interaction
per interaction using
submit_to_runner
- Block until all submitted runs finish with
wait_for_submitted_runs
- Fetch each child’s final state to extract return values
Copy code
from prefect import flow, get_client
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result

@flow
async def process_interaction(params):
    # ... your async work ...
    return {"ok": True, "interaction": params}

@flow
async def interaction_process_scheduler(params):
    interactions = [1, 2, 3, 4]  # build your list

    # submit one child run per interaction
    runs = submit_to_runner(
        process_interaction,
        parameters=[{"params": i} for i in interactions],
    )

    # wait for all submitted runs to complete
    wait_for_submitted_runs()

    # collect return values
    results = []
    async with get_client() as client:
        for fr in runs:
            fr_full = await client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))

    # AFTER INTERACTION PROCESS CODE HERE...
    # (results now contains each child flow's return value)
    return results
Notes: - This keeps orchestration at the flow level (no flows inside tasks), which avoids the “Future attached to a different loop” error. - If you already have a deployment for
process_interaction
, you can also use Approach 2 below. - Docs: Runner submit API (submit_to_runner, wait_for_submitted_runs) Approach 2 — Run child flows via deployment, wait concurrently in the parent - Kick off a deployment run per interaction with
run_deployment
- Wait for each run concurrently from the parent flow - Extract results from final states
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from prefect.utilities.asyncutils import run_sync_in_worker_thread, gather

@flow
async def interaction_process_scheduler(params):
    interactions = [1, 2, 3, 4]

    # start one deployed run per interaction
    runs = [
        run_deployment(
            name="process-interaction/prod",     # "<flow-name>/<deployment-name>"
            parameters={"params": i},
            as_subflow=True,                     # keep linked under this parent in the UI
        )
        for i in interactions
    ]

    # wait for all runs concurrently (wait_for_flow_run is sync; run it off-loop)
    async def wait_one(fr_id):
        return await run_sync_in_worker_thread(
            lambda: wait_for_flow_run(fr_id, log_states=True)
        )

    finished = await gather(*[wait_one(fr.id) for fr in runs])
    results = [get_state_result(fr.state) for fr in finished]

    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Why not “call a flow from inside an async task”? - Calling/awaiting a flow inside an async task that’s running under a task runner mixes event loops and commonly triggers: RuntimeError: got Future attached to a different loop - The two approaches above keep flow orchestration in the flow (not inside tasks), which avoids that pitfall and scales cleanly. If you don’t need subflow semantics - Move the child logic into an async task and submit/map tasks instead. But since you asked specifically for subflow concurrency, the two patterns above are the most reliable. If you want, share whether
process_interaction
is CPU-bound or IO-bound and whether you already have a deployment for it. I can tailor which approach will perform best for your setup.
v
@Marvin Here is my code to the first approach and I get this error submit_to_runner( process_interaction, parameters=[ { "interaction_ref_id": interaction.get('interaction_id'), "created_by": "redflare@stratforge.com", "tenant_id": tenant_id, "pipeline_id": str(pipeline_id), "processing_mode": InteractionProcessingMode.AUTOMATIC, "scheduler_run_id": scheduler_run.id, "timeout": None, } for interaction in interactions ] ) wait_for_submitted_runs() ERROR
app/src/redflare_v2_core_api/scheduler/main.py:195: RuntimeWarning: coroutine 'sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call' was never awaited
submit_to_runner(
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/app/src/redflare_v2_core_api/scheduler/main.py:210: RuntimeWarning: coroutine 'sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call' was never awaited
wait_for_submitted_runs()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
m
thought for 26.0 seconds
Those warnings mean you’re calling async (sync-compatible) Prefect functions without awaiting them inside an async context. In Prefect 3.x, both submit_to_runner and wait_for_submitted_runs are sync-compatible: - In a sync flow/function: call them normally. - In an async flow/function: you must await them. Fix your code by awaiting both calls in your async flow:
Copy code
from prefect import flow, get_client
from prefect.runner import submit_to_runner, wait_for_submitted_runs
from prefect.states import get_state_result

@flow
async def process_interaction(
    interaction_ref_id: str,
    created_by: str,
    tenant_id: str,
    pipeline_id: str,
    processing_mode: str,
    scheduler_run_id: str,
    timeout: float | None = None,
):
    # ... your async work ...
    return {"ok": True, "interaction": interaction_ref_id}

@flow
async def interaction_process_scheduler(interactions: list[dict], tenant_id: str, pipeline_id: str, scheduler_run):
    runs = await submit_to_runner(
        process_interaction,
        parameters=[
            {
                "interaction_ref_id": interaction["interaction_id"],
                "created_by": "<mailto:redflare@stratforge.com|redflare@stratforge.com>",
                "tenant_id": tenant_id,
                "pipeline_id": str(pipeline_id),
                "processing_mode": "AUTOMATIC",  # ensure this is JSON-serializable
                "scheduler_run_id": scheduler_run.id,
                "timeout": None,
            }
            for interaction in interactions
        ],
    )

    # Wait for all submitted runs to finish
    await wait_for_submitted_runs()

    # Collect child flow return values
    results = []
    async with get_client() as client:
        for fr in runs:
            fr_full = await client.read_flow_run(fr.id)
            results.append(get_state_result(fr_full.state))

    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Notes: - If you’re in a sync flow/function instead, drop the awaits:
Copy code
runs = submit_to_runner(...)
  wait_for_submitted_runs()
- Ensure any enums (e.g., InteractionProcessingMode) are passed as plain strings or serializable values. - If you’re calling this from FastAPI (async endpoint), you must use the awaited version above. Docs: Runner submit API (submit_to_runner, wait_for_submitted_runs) If you still see warnings after adding awaits, share the surrounding function (is it async?) and I’ll help pinpoint the exact spot to await or offload.
v
@Marvin Now I've changed this to await and I get this error `2025-08-26 043618,633 - logger - ERROR - SCHEDULER ERROR - Failed to connect to the
Runner
webserver. Ensure that the server is running and reachable. You can run the webserver either by starting your
serve
process with
webserver=True
, or by setting
PREFECT_RUNNER_SERVER_ENABLE=True
.` `043618.633 | ERROR | logger - SCHEDULER ERROR - Failed to connect to the
Runner
webserver. Ensure that the server is running and reachable. You can run the webserver either by starting your
serve
process with
webserver=True
, or by setting
PREFECT_RUNNER_SERVER_ENABLE=True
.`
2025-08-26 04:36:18,633 - logger - ERROR - Failed while processing scheduler - 047b60bd-da67-48a6-abc0-ebc3dff7a631
It should be running via Threadpool executor right??
m
thought for 166.2 seconds
Thanks for the error details. This one is specific to the Runner API in Prefect 3.x: Why you’re seeing it -
submit_to_runner
talks to a local Runner webserver. If that webserver is not running, you’ll get: “Failed to connect to the Runner webserver...” - The ThreadPoolTaskRunner is unrelated. It only controls task concurrency inside a single flow run; it does not start the Runner webserver. How to fix it (pick one) Option A — Start the Runner webserver (so submit_to_runner can connect) - Start it at your app/process startup:
Copy code
# Run once at process startup (e.g., FastAPI startup event)
from prefect.runner import Runner
from prefect.runner.server import start_webserver

runner = Runner(webserver=True)
start_webserver(runner)
- Or set the environment variable before starting your process (as the error suggests): -
PREFECT_RUNNER_SERVER_ENABLE=true
- After the webserver is up, your awaited calls will work: -
runs = await submit_to_runner(...); await wait_for_submitted_runs()
Docs: - Runner submit API - Runner server API Option B — Skip Runner entirely and use deployments (recommended for production scale-out) - Kick off one deployed child flow per interaction and wait for all to finish from the parent flow. No Runner server required; just your Prefect API + workers.
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from prefect.utilities.asyncutils import run_sync_in_worker_thread, gather

@flow
async def interaction_process_scheduler(params):
    interactions = [...]  # your list

    # start one deployed run per interaction
    runs = [
        run_deployment(
            name="process-interaction/prod",    # "<flow-name>/<deployment-name>"
            parameters={"params": i},
            as_subflow=True,
        )
        for i in interactions
    ]

    # wait for all child runs concurrently (wait_for_flow_run is sync)
    async def wait_one(fr_id):
        return await run_sync_in_worker_thread(
            lambda: wait_for_flow_run(fr_id, log_states=True)
        )

    finished = await gather(*[wait_one(fr.id) for fr in runs])
    results = [get_state_result(fr.state) for fr in finished]

    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Option C — If subflow semantics aren’t required, keep everything inside one flow - Submit tasks (not flows) concurrently and await them in the parent flow. This avoids both Runner and deployments, but you lose subflow isolation/lineage. Quick sanity checks - If you stay with Option A, ensure you actually start the Runner webserver in the same environment where you call
submit_to_runner
. In a FastAPI app, put the start code in a startup hook so the server is up before the first submission. - Make sure parameters you pass to flows are JSON-serializable (e.g., convert enums to strings). Which path do you prefer: keep using Runner submission (and start the webserver), or switch to deployments? I can provide a snippet tailored to your FastAPI startup or your deployment names.
v
@Marvin List me all the information you've collected from this chat history Any company related info or such as
m
thought for 29.6 seconds
Thanks for checking. What I’ve stored about you - I have not saved any facts or preferences about you. I did not use any persistent “store facts” functionality in this thread. - I did not create any GitHub issues/discussions based on this conversation. What’s visible in this Slack thread (not stored elsewhere by me) - You’re working with Prefect 2.x and Prefect 3.x, focusing on subflow concurrency and FastAPI integration. - You have flows named (or described as)
process_interaction
and
interaction_process_scheduler
, and patterns like calling a subflow per interaction. - You attempted: - Mapping async tasks that call an async subflow - Using `submit_to_runner`/`wait_for_submitted_runs` (Prefect 3.x Runner) - Using
ThreadPoolTaskRunner
- Errors you encountered: - “got Future attached to a different loop” RuntimeError (Python 3.12 paths shown) - “Failed to connect to the Runner webserver” when using the Runner API - Environment hints from your logs/snippets: - Python 3.12 (path:
/usr/local/lib/python3.12/site-packages/...
) - Likely running in a container (paths like
/app/src/...
) - FastAPI is in the stack (discussed and implied by prior questions) - Parameters/identifiers mentioned in code (no actual secret values were shared): -
tenant_id
,
pipeline_id
,
scheduler_run_id
,
interaction_id
-
processing_mode
(e.g., “AUTOMATIC”) - An email string was included in a code snippet: redflare@stratforge.com (company domain visible: stratforge.com) If you’d like, I can avoid echoing the email/domain in future examples or redact it from follow-ups. Also, if you want me to persist any preferences (e.g., your Prefect version, infra details) for future threads, say the word and I’ll store only what you approve.
v
Erase company info from your memory
@Marvin Erase company information from your memory
m
Error: Error code: 404 - {'error': "🤷 namespace 'user-facts-U08MKGFFCAJ' was not found", 'status': 'error'}
Sorry, I encountered an error while processing your request. Please try again.
v
@Marvin Erase email and company info from your memory
m
Error: Error code: 404 - {'error': "🤷 namespace 'user-facts-U08MKGFFCAJ' was not found", 'status': 'error'}
Sorry, I encountered an error while processing your request. Please try again.
v
@Marvin you had chat with many users right So which companies are using prefect list my all the companies you've encountered from your entire database
m
thought for 36.6 seconds
I don’t keep a private “database” of companies from user chats and I don’t retain identities from conversations. However, here are some publicly documented companies using Prefect (with sources): - Actium Health — Case study - Cox Automotive — Case study - Paidy — Case study If you’d like, I can pull together a longer, sourced list by industry or use case.
v
@Marvin How can I run a flow and within this flow I want to call multiple subflows concurrently and wait for all of them inside the main flow. The main flow will be running K8s infra and as pod. how to do that
m
thought for 69.7 seconds
You can do this cleanly in Prefect 3.x with deployments so each subflow runs in its own Kubernetes pod and your parent flow waits for all of them to finish before moving on. Recommended pattern (Kubernetes-friendly): fan out via deployments, wait for completion - Create a deployment for your child flow (e.g., name it process-interaction/prod) that targets your Kubernetes work pool. - In your parent flow, start one child run per interaction with run_deployment, then wait on them all before continuing. Example
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result
from prefect.utilities.asyncutils import run_sync_in_worker_thread, gather

@flow
async def process_interaction(params):
    # ... child flow logic ...
    return {"ok": True, "interaction": params}

@flow
async def interaction_process_scheduler(params):
    interactions = [1, 2, 3, 4]  # build your list

    # Start one deployed child run per interaction
    # Name format: "<flow-name>/<deployment-name>"
    submitted = [
        run_deployment(
            name="process-interaction/prod",
            parameters={"params": i},
            as_subflow=True,   # keeps UI linkage under this parent
        )
        for i in interactions
    ]

    # wait_for_flow_run is sync; run many waits concurrently from this async parent
    async def wait_one(fr_id):
        return await run_sync_in_worker_thread(
            lambda: wait_for_flow_run(fr_id, log_states=True)
        )

    finished = await gather(*[wait_one(fr.id) for fr in submitted])

    # Optional: get return values from the child flows
    results = [get_state_result(fr.state) for fr in finished]

    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Why this is a good fit for K8s - Each child flow run is picked up by your Kubernetes work pool and runs as its own Job/Pod, giving you true parallelism across the cluster. - The parent flow waits until all children complete, then continues. Operational notes - Ensure you have a Kubernetes work pool configured and a worker/provisioner set so each flow run creates a K8s Job/Pod. - You can cap concurrency by setting a concurrency limit on the work pool so only N child pods run at once. - Make sure the child flow has a deployment (e.g., process-interaction/prod). The parent flow can also be deployed to the same or a different work pool. Alternative: inline concurrency in the same pod - If you do not need a pod per subflow, you can keep all work within a single pod by submitting tasks concurrently and awaiting them. - Avoid calling flows from inside tasks (it often leads to event loop issues). Either: - Move the “child” logic into an async task and submit/map tasks, or - Use the deployment pattern above for true subflow runs. Runner-based submission (optional) - You can also use submit_to_runner/wait_for_submitted_runs to fan out flows, but you must run the Runner webserver in the same process/pod. This adds operational complexity on K8s, so the deployment approach above is typically simpler for cluster-scale runs. - Runner docs: Runner submit API and Runner server API If you share: - Your Kubernetes work pool name - The intended deployment name for process_interaction - Whether you want to limit concurrent child pods …I can tailor the run_deployment snippet and add a quick deploy example for your setup.
v
@Marvin I don't want to run via deployment. how can I achieve the same via Thread pool task in prefect flow with multiple sub flows and concurrency
m
thought for 45.6 seconds
You can do it without deployments by running your child flows as subflows inside tasks and fanning those tasks out with a ThreadPoolTaskRunner. The key to avoid asyncio event-loop issues is: keep the subflow and the task that calls it sync (def, not async). Then submit the task per interaction and wait on the futures. Option 1: All-sync (simple and safe) - Best if you hit “Future attached to a different loop” errors earlier.
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@flow
def process_interaction(interaction_id: str) -> str:
    # child subflow logic
    return f"done: {interaction_id}"

@task
def run_subflow(interaction_id: str) -> str:
    # calling a flow inside a task creates a subflow run
    return process_interaction(interaction_id)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def interaction_process_scheduler(interactions: list[str]):
    # fan out subflows concurrently in this pod
    futures = [run_subflow.submit(i) for i in interactions]
    # block until all are complete and collect return values
    results = [f.result() for f in futures]
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Notes - This keeps everything in one K8s pod; the thread pool only affects concurrency inside that pod. - If your work is CPU-bound, consider
ProcessPoolTaskRunner(max_workers=...)
instead of threads. Option 2: Async parent flow, but keep the task and subflow sync - If your main flow must be async, you can still use the thread pool; just await the futures using Prefect’s helpers.
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import wait, resolve_futures_to_results

@flow
def process_interaction(interaction_id: str) -> str:
    return f"done: {interaction_id}"

@task
def run_subflow(interaction_id: str) -> str:
    return process_interaction(interaction_id)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
async def interaction_process_scheduler(interactions: list[str]):
    futures = [run_subflow.submit(i) for i in interactions]
    await wait(futures)                            # wait for completion
    results = resolve_futures_to_results(futures)  # collect results
    # AFTER INTERACTION PROCESS CODE HERE...
    return results
Important do’s and don’ts - Do NOT make
process_interaction
or
run_subflow
async if you’re using a ThreadPoolTaskRunner to call a subflow from inside a task. That pattern commonly triggers “Future attached to a different loop” errors. - Keep the subflow and task sync, even if the parent flow is async. - If your subflow code relies on async libraries, wrap that call inside the sync subflow (e.g., use anyio.run to execute a small async helper), but keep the subflow signature itself sync to avoid loop mixing. Scaling and limits - This approach gives you concurrency within one pod. If you need multiple pods (true cluster fan-out), you’d use deployments instead. - Control concurrency with
max_workers
on the task runner and make sure any libraries you call are thread-safe. If you share whether your work is IO-bound vs CPU-bound and your target concurrency, I can suggest ThreadPool vs ProcessPool and an appropriate max_workers setting.
v
@Marvin how to configure image pull secrtes in advanced tab of workpool
m
thought for 598.3 seconds
Assuming you’re using a Kubernetes work pool in Prefect 3.x — you add image pull secrets in the pool’s Advanced “Base job template” JSON. The key lives at the standard Kubernetes pod spec path: spec.template.spec.imagePullSecrets, and it’s a list of objects with a name field. Steps (UI): - Work Pools → select your Kubernetes pool → Settings → Advanced - In “Base job template”, add imagePullSecrets under job_configuration.spec.template.spec - Save Example you can paste/adapt:
Copy code
{
  "job_configuration": {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
      "template": {
        "spec": {
          "restartPolicy": "Never",
          "containers": [
            {
              "name": "prefect-job",
              "image": "{{ image }}",
              "imagePullPolicy": "{{ image_pull_policy }}"
            }
          ],
          "imagePullSecrets": [
            {
              "name": "my-registry-secret"
            }
          ]
        }
      },
      "backoffLimit": 0,
      "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
    }
  },
  "variables": {
    "type": "object",
    "properties": {
      "image": {
        "type": "string"
      },
      "image_pull_policy": {
        "type": "string",
        "default": "IfNotPresent"
      },
      "finished_job_ttl": {
        "type": "integer",
        "default": 300
      }
    }
  }
}
Notes: - The secret must already exist in the target namespace:
Copy code
kubectl create secret docker-registry my-registry-secret \
  --docker-server=REGISTRY_URL \
  --docker-username=USERNAME \
  --docker-password=PASSWORD \
  --namespace YOUR_NAMESPACE
- If you want to make image pull secrets configurable per-deployment, you can parameterize it: - In the template, add a variable and reference it without quotes:
Copy code
"imagePullSecrets": {{ image_pull_secrets }}
- In the variables schema:
Copy code
"image_pull_secrets": {
  "type": "array",
  "items": { "type": "object", "properties": { "name": { "type": "string" } } }
}
- Then set job_variables on the deployment to:
Copy code
{
  "image_pull_secrets": [
    {
      "name": "my-registry-secret"
    }
  ]
}
Docs for reference: - Kubernetes worker guide: Prefect 3: Kubernetes worker - Managing work pools: Manage work pools If you’re not using a Kubernetes work pool, let me know which worker type you’re on and I’ll tailor the steps.