# marvin-ai
a
@Marvin how do I manage dependencies inside a container running in k8s?
m
thought for 63.4 seconds
Let me provide you with a practical guide for managing dependencies in your Kubernetes-deployed Prefect containers. Here's how to do it:

1. Create a `requirements.txt` File

```txt
# requirements.txt
prefect>=3.0.0
# Add your other dependencies here
pandas==2.0.0
requests>=2.28.0
# etc...
```

2. Create a Dockerfile

```dockerfile
FROM python:3.9-slim

# Install system dependencies if needed
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy your flow code
COPY flows/ /flows/

# Set working directory
WORKDIR /flows
```

3. Build and Push the Image

```bash
docker build -t your-registry/your-image:tag .
docker push your-registry/your-image:tag
```

4. Deploy Your Flow

When deploying your flow, you'll need to specify the container image. Here's an example:

```python
from prefect import flow

# Note: with .from_source below, the flow code is pulled from the repo at run
# time; this local definition just illustrates the flow being deployed
@flow
def my_flow():
    # your flow code here
    pass

if __name__ == "__main__":
    deployment = flow.from_source(
        source="<https://github.com/your-repo/your-project>",
        entrypoint="flows/your_flow.py:my_flow",
    ).deploy(
        name="k8s-flow",
        work_pool_name="your-k8s-pool",
        image="your-registry/your-image:tag"
    )
```

Key Points to Remember:
1. System dependencies: If you need system-level packages, install them with `apt-get` (or the appropriate package manager) in your Dockerfile.
2. Python dependencies: List all Python packages in `requirements.txt`, and pin versions to avoid unexpected updates.
3. Base image selection: Choose an appropriate base image. `python:3.9-slim` is a good starting point, but you might need a different version or a more complete image based on your needs.
4. Multi-stage builds: For more complex dependencies or to reduce image size, consider using multi-stage Docker builds (see the sketch below).

Best Practices:
- Always pin your dependency versions to ensure reproducibility
- Use Docker layer caching effectively by putting the most stable layers first (copy and install requirements before your application code, so code changes don't invalidate the dependency layer)
- Consider using a virtual environment within the container if you have complex dependency requirements
- Document any special system requirements in your repository
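A minimal multi-stage sketch along those lines, assuming `gcc` is only needed while building wheels; image names, paths, and the wheel-building step are illustrative:

```dockerfile
# Build stage: compile wheels so the runtime image doesn't need a compiler
FROM python:3.9-slim AS builder
RUN apt-get update && apt-get install -y gcc && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip wheel --no-cache-dir -r requirements.txt -w /wheels

# Runtime stage: install from the prebuilt wheels only
FROM python:3.9-slim
COPY --from=builder /wheels /wheels
COPY requirements.txt .
RUN pip install --no-cache-dir --no-index --find-links=/wheels -r requirements.txt \
    && rm -rf /wheels

# Copy your flow code
COPY flows/ /flows/
WORKDIR /flows
```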
If you need to debug dependency issues:
1. Exec into the container to check installed packages:

```bash
kubectl exec -it <pod-name> -- /bin/bash
```

2. Use `pip list` inside the container to verify installed packages (one-liner versions of these checks are sketched at the end of this message)
3. Check the container logs for any import or dependency-related errors

Would you like me to elaborate on any of these points or provide more specific examples for your use case?
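As referenced above, a couple of hedged one-liners for those checks; the pod name is a placeholder:

```bash
# List installed packages without opening an interactive shell
kubectl exec <pod-name> -- pip list

# Scan recent container logs for import/dependency errors
kubectl logs <pod-name> --tail=100 | grep -iE "importerror|modulenotfounderror" || true
```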
a
@Marvin can I use uv inside the docker? if so how?
m
thought for 45.9 seconds
Yes, you can absolutely use `uv` in your Docker containers, and it's a great choice for faster dependency installation. Here's how to implement it:

1. Basic Dockerfile with uv

```dockerfile
FROM python:3.9-slim

# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Set UV to install packages system-wide
ENV UV_SYSTEM_PYTHON=1

# Copy requirements
COPY requirements.txt .

# Install dependencies with uv (much faster than pip!)
RUN uv pip install -r requirements.txt

# Copy your flow code
COPY flows/ /flows/

WORKDIR /flows
```

2. Alternative Approach with Virtual Environment

```dockerfile
FROM python:3.9-slim

# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Create and activate a virtual environment using uv
RUN uv venv /venv
ENV PATH="/venv/bin:$PATH"

# Copy and install requirements
COPY requirements.txt .
RUN uv pip install -r requirements.txt

# Copy your flow code
COPY flows/ /flows/

WORKDIR /flows
```

3. Using uv's pip-compile (for dependency locking)

```dockerfile
FROM python:3.9-slim as builder

# Install uv for dependency compilation
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Copy requirements files
COPY requirements.in .

# Generate locked requirements
RUN uv pip compile requirements.in -o requirements.txt

FROM python:3.9-slim

# Copy uv from builder
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
ENV UV_SYSTEM_PYTHON=1

# Copy the compiled requirements
COPY --from=builder requirements.txt .

# Install dependencies
RUN uv pip install -r requirements.txt

# Copy your flow code
COPY flows/ /flows/

WORKDIR /flows
```

Key Benefits of Using uv:
1. Speed: uv is significantly faster than pip (often 5-10x faster)
2. Better caching: uv's installation process works well with Docker layer caching
3. Reliable builds: uv's deterministic installs help ensure consistent environments

Best Practices:
1. Use `UV_SYSTEM_PYTHON=1` when you want to install packages system-wide
2. Consider using `uv pip compile` for generating locked requirements
3. If you need both development and production dependencies:

```dockerfile
# Declare the build arg used below (pass --build-arg ENVIRONMENT=development to enable dev deps)
ARG ENVIRONMENT=production

# Install production dependencies
RUN uv pip install -r requirements.txt

# Install development dependencies only in dev stage
RUN if [ "${ENVIRONMENT}" = "development" ] ; then uv pip install -r requirements-dev.txt ; fi
```

Pro Tips:
- You can use uv's lockfile workflow to ensure deterministic builds:

```bash
# On your local machine
uv pip compile requirements.in -o requirements.txt
```

- uv already parallelizes downloads and installs by default; use `--no-cache` to keep uv's cache out of the image layer:

```dockerfile
RUN uv pip install --no-cache -r requirements.txt
```

Would you like me to explain any of these approaches in more detail or provide examples for a specific use case? I can also share some benchmarks comparing uv vs pip installation times in Docker if that would be helpful.
a
@Marvin can you give me an example of a flow using the concurrenttaskrunner that creates a progress artifact and updates it with each concurrent task completion?
m
thought for 181.9 seconds
Short answer: in Prefect 3.x there isn't a `ConcurrentTaskRunner`. Use `ThreadPoolTaskRunner` for local concurrency. Here's a complete example that creates a single progress artifact and updates it as each concurrent task finishes.

```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
from prefect.artifacts import create_progress_artifact, update_progress_artifact

import random
import time


@task
def do_work(i: int) -> int:
    # pretend this is useful
    time.sleep(random.uniform(0.3, 1.5))
    return i * i


@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def concurrent_progress(n: int = 20):
    logger = get_run_logger()

    # Create a progress artifact at 0%
    artifact_id = create_progress_artifact(
        progress=0.0,
        description=f"Processing {n} items"
    )

    # Kick off tasks concurrently
    futures = [do_work.submit(i) for i in range(n)]

    completed = 0
    for fut in as_completed(futures):
        # Surface errors from each task (and keep going)
        try:
            result = fut.result()
            logger.info(f"Task finished with result: {result}")
        except Exception as exc:
            logger.warning(f"A task failed: {exc!r}")
        finally:
            completed += 1
            update_progress_artifact(
                artifact_id=artifact_id,
                progress=100 * completed / n,  # percent complete (0-100)
                description=f"{completed}/{n} completed"
            )

    # Optionally return all results (will raise if any failed)
    return [f.result() for f in futures]


if __name__ == "__main__":
    concurrent_progress()
```

Notes:
- Progress artifacts expect a float from 0 to 100 (a percentage). The description is optional but nice for humans.
- Using `as_completed` keeps progress updates tied to real task completions.
- If you truly are on Prefect 2.x and want `ConcurrentTaskRunner`, say so; otherwise this is the 3.x way.
a
@Marvin I'm running a flow within a flow using run_deployment(). How can I extract the value returned by the flow triggered by run_deployment() within the other flow?
m
thought for 274.9 seconds
Great question. Assuming Prefect 3.x: `run_deployment()` returns a FlowRun object. To get the actual value returned by the child flow, wait for that flow run to finish, then read the result from its final state. Here are two ready-to-use patterns:

Synchronous parent flow

```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import SyncPrefectClient
from prefect.states import get_state_result

@flow
def parent():
    # Kick off the child flow via its deployment
    flow_run = run_deployment(
        name="my-child-flow/my-deployment",
        parameters={"x": 1},
    )

    # Wait for the child run to reach a terminal state
    with SyncPrefectClient() as client:
        final_state = client.wait_for_flow_run(flow_run.id)

    # Extract the return value from the child's final state
    child_result = get_state_result(final_state)

    return child_result
```

Async parent flow

```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.states import get_state_result

@flow
async def parent_async():
    flow_run = await run_deployment("my-child-flow/my-deployment", parameters={"x": 1})

    async with get_client() as client:
        final_state = await client.wait_for_flow_run(flow_run.id)

    child_result = await get_state_result(final_state)
    return child_result
```

Important notes:
- Result persistence: To read a child flow's return value across processes, that result must be persisted. In most setups this works out of the box, but if you've disabled default persistence, either:
  - Enable it globally (env var `PREFECT_RESULTS_PERSIST_BY_DEFAULT=true`), or
  - Ensure the child flow persists its result explicitly:

```python
from prefect import flow

@flow(persist_result=True)
def child(...):
    return {"some": "value"}
```

- Large/complex outputs: For big payloads, consider writing to storage within the child flow and returning a reference (e.g., a URL or key); a sketch of this pattern follows below. Artifacts are also a good fit for sharing rich outputs across runs.

References:
- Results (persistence and retrieval): Prefect 3.x docs: Results

If you can share whether your parent flow is sync or async and how big the child's return value is, I can tailor the example further.
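In the meantime, a minimal sketch of that write-to-storage-and-return-a-reference pattern, assuming boto3 is installed and AWS credentials are configured; the bucket name and key scheme are placeholders:

```python
import json
import uuid

import boto3  # assumption: installed in the image, credentials configured
from prefect import flow


@flow
def child_flow() -> str:
    payload = {"rows": list(range(1_000_000))}  # stand-in for a large output

    # Write the large payload to shared storage instead of returning it directly
    key = f"results/{uuid.uuid4()}.json"
    boto3.client("s3").put_object(
        Bucket="my-bucket",  # placeholder bucket name
        Key=key,
        Body=json.dumps(payload),
    )

    # Return only a small, serializable reference for the parent to resolve
    return f"s3://my-bucket/{key}"
```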
a
@Marvin I get this:

```
Encountered exception during execution: MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1335, in run_context
    yield self
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1397, in run_flow_async
    await engine.call_flow_fn()
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1349, in call_flow_fn
    result = await call_with_parameters(self.flow.fn, self.parameters)
  File "/app/pe_prefect/flows/batch_users.py", line 47, in batch_users
    new_failed_users = await get_state_result(flow_run.state)
  File "/usr/local/lib/python3.12/site-packages/prefect/states.py", line 85, in get_state_result
    return await _get_state_result(
  File "/usr/local/lib/python3.12/site-packages/prefect/states.py", line 170, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
```
m
thought for 170.7 seconds
That error means you're trying to read a result from a state that doesn't have persisted data. Two things to fix:

1) Wait for the child flow run to finish and use its final state

Right now you're calling `await get_state_result(flow_run.state)` on the state returned by `run_deployment()`, which is not the final state. Instead, wait for completion, then read the result:

```python
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.states import get_state_result

# start the child run
flow_run = run_deployment("my-child-flow/my-deployment", parameters={...})

# wait for terminal state, then fetch the result
async with get_client() as client:
    final_state = await client.wait_for_flow_run(flow_run.id)

new_failed_users = await get_state_result(final_state)
```

2) Ensure the child flow's return value is persisted

To retrieve a child flow's return value across runs, the result must be persisted. Enable persistence on the child flow (recommended):

```python
from prefect import flow

@flow(persist_result=True)  # ensure the return value is stored
def child_flow(...):
    ...
    return some_value
```

If you prefer config:
- Set the env var `PREFECT_RESULTS_PERSIST_BY_DEFAULT=true` to persist results by default
- Optionally set a default storage block, e.g. `PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK="s3/my-block"` (or any configured block) so results have a durable location; see the sketch at the end of this message

Reference: Results in Prefect 3.x

Quick checklist:
- Use `wait_for_flow_run(flow_run.id)` to get the final state before reading results
- Make sure the child flow persists its result (`@flow(persist_result=True)` or the global setting)
- Keep the child's return value serializable (or configure a serializer/storage as needed)

If you share your child flow definition (and whether you can modify it), I can tailor the exact changes.
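For reference, a hedged sketch of the config route mentioned above; the values are placeholders and the referenced storage block must already exist:

```bash
# Set these in the environment that executes your flows
# (e.g., the k8s job template used by your work pool)
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
export PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK="s3/my-block"  # placeholder block slug
```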
a
@Marvin what does it mean when persist_result is set to True?
m
thought for 37.0 seconds
Setting `persist_result=True` on a flow (or task) tells Prefect to serialize the function's return value and store it in your configured result storage. The flow's final state then contains a reference to that stored data instead of the raw value. Later, when you call `get_state_result(state)`, Prefect fetches and deserializes that stored value for you.

Key implications:
- Cross-process retrieval: Other processes (like a parent flow calling `run_deployment`) can fetch the child's return value from the API because the data is durably stored, not just in memory.
- Storage and serialization: You can control where and how it's stored via the decorator:
  - `result_storage=` a storage block (e.g., S3, GCS, Azure, local filesystem)
  - `result_serializer=` the serializer to use (e.g., JSON/pickle)
- Defaults and global config:
  - If you don't specify `result_storage` on the flow/task, Prefect uses your default results storage (if configured).
  - You can enable persistence by default for all flows/tasks with the env var `PREFECT_RESULTS_PERSIST_BY_DEFAULT=true`.
  - You can set a default storage block with `PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK` (e.g., `s3/my-results-block`) so Prefect knows where to put results.

Example: ensure a child flow's return value is retrievable by a parent

```python
from prefect import flow
from prefect_aws.s3 import S3Bucket  # provided by the prefect-aws package

s3 = S3Bucket.load("my-results")  # configure once in UI/CLI

@flow(persist_result=True, result_storage=s3)
def child_flow(x: int) -> dict:
    return {"value": x + 1}
```

Then in the parent, wait for the child to finish and fetch its result:

```python
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.states import get_state_result

async def parent():
    fr = run_deployment("child-flow/deployment", parameters={"x": 41})
    async with get_client() as client:
        final_state = await client.wait_for_flow_run(fr.id)
    result = await get_state_result(final_state)  # fetches from storage
    return result
```

If `persist_result` is not enabled (and there's no default persistence), you'll see MissingResult when trying to read a state fetched from the API, which is exactly the error you hit.

Docs: Results in Prefect 3.x
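And to round out the `result_serializer` knob mentioned above, a minimal sketch; the serializer choice is illustrative:

```python
from prefect import flow
from prefect.serializers import JSONSerializer


# Persist the return value as JSON instead of the default pickle-based serializer
@flow(persist_result=True, result_serializer=JSONSerializer())
def child_flow(x: int) -> dict:
    return {"value": x + 1}
```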
a
@Marvin Finished in state Failed('Flow run encountered an exception: ValueError: Path /home/prefect/.prefect/storage/c0e1459116944899815adc81758ce18a does not exist.')

```
02:37:37 PM prefect.flow_runs
Encountered exception during execution: ValueError('Path /home/prefect/.prefect/storage/c0e1459116944899815adc81758ce18a does not exist.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1335, in run_context
    yield self
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1397, in run_flow_async
    await engine.call_flow_fn()
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1349, in call_flow_fn
    result = await call_with_parameters(self.flow.fn, self.parameters)
  File "/app/pe_prefect/flows/batch_users.py", line 47, in batch_users
    new_failed_users = await get_state_result(flow_run.state)
  File "/usr/local/lib/python3.12/site-packages/prefect/states.py", line 85, in get_state_result
    return await _get_state_result(
  File "/usr/local/lib/python3.12/site-packages/prefect/states.py", line 160, in _get_state_result
    result = await _get_state_result_data_with_retries(
  File "/usr/local/lib/python3.12/site-packages/prefect/states.py", line 117, in _get_state_result_data_with_retries
    record = await ResultStore._from_metadata(state.data)
  File "/usr/local/lib/python3.12/site-packages/prefect/results.py", line 371, in _from_metadata
    result = await store.aread(metadata.storage_key)
  File "/usr/local/lib/python3.12/site-packages/prefect/results.py", line 655, in aread
    return await self._read(key=key, holder=holder, _sync=False)
  File "/usr/local/lib/python3.12/site-packages/prefect/results.py", line 606, in _read
    content = await call_explicitly_async_block_method(
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/compatibility/blocks.py", line 19, in call_explicitly_async_block_method
    return await getattr(block, f"a{method}")(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/prefect/filesystems.py", line 306, in aread_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /home/prefect/.prefect/storage/c0e1459116944899815adc81758ce18a does not exist.
```
m
thought for 170.4 seconds
Thanks for the traceback; it pinpoints the issue: Prefect tried to fetch the child flow's persisted result from a LocalFileSystem path that doesn't exist in the process reading it.

What's happening:
- Your child flow's result was persisted to the default LocalFileSystem (typically ~/.prefect/storage on the machine/container where the child ran).
- The parent flow (or wherever you're calling get_state_result) is a different environment, so that local path isn't there. Prefect tries to read /home/prefect/.prefect/storage/..., but that file only exists on the child's host.

Fix it in two parts:

1) Wait for the final state before reading the result

Don't read `flow_run.state` immediately after `run_deployment()`. Wait for the child to finish first, then read the final state:

```python
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
from prefect.states import get_state_result

flow_run = run_deployment("child-flow/deployment", parameters={...})

async with get_client() as client:
    final_state = await client.wait_for_flow_run(flow_run.id)

value = await get_state_result(final_state)
```

2) Use a shared result store (not the default local path)

For cross-process retrieval, persist the child's result to a storage both sides can access. Two common options:

Remote/shared storage (recommended):
- Create a RemoteFileSystem block pointing to S3/GCS/Azure/etc. (via fsspec).
- Use that block in your child flow. Example:

```python
# One-time setup (or create the block in the UI)
from prefect.filesystems import RemoteFileSystem

remote = RemoteFileSystem(basepath="<s3://my-bucket/prefect-results>")
remote.save("results", overwrite=True)
```

Then make the child persist there:

```python
from prefect import flow
from prefect.filesystems import RemoteFileSystem

remote = RemoteFileSystem.load("results")

@flow(persist_result=True, result_storage=remote)
def child_flow(...):
    return {"ok": True}
```

Shared local volume (if all runs share the same mounted path):
- Create a LocalFileSystem block with a basepath that is a shared, mounted volume available to both parent and child at the same path.

```python
from prefect.filesystems import LocalFileSystem
shared = LocalFileSystem(basepath="/mnt/prefect-results")
shared.save("shared-results", overwrite=True)

@flow(persist_result=True, result_storage=LocalFileSystem.load("shared-results"))
def child_flow(...):
    return {"ok": True}
```

Global defaults (optional):
- Set env vars so you don't have to annotate every flow:
  - `PREFECT_RESULTS_PERSIST_BY_DEFAULT=true`
  - `PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=remote-file-system/results` (or `local-file-system/shared-results`)
- You can inspect your blocks:

```bash
prefect block ls
prefect block inspect remote-file-system/results
```

Common pitfall to avoid:
- Reading `flow_run.state` immediately after `run_deployment()` and/or persisting to a local path that isn't shared will lead to MissingResult or "Path does not exist."

Docs:
- Results overview and configuration: Prefect 3.x: Results

If you can confirm whether your parent and child run on different machines/containers, I can suggest the best storage option (remote vs shared local) and help with exact block configuration.
a
@Marvin the client doesn’t have a wait_for_flow_run method.
m
thought for 102.6 seconds
You're right: the client doesn't have a `wait_for_flow_run` method. The correct helper is the module-level function `prefect.flow_runs.wait_for_flow_run`. Use it like this (async parent flow):

```python
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

@flow
async def parent():
    # start the child
    fr = run_deployment("child-flow/deployment", parameters={...})

    # wait until the child reaches a terminal state
    fr = await wait_for_flow_run(fr.id)

    # fetch the child's return value (requires persisted results)
    value = await get_state_result(fr.state)
    return value
```

Notes:
- You can pass an existing client if you already have one open: `await wait_for_flow_run(fr.id, client=my_client)`.
- Make sure the child flow persists its result (e.g., `@flow(persist_result=True, result_storage=<shared or remote block>)`) so the parent process can fetch it.
- Using local default storage across different machines/containers will cause "path does not exist" errors. Prefer a shared/remote storage block (S3/GCS/Azure/RemoteFileSystem) for results.

Refs:
- Helper function: prefect.flow_runs.wait_for_flow_run
- Result persistence: Prefect 3.x Results
a
@Marvin I'm trying to create a work queue on a deployment in k8s using terraform but I keep getting this error:

```
Error: Error during create Work Queue

  with prefect_work_queue.terms_and_conditions,
  on work_queues.tf line 1, in resource "prefect_work_queue" "terms_and_conditions":
   1: resource "prefect_work_queue" "terms_and_conditions" {

Could not create Work Queue, unexpected error: failed to create work queue:
status code=405 Method Not Allowed, error={"detail":"Method Not Allowed"}
```
m
thought for 357.3 seconds
Thanks for the details; that 405 "Method Not Allowed" when creating a work queue via Terraform usually points to a mismatch between the API you're calling and the server you're hitting. A few important notes for Prefect 3.x:
- Workers poll Work Pools. Work Queues still exist, but they are scoped to a Work Pool. Creating a "global" work queue (like in older 2.x patterns) will fail against 3.x APIs.
- If your Terraform resource is trying to POST to a legacy work-queue endpoint (not scoped to a pool), Prefect 3.x can return 405.
- 405 can also occur if your provider is pointing at the wrong base URL (e.g., the UI root) instead of the API endpoint behind your k8s ingress.

Quick checks:
1) Which Prefect version are you targeting? If 3.x (default assumption), make sure you're creating queues within a Work Pool (or just use the pool's default queue).
2) Which Terraform provider version are you using? If it doesn't support pool-scoped queues (or only supports legacy queues), upgrade to the latest Prefect Terraform provider.
3) Verify the provider's URL config. For Prefect Server behind k8s/ingress, ensure the provider points at the API endpoint (commonly includes `/api`). For example:
   - Correct: `https://prefect.mydomain.com/api`
   - Incorrect: `https://prefect.mydomain.com` (may route to the UI and return 405 on POST)
   There's a known class of proxy/ingress misroutes that trigger 405s; see the provider repo's troubleshooting notes. A quick hedged check is sketched after the CLI commands below.
4) Sanity check via CLI against the same endpoint:
   - List pools:

```bash
prefect work-pool ls
```

   - Create a queue within a pool (replace names accordingly):

```bash
prefect work-queue create --work-pool "my-pool" "terms-and-conditions"
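```

One more hedged sanity check for item 3 above; the hostname is a placeholder, and I'm assuming Prefect Server's standard health route:

```bash
# A 200 response suggests your ingress routes /api to the Prefect API (not the UI)
curl -i https://prefect.mydomain.com/api/health
```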
If the CLI works but Terraform doesn't, it's likely provider config or an outdated resource.

Recommended pattern in Prefect 3.x:
- Create a Work Pool (e.g., type `kubernetes`) and run a worker against it.
- Optionally create one or more queues within that pool (or use the pool's default queue).
- Point your deployment at the work pool (and optionally a work queue within that pool).

Example outline (Terraform). Note: field names may differ slightly by provider version; if you share your current TF snippet, I can adapt it precisely.

```hcl
provider "prefect" {
  # If self-hosted server behind ingress, be sure to include /api
  server_url = "<https://prefect.yourdomain.com/api>"
  # If using Cloud, use account/workspace + API key instead
  # api_key = var.prefect_api_key
}

resource "prefect_work_pool" "k8s_pool" {
  name = "k8s-pool"
  type = "kubernetes"

  # base_job_template typically includes your k8s job spec
  # base_job_template = jsonencode({ ... })
}

# Optional: create a named queue within that pool (or use the default queue)
# Depending on provider version, this resource may require pool scoping:
# e.g., work_pool_name or work_pool_id
resource "prefect_work_queue" "terms_and_conditions" {
  work_pool_name = prefect_work_pool.k8s_pool.name
  name           = "terms-and-conditions"
  # Optional: concurrency_limit, priority, filter, etc.
}

# Your deployment can reference the pool and (optionally) the queue
# resource "prefect_deployment" "my_deployment" {
#   name            = "my-deployment"
#   work_pool_name  = prefect_work_pool.k8s_pool.name
#   work_queue_name = prefect_work_queue.terms_and_conditions.name
#   ...
# }
```

If you're actually on Prefect 2.x, queues are still first-class, but I'd still suggest moving to workers + work pools or upgrading to 3.x.

Could you share:
- Your Prefect version (Server/Cloud) and how you're hosting it (k8s ingress details)?
- Your Terraform provider version and provider configuration (redact secrets)?
- The Terraform snippet for `prefect_work_queue` you're using?

With that, I can pinpoint whether this is an API-version mismatch, a provider-version issue, or an ingress/base URL problem.

Helpful references:
- Work pools and workers (Prefect 3.x): Docs: Work pools
- CLI for work queues (3.x): Docs: prefect work-queue
- Migrating to workers/pools: Upgrade agents to workers
- Provider repo (latest releases, troubleshooting): prefecthq/terraform-provider-prefect
- Blog: Managing Prefect deployments with Terraform