<@ULVA73B9P> Can you please help me create a prese...
# ask-marvin
b
@Marvin Can you please help me create a presentation on the topic “Workflow Orchestration with Prefect”? I want to talk about the core concepts of Prefect with examples, and also cover the workflow automation aspects. A simple example for each type of workflow (ML, data pipeline, and business flow) would be great for me.
m
thought for 276.8 seconds
Absolutely, happy to help! Below is a ready-to-use talk outline with speaker notes and copy-pasteable code examples for Prefect 3.x. It covers core concepts, workflow automation, and three simple workflow examples (ML, data pipeline, and business process). If you want, I can turn this into a Google Slides or PowerPoint deck; just tell me the audience (data eng, ML, ops), the length of the talk, and your runtime (local, Docker, K8s, Cloud).

Presentation outline: Workflow Orchestration with Prefect

1) Why orchestration?
- Move from “scripts on a server” to reliable, observable, repeatable workflows
- Visibility into states, retries, schedules, and SLAs
- Easy promotion from local dev to production with deployments and workers

2) Prefect at a glance (3.x)
- Python-first: orchestrate native Python code
- State engine: every task and flow run has a state (e.g., Running, Completed, Failed, Retrying)
- Deployments + Work Pools + Workers: production runtime model
- Automations: trigger actions on events (e.g., notify on failure)
- Links:
  - Work pools and workers blog
  - Prefect 3.2 release notes

3) Core concepts (with examples)
- Flow: your orchestration entrypoint
- Task: smaller, retryable units that run within a flow
- Subflow: call one flow from another for reuse
- Parameters: function args on flows
- States: Completed/Failed/Retrying/Cached/etc.
- Task runners: parallelism via threads or processes
- Blocks/Secrets: securely store credentials and config

4) The building blocks in code
- Flows and tasks, parameters, retries, parallelism
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=5)
def fetch_record(i: int) -> int:
    # Simulate IO work
    return i * 2

@flow(name="example-flow", task_runner=ThreadPoolTaskRunner(max_workers=8))
def example_flow(n: int = 10) -> int:
    # Run tasks in parallel with a thread pool
    results = []
    for i in range(n):
        results.append(fetch_record.submit(i))
    values = [r.result() for r in results]
    return sum(values)

if __name__ == "__main__":
    print(example_flow(5))
```
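If it helps the audience, here is a plain-Python sketch of what a setting like `retries=3` amounts to: one initial attempt plus up to three re-attempts, with a pause between them. This is only an illustration of the idea, not Prefect's implementation; `with_retries`, `flaky_fetch`, and `calls` are hypothetical names made up for the slide.

```python
import time
from functools import wraps


def with_retries(retries: int, retry_delay_seconds: float = 0.0):
    """Hypothetical helper: re-run a function up to `retries` extra times on failure."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            attempts_allowed = retries + 1  # one initial attempt plus the retries
            for attempt in range(1, attempts_allowed + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts_allowed:
                        raise  # retries exhausted: surface the failure
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(retries=3, retry_delay_seconds=0)
def flaky_fetch(i: int) -> int:
    # Simulated transient failure: the first two calls raise, the third succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return i * 2


result = flaky_fetch(21)
print(result, calls["count"])
```

The point to make on the slide: the caller sees a single successful result even though the function ran three times.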
5) Subflows and secrets
- Blocks let you store config and credentials centrally
- Secrets avoid hardcoding sensitive data
```
from prefect import flow, task
from prefect.blocks.system import Secret

@task
def use_api_key() -> None:
    api_key = Secret.load("my-api-key").get()
    # use the key safely

@flow
def child():
    use_api_key()

@flow
def parent():
    child()  # subflow call
```
- Links:
  - Store secrets (blocks)

6) From script to production
- Create a deployment (new 3.x pattern)
- Run it on a worker connected to a work pool
- Add a schedule, concurrency limits, and triggers

Two common paths:

A) Programmatic deployment from source
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/etl.py:etl_flow",  # path/to/file.py:function_name
).deploy(
    name="etl-prod",
    work_pool_name="docker-pool",
    cron="0 * * * *",             # run hourly
    concurrency_limit=2,          # per-deployment limit
    tags=["prod"],
    description="ETL hourly pipeline",
)
```
B) CLI (verified)
- Create deployment from local project entrypoint:
```
prefect deploy ./flows/etl.py:etl_flow --name etl-prod
```
- Create a work pool and start a worker:
```
prefect work-pool create "docker-pool" --type docker
prefect worker start --pool "docker-pool"
```
Note: For Docker/Kubernetes workers, install extras (in CI or local) like:
```
uv run --with 'prefect[docker]' prefect work-pool create "docker-pool" --type docker
```
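For the schedule slides, it can help to show what the cron strings in this talk's deployments (`0 * * * *`, `0 2 * * *`, `0 9 * * 1-5`) actually select. Below is a small plain-Python sketch that checks whether a given time matches a 5-field cron expression. It is not how Prefect parses schedules, and it is deliberately limited: it assumes only `*`, single numbers, ranges, and comma lists, and it does not handle step values, month or weekday names, or `7` for Sunday. `field_matches` and `cron_matches` are hypothetical helper names.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Match one cron field: '*', a number, a range 'a-b', or a comma list of those."""
    if field == "*":
        return True
    for part in field.split(","):
        if "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` falls on a minute selected by the cron expression."""
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = (when.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day, when.day)
        and field_matches(month, when.month)
        and field_matches(weekday, cron_weekday)
    )


monday_9am = datetime(2024, 1, 1, 9, 0)      # 2024-01-01 was a Monday
saturday_9am = datetime(2024, 1, 6, 9, 0)    # 2024-01-06 was a Saturday
saturday_2am = datetime(2024, 1, 6, 2, 0)

hourly_hit = cron_matches("0 * * * *", monday_9am)
hourly_miss = cron_matches("0 * * * *", datetime(2024, 1, 1, 9, 30))
daily_hit = cron_matches("0 2 * * *", saturday_2am)
weekday_hit = cron_matches("0 9 * * 1-5", monday_9am)
weekend_miss = cron_matches("0 9 * * 1-5", saturday_9am)
print(hourly_hit, hourly_miss, daily_hit, weekday_hit, weekend_miss)
```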
7) Schedules and concurrency
- Add schedules (cron/interval/rrule) either in code via `Flow.deploy(...)` or in the UI
- Concurrency: per-deployment (via `concurrency_limit`) or global limits
- Links:
  - Global concurrency limits

8) Caching and retries
- Retries are configured per task (e.g., `retries=3, retry_delay_seconds=10`); flows accept `retries` as well
- Caching can avoid re-computation for unchanged inputs
- Link: Caching in Prefect 3.x

9) Notifications and automations
- Use notification blocks (Slack, Teams, email) inside flows
- Use Automations to react to events (e.g., Flow Run Failed -> send Slack)
- CLI management (list/pause/resume) is available; create automations in the UI
```
prefect automation ls
prefect automation pause <AUTOMATION_ID>
prefect automation resume <AUTOMATION_ID>
```
- Notification blocks: see Blocks & secrets, and the block types in the code below

Copy-paste examples by workflow type

A) Data pipeline (ETL)
- Parallel extraction, simple transform, load
- Retries on flaky IO, secrets for credentials, deployment with a schedule
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.blocks.system import Secret

@task(retries=3, retry_delay_seconds=5)
def extract(source: str) -> list[int]:
    # Pretend to call an external API with an API key
    api_key = Secret.load("etl-api-key").get()
    return [1, 2, 3]  # example data

@task
def transform(items: list[int]) -> list[int]:
    return [x * 10 for x in items]

@task
def load(rows: list[int]) -> None:
    print(f"Loaded {len(rows)} rows: {rows[:3]}...")

@flow(name="etl-pipeline", task_runner=ThreadPoolTaskRunner(max_workers=8))
def etl_flow(source: str = "api"):
    raw = extract.submit(source)
    clean = transform.submit(raw)
    # Wait on the last future so the load finishes before the flow returns
    load.submit(clean).wait()

if __name__ == "__main__":
    etl_flow()
```
Programmatic deployment:
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/etl.py:etl_flow",
).deploy(
    name="etl-hourly",
    work_pool_name="docker-pool",
    cron="0 * * * *",        # hourly
    concurrency_limit=1,     # avoid overlapping runs
    tags=["etl", "prod"],
)
```
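To illustrate the `concurrency_limit=1  # avoid overlapping runs` idea on a slide, here is a plain-Python sketch that uses a semaphore as a stand-in for a limit on simultaneous runs. It only models "at most N runs active at once, the rest wait for a slot"; Prefect's actual handling of runs over the limit isn't modeled here. `run_with_limit` and `simulated_run` are hypothetical names.

```python
import threading
import time


def run_with_limit(num_runs: int, concurrency_limit: int) -> int:
    """Start `num_runs` simulated runs and return the peak number active at once."""
    slots = threading.Semaphore(concurrency_limit)
    lock = threading.Lock()
    active = 0
    peak = 0

    def simulated_run() -> None:
        nonlocal active, peak
        with slots:  # block until one of the limited slots is free
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05)  # pretend to do some work
            with lock:
                active -= 1

    threads = [threading.Thread(target=simulated_run) for _ in range(num_runs)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


peak_limited = run_with_limit(num_runs=4, concurrency_limit=1)
peak_wider = run_with_limit(num_runs=4, concurrency_limit=2)
print(peak_limited, peak_wider)
```

With a limit of 1 the runs never overlap, which is the behavior the hourly ETL deployment is asking for.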
B) ML workflow (train/evaluate)
- Parameters for hyperparameters
- Use a process pool for CPU-bound training
```
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import time

@task
def load_data():
    # stand-in for real dataset loading
    X = list(range(1000))
    y = [x % 2 for x in X]
    return X, y

@task
def train_model(X, y, lr: float = 0.1):
    time.sleep(2)  # simulate training
    return {"lr": lr, "accuracy": 0.9}

@task
def evaluate(model):
    return model["accuracy"]

@flow(name="ml-train", task_runner=ProcessPoolTaskRunner())
def train_pipeline(lr: float = 0.1):
    # Note: direct task calls like these run in the flow's own process;
    # use .submit() to hand work to the configured task runner
    X, y = load_data()
    model = train_model(X, y, lr)
    acc = evaluate(model)
    print(f"Accuracy: {acc}")
    return acc

if __name__ == "__main__":
    train_pipeline(0.05)
```
Programmatic deployment (daily retrain):
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/ml-repo",
    entrypoint="flows/train.py:train_pipeline",
).deploy(
    name="ml-train-daily",
    work_pool_name="docker-pool",
    cron="0 2 * * *",  # daily at 02:00
    tags=["ml", "training"],
)
```
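The caching point from section 8 (avoid re-computation for unchanged inputs) is easy to demo with a training step. Here is a plain-Python sketch of the idea, keyed on a hash of the inputs. It is not Prefect's caching API, just the concept in miniature; `cache_key`, `train_model_cached`, and the in-memory `_cache` are hypothetical, and the "model" is the same placeholder dict used in the example above.

```python
import hashlib
import json

_cache: dict[str, dict] = {}
train_calls = 0


def cache_key(**inputs) -> str:
    """Build a stable key from the inputs (assumes they are JSON-serializable)."""
    payload = json.dumps(inputs, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def train_model_cached(data: list[int], lr: float) -> dict:
    """Return a cached result when the same inputs were already trained on."""
    global train_calls
    key = cache_key(data=data, lr=lr)
    if key in _cache:
        return _cache[key]  # unchanged inputs: skip the expensive step
    train_calls += 1
    model = {"lr": lr, "accuracy": 0.9}  # placeholder for real training
    _cache[key] = model
    return model


data = list(range(10))
first = train_model_cached(data, lr=0.1)
second = train_model_cached(data, lr=0.1)   # same inputs: served from the cache
third = train_model_cached(data, lr=0.05)   # different learning rate: recomputed
print(train_calls)
```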
C) Business flow (notification/approval pattern)
- Send Slack notifications via a notification block
- Often paired with Automations for failure alerts in production
```
from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook

@task
def generate_report() -> str:
    return "Quarterly KPI report content"

@task
def notify_slack(message: str):
    slack = SlackWebhook.load("alerts")  # configure block in UI
    slack.notify(message)

@flow(name="report-approval")
def business_flow():
    report = generate_report()
    # Wait on each submitted notification so it is sent before the flow moves on
    notify_slack.submit("New report generated; ready for review.").wait()
    # Human approval happens outside of Prefect (e.g., in Slack or a ticket)
    notify_slack.submit("Report approved and delivered.").wait()
    return report

if __name__ == "__main__":
    business_flow()
```
Programmatic deployment (weekday mornings):
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/biz-flows",
    entrypoint="flows/report.py:business_flow",
).deploy(
    name="report-weekdays",
    work_pool_name="docker-pool",
    cron="0 9 * * 1-5",  # Mon-Fri at 09:00
    tags=["business", "reporting"],
)
```

Helpful library surface (confirmed)
- Task runners for parallelism:
  - `prefect.task_runners`: ThreadPoolTaskRunner, ProcessPoolTaskRunner
- Notification blocks:
  - `prefect.blocks.notifications`: SlackWebhook, MicrosoftTeamsWebhook, PagerDutyWebHook, TwilioSMS, SendgridEmail, etc.
- Schedules:
  - `prefect.schedules`: Cron, Interval, RRule classes (or pass `cron=`/`interval=` to `Flow.deploy`)
- Concurrency:
  - `concurrency_limit` at deployment time (see `Flow.deploy` signature)
  - Global limits: Global concurrency limits

Additional resources
- Workflow design patterns with Prefect
- Caching in Prefect 3.x
- Task mapping at scale
- Work pools and workers blog
- Store secrets (blocks)

Questions to tailor this for you
- Who’s the audience (data engineers, ML, platform/ops)?
- How long is the talk?
- What runtime do you target (local, Docker, Kubernetes, Prefect Cloud)?
- Want this turned into a formatted slide deck (Google Slides, PPTX, or Markdown)?