David Martin Calalang
08/21/2025, 4:53 PMMarvin
08/21/2025, 4:53 PMMarvin
08/21/2025, 5:00 PMimport asyncio
from prefect.client.orchestration import get_client
from prefect.runtime import flow_run
async def fetch_times():
async with get_client() as client:
run = await client.read_flow_run(flow_run.id)
return run.start_time, run.end_time
start, end = asyncio.run(fetch_times())
print("Prefect flow run window:", start, end)
Docs: Flow runs API
Note: If you need the ECS task’s actual “startedAt/stoppedAt” rather than the flow run window, pull those from AWS (see #3).
2) Capture and surface the ECS Task identity in Prefect
Inside the ECS task running your flow, you can read the ECS Task Metadata endpoint to get the Task ARN, then create a Prefect artifact that links to the task in the AWS console:
import os
import requests
from urllib.parse import quote_plus
from prefect.artifacts import create_link_artifact
def publish_ecs_task_link():
md_uri = os.environ.get("ECS_CONTAINER_METADATA_URI_V4") or os.environ.get("ECS_CONTAINER_METADATA_URI")
if not md_uri:
return # Not running on ECS or metadata not enabled
task_meta = requests.get(f"{md_uri}/task").json()
task_arn = task_meta["TaskARN"] # arn:aws:ecs:REGION:ACCOUNT:task/CLUSTER/TASK_ID
cluster_arn = task_meta.get("Cluster") # arn:aws:ecs:REGION:ACCOUNT:cluster/CLUSTER
region = task_arn.split(":")[3]
cluster_name = cluster_arn.split("/")[-1]
task_id = task_arn.split("/")[-1]
# Link to the ECS task in the AWS Console
ecs_link = (
f"https://{region}.<http://console.aws.amazon.com/ecs/v2/clusters/{quote_plus(cluster_name)}/tasks/{quote_plus(task_id)}|console.aws.amazon.com/ecs/v2/clusters/{quote_plus(cluster_name)}/tasks/{quote_plus(task_id)}>"
)
create_link_artifact(key="ecs-task", link=ecs_link, description="ECS task for this Prefect run")
publish_ecs_task_link()
Artifacts docs: Creating artifacts
3) ECS resource usage (CPU/memory) and “task running” timestamps
- Prefect does not collect these metrics. Use AWS:
- For per-task CPU/memory utilization, enable CloudWatch Container Insights on your ECS cluster and view/query metrics there.
- For precise task lifecycle times, use DescribeTasks
on the ECS API to get startedAt
/ stoppedAt
.
You can optionally record these details back to Prefect as artifacts:
import boto3
from datetime import timezone
from prefect.artifacts import create_markdown_artifact
def aws_task_times_and_note(task_arn, cluster_name, region):
ecs = boto3.client("ecs", region_name=region)
resp = ecs.describe_tasks(cluster=cluster_name, tasks=[task_arn])
task = resp["tasks"][0]
started = task.get("startedAt")
stopped = task.get("stoppedAt")
md = []
md.append(f"- Task ARN: `{task_arn}`")
if started: md.append(f"- ECS startedAt: `{started.astimezone(timezone.utc)}`")
if stopped: md.append(f"- ECS stoppedAt: `{stopped.astimezone(timezone.utc)}`")
create_markdown_artifact(key="ecs-task-timing", markdown="\n".join(md))
# Pair with publish_ecs_task_link() above to get task_arn/cluster/region
AWS references:Marvin
08/21/2025, 5:00 PM