Madushan Pathirana
09/26/2025, 10:51 AM
Marvin
09/26/2025, 10:51 AM
Marvin
09/26/2025, 10:57 AM
server.api.auth_string, which maps to the env var PREFECT_SERVER__API__AUTH_STRING (note the double underscores). Using PREFECT_SERVER_API_AUTH_STRING will not be picked up.
- On clients (including curl calls), you can use HTTP Basic Auth directly. If you’re using a Prefect client/worker, you can set PREFECT_API_AUTH_STRING to the same user:pass.
- Make sure clients also set PREFECT_API_URL, e.g. http://<your-ec2-host>:4200/api.
Docs:
- Security settings (basic auth)
- List/filter deployments
- Create a flow run from a deployment
Typical flow: find the deployment ID, then create a flow run from it.
1) List deployments (find the ID)
curl -u 'admin:pass' \
  -H 'Content-Type: application/json' \
  -X POST http://<your-ec2-host>:4200/api/deployments/filter \
  -d '{"limit": 200}'
- Replace admin:pass with your auth string.
- Adjust host/port as needed.
2) Trigger a run from a specific deployment
curl -u 'admin:pass' \
  -H 'Content-Type: application/json' \
  -X POST http://<your-ec2-host>:4200/api/deployments/<deployment_id>/create-flow-run \
  -d '{
    "parameters": {
      "your_param": "value"
    }
  }'
- You’ll get a JSON response with the created flow_run.id.
Alternative: using Authorization header instead of -u
curl -H "Authorization: Basic $(printf 'admin:pass' | base64)" \
  -H 'Content-Type: application/json' \
  -X POST http://<your-ec2-host>:4200/api/deployments/<deployment_id>/create-flow-run \
  -d '{}'
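For scripts that cannot shell out to curl, the same request can be assembled with Python's standard library. This is only a minimal sketch, not Prefect's client: build_create_flow_run_request is a hypothetical helper name, and the host, deployment ID, and credentials are placeholders. It builds the request without sending it; sending would be urllib.request.urlopen(req).

```python
import base64
import json
import urllib.request


def build_create_flow_run_request(api_url, deployment_id, auth_string, parameters=None):
    """Build (but do not send) the POST that creates a flow run from a deployment."""
    # HTTP Basic auth: base64-encode the 'user:pass' string, like `curl -u` does
    token = base64.b64encode(auth_string.encode("utf-8")).decode("ascii")
    body = json.dumps({"parameters": parameters or {}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{api_url.rstrip('/')}/deployments/{deployment_id}/create-flow-run",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder values for illustration only
req = build_create_flow_run_request(
    api_url="http://localhost:4200/api",
    deployment_id="00000000-0000-0000-0000-000000000000",
    auth_string="admin:pass",
    parameters={"your_param": "value"},
)
print(req.get_method(), req.full_url)
```

Note that api_url is expected to already end in /api, matching the PREFECT_API_URL convention mentioned earlier.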
Python (async) example with the Prefect client
import asyncio
from prefect.client.orchestration import get_client

async def main():
    # Ensure PREFECT_API_URL and PREFECT_API_AUTH_STRING are set in your env
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="<deployment_id>",
            parameters={"your_param": "value"},
        )
        print(flow_run.id)

asyncio.run(main())
Notes:
- The REST API creates flow runs from deployments. Running arbitrary code “ad hoc” via API is not supported; create a deployment first, then trigger it.
- For listing deployments, the API uses POST to /deployments/filter (you can pass filters in the JSON body or just use {"limit": N} and filter client-side).
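The client-side option in the last note could look roughly like this. It assumes each item in the /deployments/filter response is a JSON object with name and tags fields; the sample records and the helper name filter_deployments are made up for illustration.

```python
def filter_deployments(deployments, name=None, tag=None):
    """Filter a parsed /deployments/filter response by exact name and/or tag."""
    matches = []
    for dep in deployments:
        if name is not None and dep.get("name") != name:
            continue
        if tag is not None and tag not in (dep.get("tags") or []):
            continue
        matches.append(dep)
    return matches


# Hypothetical sample of what the API might return (fields trimmed)
sample = [
    {"id": "a1", "name": "dev", "tags": ["dev", "ebay"]},
    {"id": "b2", "name": "stage", "tags": ["stage", "ebay"]},
    {"id": "c3", "name": "dev", "tags": ["dev", "amazon"]},
]

print([d["id"] for d in filter_deployments(sample, name="dev", tag="ebay")])
```

Filtering server-side is cheaper when there are many deployments; this approach is mainly convenient for small installations or one-off scripts.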
Quick check:
- Do you already have a deployment for the flow you want to run? If yes, share the flow/deployment name and I can give you a copy-paste curl that finds that deployment and triggers it. Also, confirm whether you can switch your server env var to PREFECT_SERVER__API__AUTH_STRING so auth is actually enabled.
Madushan Pathirana
09/26/2025, 11:08 AM
Marvin
09/26/2025, 11:09 AM
Marvin
09/26/2025, 11:14 AM
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter, DeploymentFilterName,
    FlowFilter, FlowFilterName,
)

async def find_deployment_id():
    async with get_client() as client:
        deployments = await client.read_deployments(
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=["dev"])
            ),
            flow_filter=FlowFilter(
                name=FlowFilterName(any_=["ebay"])
            ),
        )
        if not deployments:
            return None
        # If multiple match, pick the most recently created
        deployments.sort(key=lambda d: d.created, reverse=True)
        return deployments[0].id

dep_id = asyncio.run(find_deployment_id())
print(dep_id)
Option B — Deployment name contains both (e.g., “ebay/dev”)
Use a name pattern:
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName

async def find_deployment_id():
    async with get_client() as client:
        # like_ is a case-insensitive substring match, not a glob, so match
        # "ebay" server-side and check for "dev" client-side
        deployments = await client.read_deployments(
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(like_="ebay")
            )
        )
        deployments = [d for d in deployments if "dev" in d.name.lower()]
        if not deployments:
            return None
        deployments.sort(key=lambda d: d.created, reverse=True)
        return deployments[0].id

dep_id = asyncio.run(find_deployment_id())
print(dep_id)
Option C — You store environment as a tag (recommended)
If you tag the deployment with “dev”, filter by deployment name and tag:
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter, DeploymentFilterName, DeploymentFilterTags
)

async def find_deployment_id():
    async with get_client() as client:
        deployments = await client.read_deployments(
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=["ebay"]),
                tags=DeploymentFilterTags(all_=["dev"]),
            )
        )
        if not deployments:
            return None
        deployments.sort(key=lambda d: d.created, reverse=True)
        return deployments[0].id

dep_id = asyncio.run(find_deployment_id())
print(dep_id)
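The lookups above all end with the same tie-break: sort by created and take the newest. A small standalone sketch of that step follows. FakeDeployment is a stand-in for the client's deployment objects (assumed here to expose id and created), and treating a missing created value as oldest is an added assumption, not something the snippets above do.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FakeDeployment:
    """Stand-in for a deployment object; only the fields used here."""
    id: str
    created: Optional[datetime]


def most_recent(deployments):
    """Return the most recently created deployment, or None if the list is empty."""
    if not deployments:
        return None
    oldest = datetime.min.replace(tzinfo=timezone.utc)
    # Deployments without a creation time sort as oldest (assumption)
    return max(deployments, key=lambda d: d.created or oldest)


deps = [
    FakeDeployment("old", datetime(2025, 9, 1, tzinfo=timezone.utc)),
    FakeDeployment("new", datetime(2025, 9, 26, tzinfo=timezone.utc)),
    FakeDeployment("unknown", None),
]
print(most_recent(deps).id)
```

Using max instead of an in-place sort leaves the original list untouched, which helps if the full list is still needed afterwards.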
REST API example (self-hosted Prefect Server)
POST to /api/deployments/filter (PREFECT_API_URL already ends in /api). Here’s an example that filters for flow name “ebay” and deployment name “dev”, returns the most recent first, and limits to one result. On a self-hosted server with basic auth, pass the user:pass auth string rather than a bearer API key:
curl -X POST "$PREFECT_API_URL/deployments/filter" \
  -u "$PREFECT_API_AUTH_STRING" \
  -H "Content-Type: application/json" \
  -d '{
    "deployments": { "name": {"any_": ["dev"]} },
    "flows": { "name": {"any_": ["ebay"]} },
    "limit": 1,
    "offset": 0,
    "sort": "CREATED_DESC"
  }'
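When the filter values come from variables, it can be easier to build the request body in Python than to hand-edit JSON. The sketch below reproduces the body from the curl example; build_deployment_filter_body is a hypothetical helper, and only the keys already shown above are used.

```python
import json


def build_deployment_filter_body(flow_name, deployment_name, limit=1):
    """Build the JSON body for POST /deployments/filter."""
    return {
        "deployments": {"name": {"any_": [deployment_name]}},
        "flows": {"name": {"any_": [flow_name]}},
        "limit": limit,
        "offset": 0,
        "sort": "CREATED_DESC",  # newest first
    }


body = build_deployment_filter_body("ebay", "dev")
print(json.dumps(body, indent=2))
```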
Docs for deeper reference:
- Filter schemas (Python)
- Read deployments (REST)
If you can confirm whether “dev” is your deployment name or a tag, I can tailor the snippet exactly to your setup.
Madushan Pathirana
09/26/2025, 11:19 AM
Marvin
09/26/2025, 11:19 AM
Marvin
09/26/2025, 11:23 AM
prefect deployment run <FLOW_NAME>/ebay
There’s no flag to specify a version; to run a specific version you must use the deployment ID:
prefect deployment run --id <DEPLOYMENT_ID>
Python (Prefect 3.x)
- Run latest by name:
import asyncio
from prefect.deployments import run_deployment

async def run_latest():
    # run_deployment lives in prefect.deployments; it is not a method on the client
    fr = await run_deployment(name="<FLOW_NAME>/ebay", parameters={})
    print(fr)

asyncio.run(run_latest())
- Run a specific version “dev” by first resolving the ID, then running:
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName

async def run_specific_version():
    async with get_client() as client:
        # 1) Get deployments named 'ebay'
        deployments = await client.read_deployments(
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=["ebay"])
            )
        )
        if not deployments:
            raise RuntimeError("No deployments named 'ebay' found")
        # 2) Pick the one with version == 'dev'
        target = next((d for d in deployments if getattr(d, "version", None) == "dev"), None)
        if not target:
            raise RuntimeError("No 'ebay' deployment with version 'dev' found")
        # 3) Create the flow run from the deployment ID
        fr = await client.create_flow_run_from_deployment(target.id, parameters={})
        print(fr)

asyncio.run(run_specific_version())
Recommendation if you want to avoid ID lookups
- Name your deployments by environment instead (e.g., flow name “ebay”, deployment name “dev”), so you can run:
prefect deployment run ebay/dev
or
await run_deployment(name="ebay/dev")  # from prefect.deployments import run_deployment
This targets the (latest) deployment for that environment by name alone.
References
- Filters (Python): prefect-client-schemas-filters
- Read deployments (REST): read-deployments
If you share your flow’s name, I can give you the exact CLI command you’d use today.
Madushan Pathirana
09/26/2025, 11:39 AM
Marvin
09/26/2025, 11:39 AM
Marvin
09/26/2025, 11:41 AM
Madushan Pathirana
09/26/2025, 12:03 PM
Marvin
09/26/2025, 12:04 PM
Marvin
09/26/2025, 12:06 PM
name in YAML should be just “dev” or “stage”; the full name becomes <FLOW_NAME>/<DEPLOYMENT_NAME> (e.g., ebay/dev).
- Reference your ECS EC2 work pool by name under work_pool.name.
- Use tags and parameters per deployment (e.g., pass which RDS credentials variable to use).
- Add schedules only to the ones you want (e.g., stage/prod), and leave dev as manual.
Example prefect.yaml
name: marketplace
prefect-version: 3.0.0

# Optional: You can add a shared section to reuse settings via YAML anchors
definitions:
  work_pool: &ecs_pool
    name: ecs-ec2-pool
    # You can add ECS job variables here if needed (image, cpu, memory, etc.)
    # job_variables:
    #   image: "123456789012.dkr.ecr.ap-southeast-1.amazonaws.com/my-image:latest"
    #   cpu: 512
    #   memory: 1024

deployments:
  # ---- eBay flow ----
  - name: dev
    entrypoint: flows/ebay.py:ebay  # @flow(name="ebay") in code
    tags: ["dev", "ebay"]
    parameters:
      env: "dev"
      db_credentials_variable: "RDS_EBAY_DEV"  # stored in Prefect
    work_pool: *ecs_pool
    schedules: []  # manual runs only

  - name: stage
    entrypoint: flows/ebay.py:ebay
    tags: ["stage", "ebay"]
    parameters:
      env: "stage"
      db_credentials_variable: "RDS_EBAY_STAGE"
    work_pool: *ecs_pool
    schedules:
      - cron: "0 * * * *"  # hourly
        timezone: "Asia/Singapore"

  # ---- Amazon flow ----
  - name: dev
    entrypoint: flows/amazon.py:amazon  # @flow(name="amazon") in code
    tags: ["dev", "amazon"]
    parameters:
      env: "dev"
      db_credentials_variable: "RDS_AMAZON_DEV"
    work_pool: *ecs_pool
    schedules: []  # manual runs only

  - name: stage
    entrypoint: flows/amazon.py:amazon
    tags: ["stage", "amazon"]
    parameters:
      env: "stage"
      db_credentials_variable: "RDS_AMAZON_STAGE"
    work_pool: *ecs_pool
    schedules:
      - cron: "*/30 * * * *"  # every 30 minutes
        timezone: "Asia/Singapore"
Deploy them all
prefect --no-prompt deploy --all
Run them (no IDs needed for latest)
prefect deployment run ebay/dev
prefect deployment run ebay/stage
prefect deployment run amazon/dev
prefect deployment run amazon/stage
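The four names above follow from the config: each target is the flow name plus the deployment's name. As a rough sketch, the helper below derives them from an already-parsed prefect.yaml, assuming the flow name equals the function name in entrypoint (true for the example, where @flow(name="ebay") decorates ebay, but not guaranteed in general). run_targets is a hypothetical helper, not a Prefect API.

```python
def run_targets(config):
    """List '<flow>/<deployment>' names for each deployment in a parsed prefect.yaml."""
    targets = []
    for dep in config.get("deployments", []):
        # entrypoint looks like 'flows/ebay.py:ebay'; take the part after ':'
        flow_name = dep["entrypoint"].rsplit(":", 1)[1]
        targets.append(f"{flow_name}/{dep['name']}")
    return targets


# Trimmed copy of the example config, written as a dict
config = {
    "deployments": [
        {"name": "dev", "entrypoint": "flows/ebay.py:ebay"},
        {"name": "stage", "entrypoint": "flows/ebay.py:ebay"},
        {"name": "dev", "entrypoint": "flows/amazon.py:amazon"},
        {"name": "stage", "entrypoint": "flows/amazon.py:amazon"},
    ]
}

for target in run_targets(config):
    print(f"prefect deployment run {target}")
```

If a flow's name differs from its function name, the real target uses the flow's name, so treat the derived list as a starting point to verify against the UI or CLI.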
Notes for your ECS EC2 setup
- Ensure your ECS work pool (type “ecs”) exists and your worker is running on your EC2 host.
- Because your runs execute on ECS, make sure your code is accessible to the worker (e.g., via a container image or pull steps like git/S3). If you want, I can add a git/S3 pull example to your YAML.
Docs
- Prefect YAML (multi-deployment config): prefect-yaml
- Schedules: schedules
If you share your flow file paths and the exact work pool name, I’ll tailor this YAML precisely (including ECS job_variables if you need CPU/memory/image overrides).