Mateusz
10/01/2025, 2:24 AMMarvin
10/01/2025, 2:24 AMMarvin
10/01/2025, 2:28 AM@flow
from cron (or by hand), the run will show up in the Cloud UI with logs, states, and task details.
- Setup:
1) Log in once (stores a local profile):
prefect cloud login -k YOUR_API_KEY -w your-account/your-workspace
2) Keep running your Python entrypoint from cron as you do today. As long as your code calls an @flow
, Prefect will create a flow run in Cloud and stream logs to the UI.
- You can also optionally create a deployment and only start a worker during your run window if you want Cloud to schedule/queue runs for you â but itâs not required if you prefer cron.
- Learn more: Cloud Free pricing and Prefect docs
Option B: Self-hosted âon-demandâ Prefect Server with a persistent Postgres you already have
- Cost: You provide Postgres (can be your existing warehouse instance) and only start the Prefect API/UI when needed.
- How it works:
- Prefect Server needs a persistent Postgres database to store orchestration metadata. You can point Prefect Server at your warehouse Postgres (ideally a separate database or schema).
- You can bring the server up only during the run window (e.g., via cron), run your flow(s), and then shut it down. Your metadata will persist in Postgres. You can bring the UI up on-demand to inspect historical runs because data lives in the DB.
- Caveats:
- Donât use the same DB schema as your warehouse tables; keep Prefect in its own DB/schema.
- While the server is down, you wonât be able to view the UI or run flows that report to the API. If you want Prefect to handle scheduling, the server must be up. Since youâre using cron, this is fine.
- Minimal on-demand flow:
1) Ensure your Postgres is reachable and create a DB for Prefect (e.g., prefect
).
2) Set the DB connection for the API:
export PREFECT_API_DATABASE_CONNECTION_URL="<postgresql+asyncpg://USER:PASS@HOST:5432/prefect>"
3) Start the server right before your run:
prefect server start --no-ui --no-services &
SERVER_PID=$!
- --no-services
disables background services like the scheduler to keep it lighter. You can omit this if you want those features.
4) Run your flow script (your @flow
-decorated entrypoint):
python path/to/your_script.py
5) Stop the server:
kill $SERVER_PID
6) When you want to view the UI, start the server with the UI flag pointing to the same DB:
prefect server start --ui
Then visit the printed URL in your browser.
Notes and tradeoffs
- Using Prefect Cloud Free is the simplest and usually the cheapest: no servers to run, UI always available, scheduling/late-runs handled for you. You can still keep cron if thatâs your preference.
- Self-hosting with your warehouse Postgres works but is more hands-on. Prefect will create and migrate its own schema; keep it isolated to avoid noisy-neighbor issues. Youâll need to bring up the API whenever you run flows or want to view the UI.
- If you want Cloud to schedule your daily run but donât want a 24/7 worker, you can:
- Create a deployment in Cloud and schedule it.
- Let runs queue up if no worker is online at schedule time.
- Start a worker only during your run window; it will pick up any queued/late runs.
- CLI references (verified):
- Login: prefect cloud login --help
- Server: prefect server start --help
- Worker: prefect worker start --help
- Deployments: prefect deploy --help
Quick recommendationMarvin
10/01/2025, 2:28 AMMateusz
10/01/2025, 2:34 AM#!/bin/bash
source .virtualenvs/main/bin/activate
source $NVM_DIR/nvm.sh
exec "$@"
And I invoke the flows via python -m <http://flows.my|flows.my>_flow
. Feel free to suggest a proper entrypoint for this đMarvin
10/01/2025, 2:34 AMMarvin
10/01/2025, 2:41 AMPREFECT_API_DATABASE_CONNECTION_URL
(use postgresql+asyncpg://...
)
- API URL env var for clients: PREFECT_API_URL
(defaults to `http://127.0.0.1:4200/api`)
- Default port: 4200; health endpoint: /api/health
- Server CLI: prefect server start
supports --ui/--no-ui
and --no-services
(for a lighter, API-only mode)
- Docs:
- Self-hosted overview
- Managing settings (env vars)
- Settings reference
- Health endpoint
Option 1: Replace your entrypoint with an âon-demandâ server wrapper
This starts the server, waits until itâs healthy, runs your flow, and shuts the server down. It keeps your Node + venv setup.
#!/usr/bin/env bash
set -euo pipefail
# Your original env setup
source .virtualenvs/main/bin/activate
if [ -n "${NVM_DIR:-}" ] && [ -f "$NVM_DIR/nvm.sh" ]; then
# shellcheck source=/dev/null
source "$NVM_DIR/nvm.sh"
fi
# Required: Postgres URL for Prefect metadata (use a separate DB/schema than your warehouse tables!)
: "${PREFECT_DB_URL:?Set PREFECT_DB_URL, e.g. <postgresql+asyncpg://USER:PASS@HOST:5432/prefect>}"
# Tunables
PREFECT_API_HOST="${PREFECT_API_HOST:-127.0.0.1}"
PREFECT_API_PORT="${PREFECT_API_PORT:-4200}"
PREFECT_ENABLE_UI="${PREFECT_ENABLE_UI:-false}" # "true" to run with UI
PREFECT_NO_SERVICES="${PREFECT_NO_SERVICES:-true}" # "true" to disable background services
PREFECT_WAIT_TIMEOUT="${PREFECT_WAIT_TIMEOUT:-60}" # seconds to wait for API health
FLOW_CMD=("$@") # e.g., python -m flows.my_flow
# Export settings for server + client
export PREFECT_API_DATABASE_CONNECTION_URL="$PREFECT_DB_URL"
export PREFECT_API_URL="http://${PREFECT_API_HOST}:${PREFECT_API_PORT}/api"
# Apply DB migrations up front (safe to run repeatedly)
prefect server database upgrade || true
# Build server flags
SERVER_FLAGS=(--host "$PREFECT_API_HOST" --port "$PREFECT_API_PORT")
[ "$PREFECT_ENABLE_UI" = "true" ] && SERVER_FLAGS+=(--ui) || SERVER_FLAGS+=(--no-ui)
[ "$PREFECT_NO_SERVICES" = "true" ] && SERVER_FLAGS+=(--no-services)
# Start server in background
echo "Starting Prefect server on ${PREFECT_API_URL}..."
prefect server start "${SERVER_FLAGS[@]}" &
SERVER_PID=$!
cleanup() {
echo "Stopping Prefect server (PID $SERVER_PID)..."
kill "$SERVER_PID" 2>/dev/null || true
wait "$SERVER_PID" 2>/dev/null || true
}
trap cleanup EXIT
# Wait for health
echo "Waiting for Prefect API to be healthy..."
deadline=$((SECONDS + PREFECT_WAIT_TIMEOUT))
until curl -sf "http://${PREFECT_API_HOST}:${PREFECT_API_PORT}/api/health" >/dev/null; do
if (( SECONDS > deadline )); then
echo "Prefect API did not become healthy within ${PREFECT_WAIT_TIMEOUT}s" >&2
exit 1
fi
sleep 1
done
echo "Prefect API is healthy."
# Run your flow (local execution will report metadata/logs to the server)
if [ ${#FLOW_CMD[@]} -eq 0 ]; then
echo "No command provided; defaulting to: python -m flows.my_flow"
python -m flows.my_flow
else
echo "Running: ${FLOW_CMD[*]}"
"${FLOW_CMD[@]}"
fi
echo "Flow finished. Cleaning up."
How youâd use it
- Build your image with this as the entrypoint.
- Your cron can run the container like:
- docker run --rm -e PREFECT_DB_URL="<postgresql+asyncpg://USER:PASS@HOST:5432/prefect>" your-image python -m flows.my_flow
- The server is brought up just-in-time and shut down when the flow finishes.Marvin
10/01/2025, 2:41 AM-e PREFECT_ENABLE_UI=true
and no command; it will start the server with UI. Then visit <http://localhost:4200>
(or your host/port). Stop the container when youâre done.
Option 2: Keep your current entrypoint and add a wrapper script
If you prefer to keep exec "$@"
in your entrypoint, create a second script (e.g., run_with_prefect_server.sh
) that implements the same start/wait/run/stop logic and call it from cron:
- docker run --rm your-image ./run_with_prefect_server.sh python -m flows.my_flow
Option 3: Two short-lived containers (docker compose)
Run a server container and a runner container only during your window.
- Up server:
- docker compose up -d prefect_server
- Run flows:
- docker compose run --rm runner python -m flows.my_flow
- Stop server:
- docker compose stop prefect_server
Compose sketch:
services:
prefect_server:
image: prefecthq/prefect:3-latest
environment:
PREFECT_API_DATABASE_CONNECTION_URL: ${PREFECT_DB_URL}
PREFECT_UI_ENABLED: "true" # set to "false" for API only
command: >
prefect server start --host 0.0.0.0 --port 4200 --no-services --ui
ports:
- "4200:4200"
runner:
image: your-image
environment:
PREFECT_API_URL: <http://prefect_server:4200/api>
depends_on:
- prefect_server
Notes and best practices
- DB isolation: Use a separate database or schema for Prefect; donât mix with warehouse tables. Grant a least-privilege user.
- TLS: If your Postgres requires TLS, add ?sslmode=require
(or your orgâs settings) to the connection URL.
- Scheduler: Since youâre using cron, --no-services
is fine and keeps the server lighter.
- Local runs: You do not need deployments or workers. Local @flow
runs will report to the configured API and appear in the UI.
If you share how youâre launching your container from cron (docker run/compose, env injection), I can tailor the script and commands exactly to your setup.Mateusz
10/01/2025, 2:48 AMMarvin
10/01/2025, 2:49 AMMarvin
10/01/2025, 2:51 AMPREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_CA_FILE
- PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_CERT_FILE
(if mTLS)
- PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_KEY_FILE
(if mTLS)
- You can also set PREFECT_API_DATABASE_CONNECTION_URL
to your Postgres URL. If your org mandates âsslmode=requireâ, you can include it in the URL, but note that DSN parameters vary by driver; the connect-args approach above is preferred for asyncpg.
- Health endpoint: <http://127.0.0.1:4200/api/health>
Wrapper script (keeps your current entrypoint)
Place this in your image as run_with_prefect_server.sh
. It accepts your flow command (e.g., python -m flows.my_flow
) and handles TLS CA from an env var.
#!/usr/bin/env bash
set -euo pipefail
# Reuse your existing environment bootstrap from the image
if [ -f ".virtualenvs/main/bin/activate" ]; then
source .virtualenvs/main/bin/activate
fi
if [ -n "${NVM_DIR:-}" ] && [ -f "$NVM_DIR/nvm.sh" ]; then
# shellcheck source=/dev/null
source "$NVM_DIR/nvm.sh"
fi
# Required DB URL (asyncpg)
: "${PREFECT_DB_URL:?Set PREFECT_DB_URL, e.g. <postgresql+asyncpg://USER:PASS@HOST:5432/prefect>}"
# Optional: CA PEM passed as secret; weâll write it to a file Prefect can use
CA_PATH="/tmp/db_root_ca.crt"
if [ -n "${DB_CA_PEM:-}" ]; then
echo "Writing DB CA PEM to ${CA_PATH}..."
printf "%s" "$DB_CA_PEM" > "$CA_PATH"
fi
# Prefect API settings (server + client)
export PREFECT_API_DATABASE_CONNECTION_URL="$PREFECT_DB_URL"
export PREFECT_API_URL="${PREFECT_API_URL:-<http://127.0.0.1:4200/api>}"
# TLS connect-args for Server DB (preferred way to enforce certificate verification)
if [ -f "$CA_PATH" ]; then
export PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_CA_FILE="$CA_PATH"
fi
# If you use mTLS, also export:
# export PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_CERT_FILE=/path/to/client.crt
# export PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_KEY_FILE=/path/to/client.key
# Lightweight server flags for on-demand API
SERVER_FLAGS=(--host 127.0.0.1 --port 4200 --no-ui --no-services)
# Migrate DB (idempotent)
prefect server database upgrade || true
# Start server
echo "Starting Prefect server..."
prefect server start "${SERVER_FLAGS[@]}" &
SERVER_PID=$!
cleanup() {
echo "Stopping Prefect server (PID $SERVER_PID)..."
kill "$SERVER_PID" 2>/dev/null || true
wait "$SERVER_PID" 2>/dev/null || true
}
trap cleanup EXIT
# Wait for health
for i in {1..60}; do
if curl -sf "<http://127.0.0.1:4200/api/health>" >/dev/null; then
echo "Prefect API healthy."
break
fi
sleep 1
done
if ! curl -sf "<http://127.0.0.1:4200/api/health>" >/dev/null; then
echo "Prefect API did not become healthy in time." >&2
exit 1
fi
# Run the flow
if [ $# -eq 0 ]; then
echo "No command provided; defaulting to: python -m flows.my_flow"
python -m flows.my_flow
else
echo "Running: $*"
exec "$@"
fi
Dockerfile sketch
- Keep your current entrypoint; call the wrapper from the ECS taskâs command.
```
FROM python:3.11-slim
# Install Prefect 3.x and your deps
RUN pip install --no-cache-dir "prefect>=3,<4" asyncpg
# Add your code
WORKDIR /app
COPY . /app
# Wrapper
COPY run_with_prefect_server.sh /usr/local/bin/run_with_prefect_server.sh
RUN chmod +x /usr/local/bin/run_with_prefect_server.sh
# Keep your simple entrypointMarvin
10/01/2025, 2:51 AMTerraform: ECS Fargate scheduled task
- This creates:
- ECS cluster
- Task execution/task roles
- Log group
- Task definition (Fargate) running your image + wrapper
- EventBridge Scheduler cron to launch it
- Replace placeholders (VPC, subnets, SGs, image, secrets).
# -----------------------------------------------------------------------------
# Variables (example)
# -----------------------------------------------------------------------------
variable "vpc_id" {}
variable "private_subnet_ids" { type = list(string) }
variable "security_group_ids" { type = list(string) }
variable "container_image" { description = "ECR image URI for your runner" }
variable "db_url_secret_arn" { description = "Secrets Manager ARN storing PREFECT_DB_URL" }
variable "db_ca_pem_secret_arn" { description = "Secrets Manager ARN storing DB CA PEM" }
# -----------------------------------------------------------------------------
# ECS cluster
# -----------------------------------------------------------------------------
resource "aws_ecs_cluster" "prefect" {
name = "prefect-on-demand-cluster"
}
# -----------------------------------------------------------------------------
# CloudWatch logs
# -----------------------------------------------------------------------------
resource "aws_cloudwatch_log_group" "prefect" {
name = "/ecs/prefect-runner"
retention_in_days = 14
}
# -----------------------------------------------------------------------------
# IAM: Task execution role (pull image, write logs)
# -----------------------------------------------------------------------------
data "aws_iam_policy_document" "task_execution_assume" {
statement {
actions = ["sts:AssumeRole"]
principals { type = "Service" identifiers = ["ecs-tasks.amazonaws.com"] }
}
}
resource "aws_iam_role" "task_execution" {
name = "prefect-task-execution-role"
assume_role_policy = data.aws_iam_policy_document.task_execution_assume.json
}
resource "aws_iam_role_policy_attachment" "task_execution_managed" {
role = aws_iam_role.task_execution.name
policy_arn = "arnawsiam:awspolicy/service-role/AmazonECSTaskExecutionRolePolicy"
}
# -----------------------------------------------------------------------------
# IAM: Task role (to read Secrets Manager)
# -----------------------------------------------------------------------------
data "aws_iam_policy_document" "task_assume" {
statement {
actions = ["sts:AssumeRole"]
principals { type = "Service" identifiers = ["ecs-tasks.amazonaws.com"] }
}
}
data "aws_iam_policy_document" "task_secrets_access" {
statement {
actions = ["secretsmanager:GetSecretValue"]
resources = [var.db_url_secret_arn, var.db_ca_pem_secret_arn]
}
}
resource "aws_iam_role" "task" {
name = "prefect-task-role"
assume_role_policy = data.aws_iam_policy_document.task_assume.json
}
resource "aws_iam_role_policy" "task_secrets" {
name = "prefect-task-secrets"
role = aws_iam_role.task.id
policy = data.aws_iam_policy_document.task_secrets_access.json
}
# -----------------------------------------------------------------------------
# ECS Task Definition
# -----------------------------------------------------------------------------
resource "aws_ecs_task_definition" "prefect_runner" {
family = "prefect-runner"
requires_compatibilities = ["FARGATE"]
network_mode = "awsvpc"
cpu = "512"
memory = "1024"
execution_role_arn = aws_iam_role.task_execution.arn
task_role_arn = aws_iam_role.task.arn
container_definitions = jsonencode([
{
name = "runner"
image = var.container_image
essential = true
# Call the wrapper and pass the flow command
command = ["/usr/local/bin/run_with_prefect_server.sh", "python", "-m", "flows.my_flow"]
environment = [Marvin
10/01/2025, 2:51 AMprefect server start --host 0.0.0.0 --port 4200 --ui
- Same DB env/secret config
- Attach a temporary ALB/NLB or use ECS Exec/SSM port forwarding if you canât expose a load balancer publicly.
- Stop that task when youâre finished; metadata persists in Postgres.
Answering your SSL question directly
- Yes: pass the server certificate and enforce verification.
- Preferred: mount the PEM from Secrets Manager into the container as an env var (e.g., DB_CA_PEM
), write it to a file, then set:
- PREFECT_SERVER_DATABASE_SQLALCHEMY_CONNECT_ARGS_TLS_CA_FILE=/tmp/db_root_ca.crt
Marvin
10/01/2025, 2:51 AMPREFECT_API_DATABASE_CONNECTION_URL
, but driver support varies. The connect-args method above is driver-agnostic and reliable for asyncpg.
If you share how your VPC/DB are set up (RDS vs. on-prem, TLS requirements, mTLS or not), I can tweak the Terraform and wrapper script to match exactly, including mutual TLS and ALB config for the UI task.