Aaron Gonzalez
08/07/2025, 2:47 PM
Marvin
08/07/2025, 2:48 PM
1. Create a requirements.txt
txt
# requirements.txt
prefect>=3.0.0
# Add your other dependencies here
pandas==2.0.0
requests>=2.28.0
# etc...
2. Create a Dockerfile
dockerfile
FROM python:3.9-slim
# Install system dependencies if needed
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy your flow code
COPY flows/ /flows/
# Set working directory
WORKDIR /flows
3. Build and Push the Image
bash
docker build -t your-registry/your-image:tag .
docker push your-registry/your-image:tag
4. Deploy Your Flow
When deploying your flow, you'll need to specify the container image. Here's an example:
python
from prefect import flow

@flow
def my_flow():
    # your flow code here
    pass

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-repo/your-project",
        entrypoint="flows/your_flow.py:my_flow",
    ).deploy(
        name="k8s-flow",
        work_pool_name="your-k8s-pool",
        image="your-registry/your-image:tag",
    )
Key Points to Remember:
1. System Dependencies: If you need system-level packages, install them using apt-get (or the appropriate package manager) in your Dockerfile.
2. Python Dependencies: List all Python packages in requirements.txt. Make sure to pin versions to avoid unexpected updates.
3. Base Image Selection: Choose an appropriate base image. python:3.9-slim is a good starting point, but you might need a different version or a more complete image based on your needs.
4. Multi-stage Builds: For more complex dependencies or to reduce image size, consider using multi-stage Docker builds.
Best Practices:
- Always pin your dependency versions to ensure reproducibility
- Use Docker layer caching effectively by putting the most stable dependencies first
- Consider using virtual environments within the container if you have complex dependency requirements
- Document any special system requirements in your repository
If you need to debug dependency issues:
1. You can exec into the container to check installed packages: kubectl exec -it <pod-name> -- /bin/bash
2. Use pip list inside the container to verify installed packages
3. Check the container logs for any import or dependency-related errors
Would you like me to elaborate on any of these points or provide more specific examples for your use case?
Aaron Gonzalez
08/07/2025, 2:49 PM
Marvin
08/07/2025, 2:49 PM
You can use uv in your Docker containers, and it's a great choice for faster dependency installation! Here's how to implement it:
1. Basic Dockerfile with uv
dockerfile
FROM python:3.9-slim
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
# Set UV to install packages system-wide
ENV UV_SYSTEM_PYTHON=1
# Copy requirements
COPY requirements.txt .
# Install dependencies with uv (much faster than pip!)
RUN uv pip install -r requirements.txt
# Copy your flow code
COPY flows/ /flows/
WORKDIR /flows
2. Alternative Approach with Virtual Environment
dockerfile
FROM python:3.9-slim
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
# Create and activate a virtual environment using uv
RUN uv venv /venv
ENV PATH="/venv/bin:$PATH"
# Copy and install requirements
COPY requirements.txt .
RUN uv pip install -r requirements.txt
# Copy your flow code
COPY flows/ /flows/
WORKDIR /flows
3. Using uv's pip-compile (for dependency locking)
dockerfile
FROM python:3.9-slim as builder
# Install uv for dependency compilation
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
# Copy requirements files
COPY requirements.in .
# Generate locked requirements
RUN uv pip compile requirements.in -o requirements.txt
FROM python:3.9-slim
# Copy uv from builder
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
ENV UV_SYSTEM_PYTHON=1
# Copy the compiled requirements
COPY --from=builder requirements.txt .
# Install dependencies
RUN uv pip install -r requirements.txt
# Copy your flow code
COPY flows/ /flows/
WORKDIR /flows
Key Benefits of Using uv:
1. Speed: uv is significantly faster than pip (often 5-10x faster)
2. Better Caching: uv's installation process works better with Docker layer caching
3. Reliable Builds: uv's deterministic installs help ensure consistent environments
Best Practices:
1. Use UV_SYSTEM_PYTHON=1 when you want to install packages system-wide
2. Consider using uv pip compile for generating locked requirements
3. If you need both development and production dependencies:
dockerfile
# Declare the build argument (pass it with: docker build --build-arg ENVIRONMENT=development .)
ARG ENVIRONMENT=production
# Install production dependencies
RUN uv pip install -r requirements.txt
# Install development dependencies only in dev builds
RUN if [ "${ENVIRONMENT}" = "development" ] ; then uv pip install -r requirements-dev.txt ; fi
Pro Tips:
- You can use uv's lockfile feature to ensure deterministic builds:
bash
# On your local machine
uv pip compile requirements.in -o requirements.txt
- uv installs packages in parallel by default; adding --no-cache keeps uv's download cache out of the final image layer:
dockerfile
RUN uv pip install --no-cache -r requirements.txt
Would you like me to explain any of these approaches in more detail or provide examples for a specific use case? I can also share some benchmarks comparing uv vs pip installation times in Docker if that would be helpful.
Aaron Gonzalez
08/13/2025, 2:23 PM
Marvin
08/13/2025, 2:26 PM
Prefect 3.x does not have ConcurrentTaskRunner. Use ThreadPoolTaskRunner for local concurrency. Here's a complete example that creates a single progress artifact and updates it as each concurrent task finishes.
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
from prefect.artifacts import create_progress_artifact, update_progress_artifact
import random
import time

@task
def do_work(i: int) -> int:
    # pretend this is useful
    time.sleep(random.uniform(0.3, 1.5))
    return i * i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def concurrent_progress(n: int = 20):
    logger = get_run_logger()
    # Create a progress artifact at 0%
    artifact_id = create_progress_artifact(
        progress=0.0,
        description=f"Processing {n} items"
    )
    # Kick off tasks concurrently
    futures = [do_work.submit(i) for i in range(n)]
    completed = 0
    for fut in as_completed(futures):
        # Surface errors from each task (and keep going)
        try:
            result = fut.result()
            logger.info(f"Task finished with result: {result}")
        except Exception as exc:
            logger.warning(f"A task failed: {exc!r}")
        finally:
            completed += 1
            update_progress_artifact(
                artifact_id=artifact_id,
                progress=completed / n * 100,  # percent complete, 0-100
                description=f"{completed}/{n} completed"
            )
    # Optionally return all results (will raise if any failed)
    return [f.result() for f in futures]

if __name__ == "__main__":
    concurrent_progress()
Notes:
- Progress artifacts expect a percentage between 0 and 100 (the example converts the completed fraction). The description is optional but nice for humans.
- Using as_completed keeps progress updates tied to real task completions.
- If you truly are on Prefect 2.x and want ConcurrentTaskRunner, say so; otherwise this is the 3.x way.
Aaron Gonzalez
09/02/2025, 1:17 PM
Marvin
09/02/2025, 1:21 PM
run_deployment() returns a FlowRun object. By default it blocks until the child run finishes; if you start the run without waiting (timeout=0), you must wait for that flow run to reach a terminal state and then read the result from its final state.
Here are two ready-to-use patterns:
Synchronous parent flow
from prefect import flow
from prefect.deployments import run_deployment
from prefect.states import get_state_result

@flow
def parent():
    # Kick off the child flow via its deployment; with the default
    # timeout=None, run_deployment blocks until the child run finishes
    flow_run = run_deployment(
        name="my-child-flow/my-deployment",
        parameters={"x": 1},
    )
    # Extract the return value from the child's final state
    child_result = get_state_result(flow_run.state)
    return child_result
Async parent flow
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def parent_async():
    # start the child run without blocking (timeout=0)
    flow_run = await run_deployment(
        "my-child-flow/my-deployment", parameters={"x": 1}, timeout=0
    )
    # wait for the child run to reach a terminal state
    flow_run = await wait_for_flow_run(flow_run.id)
    child_result = await get_state_result(flow_run.state)
    return child_result
Important notes
- Result persistence: To read a child flow's return value across processes, that result must be persisted. If your setup does not persist results, either:
- Enable it globally (env var PREFECT_RESULTS_PERSIST_BY_DEFAULT=true), or
- Ensure the child flow persists its result explicitly:
from prefect import flow

@flow(persist_result=True)
def child(...):
    return {"some": "value"}
- Large/complex outputs: For big payloads, consider writing to storage within the child flow and returning a reference (e.g., URL, key). Artifacts are also a good fit for sharing rich outputs across runs. A sketch of this pattern follows below.
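As a minimal sketch of that reference-passing pattern (the /mnt/prefect-results path and naming here are illustrative assumptions, not Prefect APIs):
import json
from pathlib import Path
from prefect import flow

SHARED_DIR = Path("/mnt/prefect-results")  # assumed shared volume or mount

@flow
def child(run_key: str) -> str:
    SHARED_DIR.mkdir(parents=True, exist_ok=True)
    payload = {"rows": list(range(10_000))}  # stand-in for a large output
    out = SHARED_DIR / f"{run_key}.json"
    out.write_text(json.dumps(payload))
    return str(out)  # return a small, serializable reference, not the payload

@flow
def parent():
    path = child("nightly-2025-09-02")  # child returns the reference
    data = json.loads(Path(path).read_text())
    return len(data["rows"])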
References
- Results (persistence and retrieval): Prefect 3.x docs: Results
If you can share whether your parent flow is sync or async and how big the child's return value is, I can tailor the example further.
Aaron Gonzalez
09/02/2025, 1:32 PM
Marvin
09/02/2025, 1:35 PM
1) You likely called await get_state_result(flow_run.state) on a state returned by run_deployment() before the run finished (e.g., when started with timeout=0), which is not the final state. Instead, wait for completion, then read the result:
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

# start the child run without blocking
flow_run = await run_deployment("my-child-flow/my-deployment", parameters={...}, timeout=0)

# wait for a terminal state, then fetch the result
flow_run = await wait_for_flow_run(flow_run.id)
new_failed_users = await get_state_result(flow_run.state)
2) Ensure the child flow’s return value is persisted
To retrieve a child flow’s return value across runs, the result must be persisted. Enable persistence on the child flow (recommended):
from prefect import flow

@flow(persist_result=True)  # ensure the return value is stored
def child_flow(...):
    ...
    return some_value
If you prefer config:
- Set env var PREFECT_RESULTS_PERSIST_BY_DEFAULT=true to persist results by default
- Optionally set a default storage block, e.g. PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK="s3/my-block" (or any configured block) so results have a durable location
Reference: Results in Prefect 3.x
Quick checklist
- Use wait_for_flow_run(flow_run.id) to get the final state before reading results
- Make sure the child flow persists its result (@flow(persist_result=True) or the global setting)
- Keep the child's return value serializable (or configure a serializer/storage as needed); see the sketch below
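For instance, a minimal sketch pinning an explicit serializer on the child (assuming its return value is JSON-serializable):
from prefect import flow

# Persist the result and store it with the JSON serializer
@flow(persist_result=True, result_serializer="json")
def child_flow(x: int) -> dict:
    return {"doubled": x * 2}  # a plain dict is JSON-serializable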
If you share your child flow definition (and whether you can modify it), I can tailor the exact changes.
Aaron Gonzalez
09/02/2025, 1:36 PM
Marvin
09/02/2025, 1:36 PM
With persist_result=True, the child's return value is written to result storage; when you later call get_state_result(state), Prefect will fetch and deserialize that stored value for you.
Key implications
- Cross-process retrieval: Other processes (like a parent flow calling run_deployment) can fetch the child's return value from the API because the data is durably stored, not just in memory.
- Storage and serialization: You can control where and how it’s stored via the decorator:
- result_storage= a storage block (e.g., S3, GCS, Azure, local filesystem)
- result_serializer= the serializer to use (e.g., JSON/pickle)
- Defaults and global config:
- If you don't specify result_storage on the flow/task, Prefect uses your default results storage (if configured).
- You can enable persistence by default for all flows/tasks with the env var PREFECT_RESULTS_PERSIST_BY_DEFAULT=true.
- You can set a default storage block with PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK (e.g., s3/my-results-block) so Prefect knows where to put results.
Example: ensure a child flow’s return value is retrievable by a parent
from prefect import flow
from prefect_aws.s3 import S3Bucket  # requires the prefect-aws collection

s3 = S3Bucket.load("my-results")  # configure the block once in the UI/CLI

@flow(persist_result=True, result_storage=s3)
def child_flow(x: int) -> dict:
    return {"value": x + 1}
Then in the parent, wait for the child to finish and fetch its result:
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

async def parent():
    fr = await run_deployment("child-flow/deployment", parameters={"x": 41}, timeout=0)
    fr = await wait_for_flow_run(fr.id)
    result = await get_state_result(fr.state)  # fetches from storage
    return result
If persist_result is not enabled (and there's no default persistence), you'll see MissingResult when trying to read a state fetched from the API, which is exactly the error you hit.
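If it helps while debugging, here's a small defensive sketch (read_child_result is a hypothetical helper name, not a Prefect API):
from prefect.exceptions import MissingResult
from prefect.states import get_state_result

def read_child_result(final_state):
    """Read a persisted result, raising an actionable error if it was never stored."""
    try:
        return get_state_result(final_state)
    except MissingResult:
        raise RuntimeError(
            "Child flow result was not persisted; set persist_result=True on the "
            "child flow or PREFECT_RESULTS_PERSIST_BY_DEFAULT=true"
        ) from None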
Docs: Results in Prefect 3.x
Aaron Gonzalez
09/02/2025, 1:38 PM
Marvin
09/02/2025, 1:41 PM
1) Don't read flow_run.state immediately after run_deployment() returns. Wait for the child to finish first, then read the final state:
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

flow_run = await run_deployment("child-flow/deployment", parameters={...}, timeout=0)
flow_run = await wait_for_flow_run(flow_run.id)
value = await get_state_result(flow_run.state)
2) Use a shared result store (not the default local path)
For cross-process retrieval, persist the child’s result to a storage both sides can access. Two common options:
- Remote/shared storage (recommended)
- Create a RemoteFileSystem block pointing to S3/GS/Azure/… (via fsspec).
- Use that block in your child flow.
Example:
# One-time setup (or create the block in the UI)
from prefect.filesystems import RemoteFileSystem
remote = RemoteFileSystem(basepath="s3://my-bucket/prefect-results")
remote.save("results", overwrite=True)
Then make the child persist there:
from prefect import flow
from prefect.filesystems import RemoteFileSystem

remote = RemoteFileSystem.load("results")

@flow(persist_result=True, result_storage=remote)
def child_flow(...):
    return {"ok": True}
- Shared local volume (if all runs share the same mounted path)
- Create a LocalFileSystem block with a basepath that is a shared, mounted volume available to both parent and child at the same path.
from prefect.filesystems import LocalFileSystem

shared = LocalFileSystem(basepath="/mnt/prefect-results")
shared.save("shared-results", overwrite=True)

@flow(persist_result=True, result_storage=LocalFileSystem.load("shared-results"))
def child_flow(...):
    return {"ok": True}
Global defaults (optional)
- Set env vars so you don’t have to annotate every flow:
- PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
- PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=remote-file-system/results (or local-file-system/shared-results)
- You can inspect your blocks:
prefect block ls
prefect block inspect remote-file-system/results
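Or, as a quick sanity check from Python (assuming the block names above):
from prefect.filesystems import RemoteFileSystem

# Load the saved block and confirm it points where you expect
block = RemoteFileSystem.load("results")
print(block.basepath)  # e.g., s3://my-bucket/prefect-results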
Common pitfall to avoid
- Reading flow_run.state immediately after run_deployment() and/or persisting to a local path that isn't shared will lead to MissingResult or "Path does not exist."
Docs
- Results overview and configuration: Prefect 3.x: Results
If you can confirm whether your parent and child run on different machines/containers, I can suggest the best storage option (remote vs shared local) and help with exact block configuration.
Aaron Gonzalez
09/02/2025, 1:59 PM
Marvin
09/02/2025, 2:01 PM
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def parent():
    # start the child without blocking
    fr = await run_deployment("child-flow/deployment", parameters={...}, timeout=0)
    # wait until the child reaches a terminal state
    fr = await wait_for_flow_run(fr.id)
    # fetch the child's return value (requires persisted results)
    value = await get_state_result(fr.state)
    return value
Notes
- You can pass an existing client if you already have one open: await wait_for_flow_run(fr.id, client=my_client).
- Make sure the child flow persists its result (e.g., @flow(persist_result=True, result_storage=<shared or remote block>)) so the parent process can fetch it.
- Using local default storage across different machines/containers will cause “path does not exist” errors. Prefer a shared/remote storage block (S3/GCS/Azure/RemoteFileSystem) for results.
Refs
- Helper function: prefect.flow_runs.wait_for_flow_run
- Result persistence: Prefect 3.x Results
Aaron Gonzalez
09/04/2025, 1:06 PM
Marvin
09/04/2025, 1:12 PM
3) Make sure the URL you point the provider at is the API endpoint (it must include /api). For example:
- Correct: https://prefect.mydomain.com/api
- Incorrect: https://prefect.mydomain.com (may route to the UI and return 405 on POST)
There’s a known class of proxy/ingress misroutes that trigger 405s; see the provider repo’s troubleshooting notes.
4) Sanity check via CLI against the same endpoint:
- List pools:
prefect work-pool ls
- Create a queue within a pool (replace names accordingly):
prefect work-queue create "terms-and-conditions" --pool "my-pool"
If the CLI works but Terraform doesn’t, it’s likely provider config or an outdated resource.
Recommended pattern in Prefect 3.x
- Create a Work Pool (e.g., type kubernetes) and run a worker against it.
- Optionally create one or more queues within that pool (or use the pool's default queue).
- Point your deployment at the work_pool (and optionally a work_queue within that pool), as in the sketch below.
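From the Python side, a minimal sketch of a deployment targeting that pool and queue (the repo URL and names are placeholders matching the Terraform example below):
from prefect import flow

flow.from_source(
    source="https://github.com/your-org/your-repo",  # placeholder repo
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-deployment",
    work_pool_name="k8s-pool",
    work_queue_name="terms-and-conditions",
    image="your-registry/your-image:tag",
    build=False,  # assume the image is already built and pushed
)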
Example outline (Terraform)
- Note: field names may differ slightly by provider version. If you share your current TF snippet, I can adapt it precisely.
provider "prefect" {
# If self-hosted server behind ingress, be sure to include /api
server_url = "<https://prefect.yourdomain.com/api>"
# If using Cloud, use account/workspace + API key instead
# api_key = var.prefect_api_key
}
resource "prefect_work_pool" "k8s_pool" {
name = "k8s-pool"
type = "kubernetes"
# base_job_template typically includes your k8s job spec
# base_job_template = jsonencode({ ... })
}
# Optional: create a named queue within that pool (or use the default queue)
# Depending on provider version, this resource may require pool scoping:
# e.g., work_pool_name or work_pool_id
resource "prefect_work_queue" "terms_and_conditions" {
work_pool_name = prefect_work_pool.k8s_pool.name
name = "terms-and-conditions"
# Optional: concurrency_limit, priority, filter, etc.
}
# Your deployment can reference the pool and (optionally) the queue
# resource "prefect_deployment" "my_deployment" {
# name = "my-deployment"
# work_pool_name = prefect_work_pool.k8s_pool.name
# work_queue_name = prefect_work_queue.terms_and_conditions.name
# ...
# }
If you’re actually on Prefect 2.x, queues are still first-class, but I’d still suggest moving to workers + work pools or upgrading to 3.x.
Could you share:
- Prefect version (Server/Cloud) and how you’re hosting it (k8s ingress details)?
- Your Terraform provider version and provider configuration (redact secrets)?
- The Terraform snippet for prefect_work_queue you're using?
With that, I can pinpoint whether this is an API-version mismatch, a provider-version issue, or an ingress/base URL problem.
Helpful references