<@ULVA73B9P> New to prefect. When creating flows w...
# ask-marvin
a
@Marvin New to prefect. When creating flows with tasks that have dependencies, all the tasks with dependencies fail instantly when it is their time to run. I am using a process worker pool on a ubuntu server. Why might this be
m
thought for 614.3 seconds
A few common things can cause “dependent” tasks to fail immediately in Prefect 3.x, especially when running on a Process work pool. The failure you’re seeing is usually one of these:

1) An upstream task actually failed
- Prefect marks downstream tasks as Failed immediately with an UpstreamTaskError when any upstream task fails.
- Action: Check the first failing task’s state message/traceback. In the UI, open the flow run > click the earliest failing task > view the state details/logs. Or via CLI: `prefect task-run logs <TASK_RUN_ID>`.

2) ProcessPoolTaskRunner pickling issues
- If you are using `@flow(task_runner=ProcessPoolTaskRunner())`, all task inputs/outputs must be picklable and task functions must be defined at module top-level (no lambdas/inner functions).
- Symptoms often look like dependent tasks failing “instantly” once they try to materialize an upstream result, and you’ll see errors like “TypeError: cannot pickle …” or “Can’t pickle local object …”
- Fixes:
  - Ensure tasks and everything they pass between each other are picklable (no open DB connections, locks, client objects, etc.).
  - If your workload is I/O bound, switch to threads to avoid pickling constraints:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    ...
  - For CPU-bound work, keep process pools but only pass simple/serializable data between tasks.

3) Missing or lost results (after a restart)
- If the worker or flow process restarts mid-run and your tasks aren’t persisting results, downstream tasks can’t retrieve upstream outputs and may fail immediately with a MissingResult-type message.
- Fix: Enable result persistence on tasks that produce outputs used later:
from prefect import task

@task(persist_result=True)
def produce():
    ...
Consider configuring a durable result store if your runs may outlive a single process.

4) Mixing `.submit()` and direct calls in ways that force eager execution
- In Prefect 3.x, use `.submit()` to schedule tasks and pass futures to downstream tasks. Only call `.result()` when you truly need the value.
- A safe pattern:
a_future = a.submit()
b_future = b.submit(a_future)  # pass the future, not a_future.result()
If you call a task like a normal function, it runs immediately and errors propagate early.

5) Environment issues on the worker
- If imports/dependencies or env vars are missing on the Ubuntu server, tasks can fail right away when they start running in the worker environment.
- Action: Confirm your worker environment has the same Python deps and code as your dev machine, and that any secrets/env vars your tasks need are present.

Quick diagnostic checklist:
- Which task runner are you using? Do you have `@flow(task_runner=ProcessPoolTaskRunner())` in your code?
- What is the exact state message/traceback for the first failing dependent task?
- Can you share a minimal example of two tasks where the first returns a value and the second consumes it?
- What Prefect version are you on (`prefect version`)?

If you can paste one of the failing task’s state messages (or a minimal code snippet), I can pinpoint which of the above it is and suggest a precise fix.
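To test the pickling point from item 2 without involving Prefect at all, a quick standard-library check can help. This is only a sketch: `is_picklable`, `top_level_function`, and `make_local_function` are hypothetical names introduced here, and it assumes a process-based runner needs roughly what `pickle.dumps` needs. Prefect may use a more permissive serializer, so treat a failure here as a hint rather than a verdict.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        # pickle raises different exception types depending on the object
        return False


def top_level_function(x):
    # Defined at module top level, so pickle can refer to it by name
    # (when this file is run as a script or imported as a module).
    return x + 1


def make_local_function():
    def inner(x):  # nested function: pickle cannot look it up by name
        return x + 1
    return inner


checks = {
    "plain dict": {"code": 0, "out": "ok"},
    "top-level function": top_level_function,
    "lambda": lambda x: x,
    "local function": make_local_function(),
    "thread lock": threading.Lock(),
}

for label, obj in checks.items():
    print(f"{label}: picklable={is_picklable(obj)}")
```

Anything that reports `picklable=False` here is a candidate to replace with plain data (strings, numbers, dicts, lists) before passing it between tasks.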
a
@Marvin This is a sample piece of code
from prefect import flow, task

@task
def task_a():
    print("Running task A")
    return "A's result"

@task
def task_b():
    print("Running task B")
    return "B's result"

@task
def no_op_task(*dependencies):
    # The 'dependencies' tuple will contain the results from task_a and task_b,
    # but the code doesn't use it.
    print("Running no-op task, waiting for dependencies to finish...")
    # This task will run only after task_a and task_b are complete.

@flow
def my_flow():
    a_future = task_a.submit()
    b_future = task_b.submit()

    # The no_op_task will wait for both a_future and b_future to resolve
    # before it starts.
    no_op_task.submit(a_future, b_future)

# Running the flow
if __name__ == "__main__":
    my_flow()
And this is the error on the dependent task
Crash detected! Execution was cancelled by the runtime environment.
m
thought for 81.1 seconds
Thanks for the snippet. That wiring is correct and should work. Given the error text, the problem isn’t your dependency pattern; it’s that the task process is being terminated by the runtime.

What “Crash detected! Execution was cancelled by the runtime environment.” means
- It’s a crash state, not a normal Python exception. Prefect is telling you the process running that task was killed (e.g., SIGTERM, OOM killer, worker shutdown/restart). Downstream tasks often show this “instantly” as soon as they’re scheduled because the runtime stops them before they even start user code.

Most common causes with a Process work pool on Ubuntu

1) Worker/process termination or signals
- The worker (or its parent service) is restarted or sends SIGTERM that cascades to child processes (common if running under systemd with aggressive KillMode).
- OOM killer terminates the process (less likely with your trivial code).
- Action: Check worker logs around the crash time for shutdown/restart messages or OOM. If under systemd, check `journalctl -u <your-worker-service>` for SIGTERM/OOM.

2) Nested multiprocessing/signal interaction
- If you set `@flow(task_runner=ProcessPoolTaskRunner())`, your flow will create more processes inside a process-launched job. That combo can be fragile with signals and can present exactly as “cancelled by runtime”.
- Quick test: switch the flow to threads to rule out multiprocessing issues:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def task_a():
    print("Running task A")
    return "A's result"

@task
def task_b():
    print("Running task B")
    return "B's result"

@task
def no_op_task(*dependencies):
    print("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    no_op_task.submit(a, b)

if __name__ == "__main__":
    my_flow()
- If this fixes it, the issue is multiprocessing+signals. For CPU-bound workloads you can still use processes, but avoid nested process pools in process-based infrastructure and ensure everything passed between tasks is picklable and top-level.

3) Environment/service management
- The worker environment may be missing deps/env vars or the service manager is stopping child processes.
- Action: Ensure the same code/deps exist on the worker host; if using systemd, try `KillMode=process` for the worker service to avoid killing children on reload; confirm no resource limits (MemoryMax, etc.).

Diagnostics to run now
- Look at the first crashed task’s logs and state message:
  - `prefect task-run logs <TASK_RUN_ID>`
  - `prefect flow-run logs <FLOW_RUN_ID>`
- Check worker logs around the failure for any shutdown/restart/OOM.
- Try the same flow with ThreadPoolTaskRunner (above). Does the crash go away?
- Confirm Prefect version (`prefect version`) and whether you’ve set a task runner explicitly.

If you can share:
- Prefect version
- Whether you’re using `ProcessPoolTaskRunner`
- How the worker is started (manual shell vs systemd/service) and any worker logs around the crash
- One crashed task’s full state message/traceback

…I can pinpoint the root cause and suggest the exact fix.
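When checking whether a process was killed by a signal rather than exiting on its own, the exit status usually tells you. The sketch below is plain Python on a POSIX host (such as the Ubuntu server here) and does not show how Prefect itself detects crashes; `describe_exit` is a hypothetical helper introduced for illustration. It starts a child process, sends it SIGTERM, and decodes the resulting return code.

```python
import signal
import subprocess
import sys


def describe_exit(returncode: int) -> str:
    """Translate a subprocess return code into a readable description."""
    if returncode < 0:
        # On POSIX, a negative return code -N means "terminated by signal N"
        return f"killed by {signal.Signals(-returncode).name}"
    if returncode == 0:
        return "exited cleanly"
    return f"exited with code {returncode}"


# Start a child that would otherwise sleep for 30 seconds
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])

# Simulate what a service manager or operator might do
child.send_signal(signal.SIGTERM)
child.wait(timeout=10)

print(describe_exit(child.returncode))
```

A "killed by SIGTERM" or "killed by SIGKILL" result points at something outside your code (service restart, OOM killer, operator action) rather than a Python exception inside the task.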
a
@Marvin Could it be related to the fact that my initial tasks use subprocess.run?
m
thought for 68.1 seconds
Short answer: yes. Frequent use of subprocess.run in your upstream tasks can absolutely be the culprit, especially when the flow is running under process-based infrastructure.

Why it happens
- Process-based execution + subprocesses: A flow run started by a Process work pool is itself a child process. If your flow also uses a process-based task runner (ProcessPoolTaskRunner) or spawns subprocesses, signal handling can get tricky. A signal (SIGTERM/SIGKILL) to the flow process or its process group can terminate child processes abruptly. Prefect then records “Crash detected! Execution was cancelled by the runtime environment.” for tasks that were about to run.
- Non-zero exit from subprocess.run(check=True): If your command exits non-zero and you used check=True, it raises CalledProcessError, causing the upstream task to fail; downstream tasks will then fail immediately (but usually as upstream failures, not a crash).
- Returning complex objects: Returning a CompletedProcess or other non-serializable objects to downstream tasks can create issues in process-based runners. You might see failures when resolving inputs.

What to try

1) Quick isolation test
- Switch to threads to rule out multiprocessing/signal issues:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    ...
If crashes stop, you hit a multiprocessing + subprocess interaction.

2) Make subprocess calls “safe” in tasks
- Don’t return CompletedProcess; return simple/JSON-serializable data.
- Capture output and avoid check=True unless you explicitly want to raise:
import subprocess

from prefect import task

@task
def run_cmd():
    res = subprocess.run(
        ["bash", "-lc", "your-command"],
        capture_output=True,
        text=True,
        check=False,
        start_new_session=True  # isolates signal handling
    )
    return {"code": res.returncode, "out": res.stdout, "err": res.stderr}
- If you need a failure on non-zero exit, raise a clear exception yourself so downstream tasks fail with an UpstreamTaskError (not a crash):
if res.returncode != 0:
    raise RuntimeError(f"Command failed ({res.returncode}): {res.stderr}")
3) Avoid nested process pools
- If you are using ProcessPoolTaskRunner, consider switching to ThreadPoolTaskRunner when your tasks call subprocesses. Threads work well for I/O-bound shell calls and avoid the extra process layer.

4) Persist results if runs may be interrupted
- If your worker might restart, enable result persistence on tasks whose outputs are consumed later:
from prefect import task

@task(persist_result=True)
def produce():
    ...
5) Inspect logs around the crash
- Check worker/host logs for signals, restarts, or OOM around the time of the crash.
- Grab Prefect logs to confirm what happened first:
  - `prefect flow-run logs <FLOW_RUN_ID>`
  - `prefect task-run logs <TASK_RUN_ID>`

If you can share:
- Whether you’re using ProcessPoolTaskRunner in the flow
- A snippet of the subprocess.run you’re calling
- One crashed task’s full state message/logs

I can give you a precise recommendation (e.g., threads vs processes, specific subprocess.run flags, or service configuration tweaks).
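To see what `start_new_session=True` from step 2 actually changes, you can compare the session and process-group IDs of a child started with and without it. This is a POSIX-only sketch using the standard library; `child_ids` and `CHILD_CODE` are hypothetical names introduced here, and nothing in it is Prefect-specific.

```python
import os
import subprocess
import sys

# The child prints its own session ID and process-group ID
CHILD_CODE = "import os; print(os.getsid(0), os.getpgid(0))"


def child_ids(start_new_session: bool) -> tuple[int, int]:
    """Run a short-lived child and return its (session ID, process-group ID)."""
    res = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        capture_output=True,
        text=True,
        check=True,
        start_new_session=start_new_session,
    )
    sid, pgid = res.stdout.split()
    return int(sid), int(pgid)


parent = (os.getsid(0), os.getpgid(0))
inherited = child_ids(start_new_session=False)
isolated = child_ids(start_new_session=True)

print("parent:   ", parent)
print("inherited:", inherited)  # same session and group as the parent
print("isolated: ", isolated)   # its own session and process group
```

A signal delivered to the parent's process group reaches the inherited child but not the isolated one. The trade-off is that an isolated child can also outlive its parent, so long-running commands may need explicit cleanup.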
a
@Marvin This works
from prefect import get_run_logger, task, flow
from dotenv import load_dotenv
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running task A...")
    return "A's result"

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running task B...")
    return "B's result"

@task
def no_op_task(*dependencies):
    logger = get_run_logger()
    logger.info("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    no_op_task.submit(a, b)

if __name__ == "__main__":
    my_flow()
This doesn't
from prefect import get_run_logger, task, flow
from dotenv import load_dotenv
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running task A...")
    time.sleep(5) 
    return "A's result"

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running task B...")
    return "B's result"

@task
def no_op_task(*dependencies):
    logger = get_run_logger()
    logger.info("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    no_op_task.submit(a, b)

if __name__ == "__main__":
    my_flow()
m
thought for 141.7 seconds
Yes. The difference you’re seeing is because the flow returns immediately after submitting tasks. When the flow function exits, the runtime begins shutting down and any tasks that are still running or scheduled can be cancelled by the environment. In your first example they finish fast enough; in the second, `time.sleep(5)` keeps `task_a` running after the flow has already returned, so Prefect cancels the work and you see “Execution was cancelled by the runtime environment.”

In Prefect 3.x, always wait for the futures you care about before the flow returns. Easiest fixes:

Option A: Block on the last task
from prefect import get_run_logger, task, flow
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running task A...")
    time.sleep(5)
    return "A's result"

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running task B...")
    return "B's result"

@task
def no_op_task(*dependencies):
    logger = get_run_logger()
    logger.info("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    noop = no_op_task.submit(a, b)
    noop.result()  # wait so the flow does not exit early

if __name__ == "__main__":
    my_flow()
Option B: Wait on all futures (if you have many)
from prefect.futures import wait

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    noop = no_op_task.submit(a, b)
    wait([a, b, noop])
Option C: Return a future’s result from the flow (the flow will block on it)
@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    return no_op_task.submit(a, b).result()
That should eliminate the crash. If you still see it after waiting on the futures, let me know how you’re starting the worker (systemd, Docker, etc.) and share the worker logs around the failure time so we can check for restarts or signals.
a
@Marvin can I make a list like futures, and then in the end do `wait(futures)`?
m
thought for 29.4 seconds
Yes. Prefect 3.x exposes `prefect.futures.wait` and it takes a list of `PrefectFuture`s. Example:
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import wait

@task
def task_a():
    get_run_logger().info("A")
    return "A"

@task
def task_b():
    get_run_logger().info("B")
    return "B"

@task
def no_op(*deps):
    get_run_logger().info("noop")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    futures = []
    futures.append(task_a.submit())
    futures.append(task_b.submit())
    futures.append(no_op.submit(futures[0], futures[1]))

    # Block until they’re all done (or raise on crash/failure if you call .result() later)
    done = wait(futures)
    # Optionally access results
    for f in done.done:
        _ = f.result()

if __name__ == "__main__":
    my_flow()
Notes:
- `wait` returns a `DoneAndNotDoneFutures` object with `.done` and `.not_done` sets.
- You can pass `timeout=<seconds>` if you want to cap total waiting time.
- If you want the flow to fail when any task fails, either iterate and call `.result()` on the futures (which will raise) or explicitly check states.
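The `done` / `not_done` split and the timeout behaviour described in these notes can be tried out with Python's standard library, whose `concurrent.futures.wait` also returns a named tuple called `DoneAndNotDoneFutures`. The sketch below is a stand-in to illustrate the idea and does not call Prefect; `work` and `boom` are hypothetical helpers, and Prefect's own `wait` may differ in details.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def work(name: str, seconds: float) -> str:
    time.sleep(seconds)
    return name


def boom() -> None:
    raise ValueError("task failed")


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(work, "fast", 0.05), pool.submit(work, "slow", 1.0)]

    # With a timeout, wait() returns whatever has finished so far
    first = wait(futures, timeout=0.4)
    finished_early = sorted(f.result() for f in first.done)
    still_running = len(first.not_done)

    # Without a timeout, wait() blocks until every future is done
    final = wait(futures)
    all_results = sorted(f.result() for f in final.done)

    # A failed future still counts as "done"; the error surfaces on .result()
    failed = pool.submit(boom)
    wait([failed])
    try:
        failed.result()
        failure_message = None
    except ValueError as exc:
        failure_message = str(exc)

print(finished_early, still_running, all_results, failure_message)
```

The last part is the reason for the third note: waiting alone does not raise, so a failure only becomes visible once you ask for the result or inspect the state.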
a
@flow(name="Sample Processing")
def sample_flow():

    futures = []

    a_flow_future = a_flow.submit()

    ## Chances stuff
    z_task_future = tasks.z_task.submit()
    futures.append(z_task_future)

    b_flow_future = b_flow.submit(a_flow_future)

    c_flow_future = c_flow.submit(a_flow_future)

    d_flow_future = d_flow.submit(b_flow_future,c_flow_future)

    wait(futures)
@Marvin Debug this flow. Is this done properly?
m
thought for 441.7 seconds
Short answer: you’re close, but there are a few fixes and best practices to apply.

What’s correct
- Using `.submit()` to run tasks and subflows concurrently is supported in Prefect 3.x. Passing Prefect futures as inputs to downstream tasks/subflows is also correct; Prefect will resolve them to results when needed.
- Your dependency chain a -> (b, c) -> d is modeled correctly by passing futures.

Fixes and suggestions
- Import `wait` from `prefect.futures` (it’s not available from the top-level `prefect` import).
- If you intend to wait for everything before the flow exits, include all relevant futures in your wait, not just `z_task_future`. Alternatively, return a downstream future’s result to implicitly wait for its entire dependency chain.
- Prefer named arguments when passing futures to subflows/tasks to match their signatures.
- Optional: set a task runner if you want local concurrency control (threads/processes).

Cleaned up example
from prefect import flow
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

# assuming a_flow, b_flow, c_flow, d_flow are @flow-decorated
# and tasks.z_task is a @task

@flow(name="Sample Processing", task_runner=ThreadPoolTaskRunner())
def sample_flow():
    a_future = a_flow.submit()

    z_future = tasks.z_task.submit()
    # If b_flow takes the result of a_flow as, say, "a_result", pass it by name
    b_future = b_flow.submit(a_future)  # e.g., b_flow.submit(a_result=a_future)
    c_future = c_flow.submit(a_future)  # e.g., c_flow.submit(a_result=a_future)

    d_future = d_flow.submit(b_future, c_future)  # e.g., d_flow.submit(b=b_future, c=c_future)

    # Option 1: explicitly wait for everything you care about:
    wait([z_future, b_future, c_future, d_future])

    # Option 2 (common): return a result to implicitly wait on its chain
    # return d_future.result()
Notes
- You do not have to call `wait()` for Prefect to run the submitted work; Prefect will track submitted tasks/subflows that are part of the flow run. Use `wait()` if you want to block at a specific point, manage timeouts, or ensure “extra” work (like `z_task`) finishes before returning if it’s not part of your returned dependency chain.
- Avoid calling `.result()` too early on upstream futures if you want to preserve concurrency; do it at the end (or only when you actually need the value).

If you’re seeing a specific error or unexpected behavior (e.g., NameError for `wait`, flows running sequentially, or type mismatches in parameters), share the error and the signatures of `a_flow/b_flow/c_flow/d_flow` and I’ll pinpoint the fix.

Helpful docs
- Futures API (wait, as_completed): Prefect Futures API
- Task runners and concurrency: Task Runners
a
@Marvin Getting AttributeError: 'Flow' object has no attribute 'submit'
m
thought for 103.7 seconds
That error means your environment’s `Flow` object doesn’t have `.submit()`. A couple possibilities:
- You’re on Prefect 2.x (flows don’t support `.submit()` there).
- Or your subflows aren’t decorated as Prefect flows.

Quick check: what Prefect version are you running? You can verify with:
prefect version
or
python -c "import prefect; print(prefect.__version__)"
If you’re on Prefect 3.x
- Ensure your subflows are defined with `@flow`. In 3.x, both tasks and flows can be submitted concurrently.
- Import `wait` from `prefect.futures`.

Example:
from prefect import flow
from prefect.futures import wait

@flow
def a_flow(): ...
@flow
def b_flow(a_result): ...
@flow
def c_flow(a_result): ...
@flow
def d_flow(b_result, c_result): ...
# tasks.z_task is a @task

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_future = a_flow.submit()

    b_future = b_flow.submit(a_future)   # or b_flow.submit(a_result=a_future)
    c_future = c_flow.submit(a_future)   # or c_flow.submit(a_result=a_future)

    d_future = d_flow.submit(b_future, c_future)  # or d_flow.submit(b_result=b_future, c_result=c_future)

    # Either explicitly wait
    wait([z_future, b_future, c_future, d_future])
    # Or return a result to implicitly wait on the whole chain
    # return d_future.result()
If you’re on Prefect 2.x
Flows do not have `.submit()`. Use one of these patterns:
- Synchronous subflow calls (simple and common):
@flow
def sample_flow():
    z_future = tasks.z_task.submit()
    a_result = a_flow()
    b_result = b_flow(a_result)
    c_result = c_flow(a_result)
    d_result = d_flow(b_result, c_result)
    # optionally wait for z if you care it finishes before exit
    z_future.result()
    return d_result
- Concurrent “subflows” by wrapping each subflow call in a task, then submitting the tasks:
from prefect import task, flow

@task
def run_a(): return a_flow()
@task
def run_b(a): return b_flow(a)
@task
def run_c(a): return c_flow(a)
@task
def run_d(b, c): return d_flow(b, c)

@flow
def sample_flow():
    z_future = tasks.z_task.submit()
    a_future = run_a.submit()
    b_future = run_b.submit(a_future)
    c_future = run_c.submit(a_future)
    d_future = run_d.submit(b_future, c_future)
    return d_future.result()
- Or trigger separate flow runs via deployments (asynchronous orchestration):
from prefect.deployments import run_deployment

@flow
def sample_flow():
    fr = run_deployment(name="a-flow/a-deployment", as_subflow=True)
    # run_deployment returns a FlowRun; you can poll/wait if desired
Docs/resources
- Futures API (3.x): Prefect Futures API
- Design patterns discussion: Workflow design patterns

If you share your Prefect version and how `a_flow/b_flow/c_flow/d_flow` are defined, I can tailor the exact fix.
a
@Marvin Version: 3.4.14 API version: 0.8.4 Python version: 3.10.12 Profile: ephemeral Server type: server Pydantic version: 2.11.7 I am running this
m
thought for 122.6 seconds
Thanks for the version details. On Prefect 3.4.14, `Flow` objects do not have a `.submit()` method. That’s why you’re seeing:

AttributeError: 'Flow' object has no attribute 'submit'

How to fix it
You have a few solid options depending on whether you truly need the subflows to run concurrently or just want to keep them modular.

Option A: Keep subflows, run them sequentially (simple)
Call subflows like normal functions; only tasks have `.submit()`:
from prefect import flow

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_result = a_flow()
    b_result = b_flow(a_result)
    c_result = c_flow(a_result)
    d_result = d_flow(b_result, c_result)

    # ensure z finishes if you need it before exit
    z_future.result()
    return d_result
Option B: Run subflows concurrently via wrapper tasks (recommended if you want concurrency across subflows)
Wrap each subflow call in a `@task` and submit those tasks. This preserves your subflow organization but uses task futures for concurrency.
from prefect import flow, task
from prefect.futures import wait

@task
def run_a():
    return a_flow()

@task
def run_b(a):
    return b_flow(a)

@task
def run_c(a):
    return c_flow(a)

@task
def run_d(b, c):
    return d_flow(b, c)

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_future = run_a.submit()
    b_future = run_b.submit(a_future)
    c_future = run_c.submit(a_future)
    d_future = run_d.submit(b_future, c_future)

    # Explicitly wait for what you care about
    wait([z_future, d_future])
    return d_future.result()
Option C: Kick off separate flow runs via deployments (advanced, decoupled)
If `a_flow`, `b_flow`, etc. are deployed, you can launch them as subflow runs and wait on them:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_run = run_deployment(name="a-flow/a-deployment", as_subflow=True)
    wait_for_flow_run(a_run.id)
    a_result = a_run.state.result()  # if you need the result

    b_run = run_deployment(name="b-flow/b-deployment", parameters={"a_result": a_result}, as_subflow=True)
    c_run = run_deployment(name="c-flow/c-deployment", parameters={"a_result": a_result}, as_subflow=True)

    wait_for_flow_run(b_run.id)
    wait_for_flow_run(c_run.id)

    d_run = run_deployment(name="d-flow/d-deployment",
                           parameters={"b_result": b_run.state.result(), "c_result": c_run.state.result()},
                           as_subflow=True)

    wait_for_flow_run(d_run.id)
    z_future.result()
    return d_run.state.result()
Notes
- Only tasks have `.submit()`; flows are called like normal functions.
- If you need concurrency, wrap flow calls in tasks (Option B) or use deployments (Option C).
- Import `wait` from `prefect.futures` when you want to block at specific points: Prefect Futures API.

If you can share how `a_flow`, `b_flow`, `c_flow`, and `d_flow` are defined (signatures) and whether you need them to run concurrently, I can tailor the best pattern for your case.
a
@Marvin Do I need to upgrade prefect to get submit for flows?
m
thought for 53.3 seconds
Short answer: No, upgrading won’t give you `Flow.submit`. In Prefect 3.x, flows do not have a `.submit()` method. Only tasks have `.submit()`.

How to achieve what you want
- Sequential subflows (simple):
from prefect import flow

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_result = a_flow()
    b_result = b_flow(a_result)
    c_result = c_flow(a_result)
    d_result = d_flow(b_result, c_result)

    z_future.result()  # ensure z completes if needed
    return d_result
- Concurrent “subflows” via wrapper tasks (common pattern for concurrency):
from prefect import flow, task
from prefect.futures import wait

@task
def run_a(): return a_flow()

@task
def run_b(a): return b_flow(a)

@task
def run_c(a): return c_flow(a)

@task
def run_d(b, c): return d_flow(b, c)

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_future = run_a.submit()
    b_future = run_b.submit(a_future)
    c_future = run_c.submit(a_future)
    d_future = run_d.submit(b_future, c_future)

    wait([z_future, d_future])
    return d_future.result()
- Launch separate flow runs via deployments (decoupled, if each flow has its own deployment):
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()

    a_run = run_deployment(name="a-flow/a-deployment", as_subflow=True)
    wait_for_flow_run(a_run.id)
    a_result = a_run.state.result()

    b_run = run_deployment(name="b-flow/b-deployment", parameters={"a_result": a_result}, as_subflow=True)
    c_run = run_deployment(name="c-flow/c-deployment", parameters={"a_result": a_result}, as_subflow=True)

    wait_for_flow_run(b_run.id)
    wait_for_flow_run(c_run.id)

    d_run = run_deployment(
        name="d-flow/d-deployment",
        parameters={"b_result": b_run.state.result(), "c_result": c_run.state.result()},
        as_subflow=True,
    )
    wait_for_flow_run(d_run.id)

    z_future.result()
    return d_run.state.result()
Helpful docs
- Futures (for `wait`, `result()`): Prefect Futures API
- Running deployments programmatically: run_deployment

If you tell me whether you need true parallelism between your subflows or just modular structure (plus whether each subflow is deployed), I can recommend the best of the three patterns above for your case. And sorry for the earlier confusion around `.submit()` on flows!
a
@Marvin what is the normal way to get tasks to run concurrently given my prefect version? I feel like I am overcomplicating it
m
thought for 40.5 seconds
You’re not overcomplicating it. In Prefect 3.x, the normal way to run work concurrently is:
- Use `@task` for units of work
- Call `task.submit(...)` to start them
- Pass futures to downstream tasks
- Only call `.result()` (or `wait([...])`) when you need the values

Minimal pattern
from prefect import flow, task
from prefect.futures import wait

@task
def step_a():
    ...

@task
def step_b():
    ...

@task
def step_c(a, b):
    ...

@flow
def sample_flow():
    a_future = step_a.submit()
    b_future = step_b.submit()
    # a and b run concurrently

    c_future = step_c.submit(a_future, b_future)  # depends on a and b

    # Block only when you need results
    return c_future.result()
Control local parallelism (optional) - IO-bound work: use threads
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def sample_flow():
    ...
- CPU-bound work: use processes
from prefect.task_runners import ProcessPoolTaskRunner

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def sample_flow():
    ...
Tips
- Don’t call `.result()` on upstream futures too early; that serializes execution.
- Use `wait([f1, f2, ...])` if you want to block at a specific point or enforce a timeout.
- Flows themselves don’t have `.submit()`; that’s only for tasks.

Docs for reference
- Futures API (wait, result, as_completed): Prefect Futures API
- Task runners (threads vs processes): Task Runners

If you tell me whether your workload is IO- or CPU-bound, I can suggest the best runner and `max_workers` settings.
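The first tip, that calling `.result()` too early serializes execution, is easy to demonstrate with timings. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for a thread-based task runner, so it illustrates the principle rather than Prefect's scheduler; `step`, `eager`, `deferred`, and `timed` are hypothetical names introduced here.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def step(name: str, seconds: float = 0.3) -> str:
    time.sleep(seconds)
    return name


def eager(pool: ThreadPoolExecutor) -> list[str]:
    # Blocking on "a" before "b" is even submitted: the two run back to back
    a = pool.submit(step, "a").result()
    b = pool.submit(step, "b").result()
    return [a, b]


def deferred(pool: ThreadPoolExecutor) -> list[str]:
    # Submit everything first, then collect: "a" and "b" overlap
    a_future = pool.submit(step, "a")
    b_future = pool.submit(step, "b")
    return [a_future.result(), b_future.result()]


def timed(fn) -> tuple[list[str], float]:
    """Run fn with a fresh two-thread pool and return (results, elapsed seconds)."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        start = time.perf_counter()
        results = fn(pool)
        return results, time.perf_counter() - start


eager_results, eager_seconds = timed(eager)
deferred_results, deferred_seconds = timed(deferred)

print(f"eager:    {eager_results} in {eager_seconds:.2f}s")
print(f"deferred: {deferred_results} in {deferred_seconds:.2f}s")
```

Both versions return the same values; only the point at which the code blocks differs, and the eager version takes roughly the sum of the two sleeps instead of the longer one.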
a
@Marvin what is the process for concurrent subflows then?
m
thought for 60.5 seconds
Great question. In Prefect 3.x, there isn’t a built-in “submit subflow” API. The two normal patterns for concurrent subflows are:

1) In-process concurrency via wrapper tasks (simple, common)
- Keep your flows modular, but run them concurrently by calling each flow inside a task and submitting those tasks. This keeps everything within one parent flow run/process.

Example:
from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

@task
def run_a():
    return a_flow()

@task
def run_b(a):
    return b_flow(a)

@task
def run_c(a):
    return c_flow(a)

@task
def run_d(b, c):
    return d_flow(b, c)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def sample_processing():
    # run A, then B and C concurrently, then D
    a_f = run_a.submit()
    b_f = run_b.submit(a_f)
    c_f = run_c.submit(a_f)
    d_f = run_d.submit(b_f, c_f)

    # block only when needed
    return d_f.result()
- Notes:
  - Only tasks have `.submit()`.
  - Choose `ThreadPoolTaskRunner` (I/O bound) or `ProcessPoolTaskRunner` (CPU bound).
  - Docs: Task Runners, Futures API

2) Concurrent subflows as separate flow runs via deployments (distributed, scalable)
- Give each subflow its own deployment and launch them with `run_deployment`. They’ll run on your workers concurrently. You can optionally wait and collect results.

Example:
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def run_and_fetch(name: str, parameters: dict | None = None):
    # run_deployment blocks until the triggered run finishes (default timeout=None),
    # so the returned FlowRun already carries its final state.
    flow_run = run_deployment(name=name, parameters=parameters, as_subflow=True)
    # Reading the result requires the deployed flow to persist its results.
    return flow_run.state.result()

@flow
def orchestrator():
    # A must finish before anything else starts
    a_result = run_and_fetch("a-flow/a-deployment")

    # Submit B and C as tasks so both deployments run at the same time
    b_f = run_and_fetch.submit("b-flow/b-deployment", {"a_result": a_result})
    c_f = run_and_fetch.submit("c-flow/c-deployment", {"a_result": a_result})

    # D starts once B and C have both returned
    return run_and_fetch(
        "d-flow/d-deployment",
        {"b_result": b_f.result(), "c_result": c_f.result()},
    )
Notes:
- This uses your work pool/worker(s) for real parallelism across infrastructure.
- Make sure results you pass between runs are serializable, and enable result persistence on the deployed flows so the orchestrator can read their results.
- API references: run_deployment, wait_for_flow_run

Which route fits you best?
- Want simple concurrency within one process? Use wrapper tasks (Option 1).
- Want fully decoupled runs scaling across workers? Use deployments (Option 2).

If you tell me whether you need in-process vs distributed parallelism (and whether each subflow already has a deployment), I can tailor the exact code for your case.
a
@Marvin how is @flow(task_runner=ThreadPoolTaskRunner(max_workers=8)) Different than having a worker pool set up as a service?
m
thought for 33.6 seconds
They solve different layers of concurrency:

Task runner (ThreadPoolTaskRunner / ProcessPoolTaskRunner)
- Scope: Inside a single flow run.
- What it does: Runs the tasks you submit concurrently for that flow run, either in threads of the flow's Python process or in child processes it spawns.
- When to use:
  - IO-bound work → ThreadPoolTaskRunner(max_workers=N)
  - CPU-bound work → ProcessPoolTaskRunner(max_workers=N) to bypass the GIL
- Characteristics:
  - With threads, tasks share memory and environment (fast data handoff between tasks); with processes, inputs and outputs must be picklable.
  - Failures are at the task level; the parent flow run remains the single unit of execution.
  - Does not scale across machines/containers by itself.

Work pools and workers (running as a service)
- Scope: Across multiple flow runs.
- What it does: Workers poll your Prefect server for scheduled flow runs from a work pool and start each run (often in separate processes/containers/VMs).
- When to use:
  - You want horizontal scaling, isolation, and scheduling across infrastructure.
  - You deploy flows and let workers pick them up.
- Characteristics:
  - Parallelism across many flow runs (and across machines).
  - Strong isolation between runs (separate processes/containers).
  - Operational controls: retries, infrastructure config, environment, resource allocation per run.

How they work together
- A worker picks up a flow run from a work pool and starts it.
- Inside that flow run, your task runner controls how tasks run concurrently.
- For concurrent "subflows," you can:
  - Stay in a single flow run: wrap subflow calls in tasks and submit them (uses the task runner for concurrency).
  - Or fan out across workers: give each subflow its own deployment and trigger them (e.g., `run_deployment`) so they run as separate flow runs that workers execute concurrently.

Rules of thumb
- Need simple, in-process parallelism for tasks? Use a task runner on the flow:
  - IO-bound → ThreadPoolTaskRunner
  - CPU-bound → ProcessPoolTaskRunner
- Need to scale out across machines or isolate workloads? Use deployments + a work pool/worker(s).
- You can combine both: workers for concurrent flow runs; task runners for concurrency inside each run.

Docs
- Task runners: Task Runners
- Workers and work pools (deployments): Work Pools and Workers
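To make the "task runner" half concrete, here is a rough standard-library analogy, not Prefect's implementation. It assumes a thread pool with `max_workers=8`; the `submit` and `resolve` helpers are hypothetical names invented for this sketch. Passing an upstream future as an argument makes the downstream call wait for it, which is roughly the behaviour you get from `task.submit(upstream_future)`.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve(value):
    """Unwrap a Future into its result; pass plain values through."""
    return value.result() if isinstance(value, Future) else value


def submit(pool, fn, *args):
    """Schedule fn on the pool; upstream futures are resolved inside the worker thread."""
    return pool.submit(lambda: fn(*(resolve(a) for a in args)))


def run_pipeline() -> int:
    # The pool must have enough workers for every waiting call, or this can deadlock.
    with ThreadPoolExecutor(max_workers=8) as pool:
        a = submit(pool, lambda: 2)
        b = submit(pool, lambda x: x + 1, a)      # waits for a
        c = submit(pool, lambda x: x * 10, a)     # waits for a, overlaps with b
        d = submit(pool, lambda x, y: x + y, b, c)  # waits for b and c
        return d.result()
```

Everything here happens inside one Python process. A worker, by contrast, is what starts that process in the first place for each scheduled flow run.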
a
@Marvin Do all my flows need to be set up as deployments then to be called by run_deployment?
m
thought for 54.4 seconds
Short answer: Yes. `run_deployment` can only start flows that have been registered as deployments.

Details
- `run_deployment(name=...)` expects the target flow to have a deployment with that name (format: "<flow-name>/<deployment-name>"). Without a deployment, there's nothing for workers to pick up.
- Deployments package your flow plus infrastructure settings (work pool, env, image, storage, schedules). Workers poll your work pool and execute scheduled flow runs for those deployments.

When you do NOT need deployments
- If you are orchestrating subflows within a single flow run and are okay staying in-process, you can call subflows like regular functions or wrap them in tasks for concurrency. No deployments required for that pattern.

When you DO need deployments
- If you want to fan out subflows as separate flow runs executed by workers (for isolation or horizontal scaling), each subflow must have its own deployment. Then your orchestrator calls `run_deployment(...)` for each.

Creating deployments in Prefect 3.x
- From code:
Copy code
from prefect import flow

@flow
def a_flow(): ...

# Create and push a deployment for a_flow
flow.from_source(
    source="https://github.com/org/repo",
    entrypoint="path/to/file.py:a_flow",
).deploy(name="a-deployment", work_pool_name="my-pool")
- Or use the CLI:
Copy code
prefect deploy
Then follow the prompts to pick flows and configure the deployment(s).

Docs
- Deployments overview: Deployments in Prefect 3
- Programmatic deploy: Python deployment API
- `run_deployment` reference: run_deployment

If you share how you're storing your code (local, Git, image) and your work pool type, I can show the exact deploy snippet or `prefect deploy` steps for your setup.
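Since `run_deployment` identifies its target by a "<flow-name>/<deployment-name>" string, a small check before triggering anything can catch typos early. This is only a sketch: `split_deployment_name` is a hypothetical helper, not part of Prefect, and it assumes that neither name contains a slash.

```python
def split_deployment_name(name: str) -> tuple[str, str]:
    """Split '<flow-name>/<deployment-name>' into its two parts.

    Hypothetical helper for illustration; assumes exactly one '/' separator.
    """
    parts = name.split("/")
    if len(parts) != 2 or not all(parts):
        raise ValueError(
            f"expected '<flow-name>/<deployment-name>', got {name!r}"
        )
    flow_name, deployment_name = parts
    return flow_name, deployment_name
```

For example, `split_deployment_name("a-flow/a-deployment")` yields the flow name and the deployment name separately, while a bare `"a-flow"` raises a `ValueError`.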
a
@Marvin if not deployed - how can I run a subflow concurrently within a flow?
m
thought for 23.8 seconds
You can do it in-process by wrapping each subflow call in a task and submitting those tasks. That’s the normal pattern in Prefect 3.x when you don’t use deployments. Why: only tasks have `.submit()`; flows are called like regular functions. Wrapping the flow calls in tasks lets you submit them for concurrent execution under the flow’s task runner. Example
Copy code
from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

# Your subflows
@flow
def a_flow(): ...
@flow
def b_flow(a): ...
@flow
def c_flow(a): ...
@flow
def d_flow(b, c): ...

# Thin task wrappers that call the subflows
@task
def run_a():
    return a_flow()

@task
def run_b(a):
    return b_flow(a)

@task
def run_c(a):
    return c_flow(a)

@task
def run_d(b, c):
    return d_flow(b, c)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def orchestrator():
    # Kick off A
    a_f = run_a.submit()
    # B and C depend on A; they’ll run concurrently
    b_f = run_b.submit(a_f)
    c_f = run_c.submit(a_f)
    # D depends on B and C
    d_f = run_d.submit(b_f, c_f)

    # Block only when you need the result
    return d_f.result()
Notes
- Choose `ThreadPoolTaskRunner` for IO-bound work, `ProcessPoolTaskRunner` for CPU-bound work.
- Don't call `.result()` on upstream futures too early or you'll serialize execution.
- If you want to process results as they finish, use `as_completed` from `prefect.futures`.

Docs
- Futures (wait, result, as_completed): Prefect Futures API
- Task runners: Task Runners
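The "don't call `.result()` too early" point is easy to see with plain `concurrent.futures`, used here as a stand-in for Prefect futures (an analogy, not Prefect code). The function names and the 0.2 s sleep are made up for the sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    time.sleep(0.2)  # stand-in for I/O-bound work
    return x * x


def eager(pool: ThreadPoolExecutor, items: list[int]) -> list[int]:
    # Calling .result() right after each submit blocks before the next submit,
    # so the calls run one after another.
    return [pool.submit(slow_square, i).result() for i in items]


def lazy(pool: ThreadPoolExecutor, items: list[int]) -> list[int]:
    # Submit everything first, then collect: the calls overlap.
    futures = [pool.submit(slow_square, i) for i in items]
    return [f.result() for f in futures]


def timed(fn, *args):
    """Return (output, elapsed_seconds) for a call."""
    start = time.perf_counter()
    out = fn(*args)
    return out, time.perf_counter() - start
```

Both variants return the same values; the eager one takes roughly the sum of the sleeps, while the lazy one takes roughly the longest single sleep.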
a
@Marvin given my setup, show me a sample flow with concurrently running subflows and tasks without using submit
m
thought for 516.9 seconds
Assuming you're on Prefect 3.x: calling a task or a subflow directly runs it to completion and returns its result (not a future), so plain calls execute one after another. Without `.submit`, the way to get concurrency is to define the flows and tasks as `async` and await them together with `asyncio.gather`. Here's a minimal example that runs multiple subflows concurrently and, within each subflow, runs multiple tasks concurrently.
Copy code
import asyncio
import random

from prefect import flow, task, get_run_logger

@task
async def do_work(x: int, delay: float | None = None) -> int:
    logger = get_run_logger()
    delay = delay or random.uniform(0.5, 2.0)
    logger.info(f"Task {x}: sleeping {delay:.2f}s")
    await asyncio.sleep(delay)  # non-blocking, so other tasks can make progress
    result = x * x
    logger.info(f"Task {x}: done -> {result}")
    return result

# Subflow: runs its tasks concurrently on the event loop
@flow(name="child-subflow")
async def child_subflow(name: str, items: list[int]) -> int:
    logger = get_run_logger()
    logger.info(f"[{name}] launching {len(items)} tasks concurrently")

    # Awaiting the task calls together lets them overlap; no .submit needed
    results = await asyncio.gather(*(do_work(i) for i in items))

    total = sum(results)
    logger.info(f"[{name}] total = {total}")
    return total

# Parent flow: launches multiple subflows concurrently
@flow(name="parent-flow")
async def parent_flow() -> int:
    logger = get_run_logger()
    logger.info("Launching child subflows concurrently")

    # Awaiting the subflow calls together lets the subflows overlap
    child_totals = await asyncio.gather(
        child_subflow(name="A", items=list(range(0, 5))),
        child_subflow(name="B", items=list(range(5, 10))),
        child_subflow(name="C", items=list(range(10, 15))),
    )

    grand_total = sum(child_totals)
    logger.info(f"Child totals: {child_totals} -> grand total: {grand_total}")
    return grand_total

if __name__ == "__main__":
    asyncio.run(parent_flow())
Notes:
- This pattern suits I/O-bound work. Concurrency comes from the event loop, so a blocking call such as `time.sleep` inside a task or flow stalls everything else.
- For blocking or CPU-bound work, use `.submit()` with `ThreadPoolTaskRunner` or `ProcessPoolTaskRunner` instead; `max_workers` only affects submitted tasks.
- `asyncio.gather` returns results in call order; use `asyncio.as_completed` if you want to handle results as each one finishes.

Docs for more details:
- Task runners: Prefect 3 Task Runners
- Global concurrency limits (optional): Global concurrency limits

Quick check: are you targeting I/O-bound or CPU-bound work, and do you have any global concurrency limits configured in your workspace? I can tailor the runners and limits accordingly.
a
@Marvin ignore above. I want an example of a concurrent subflow in prefect 3
m
thought for 19.7 seconds
Here's a minimal Prefect 3 example showing subflows running concurrently (no `.submit` needed). A plain subflow call blocks until the subflow finishes, so the flows are defined as `async` and the parent awaits them together with `asyncio.gather`.
Copy code
import asyncio

from prefect import flow, get_run_logger

# A simple subflow that sleeps, then returns a value
@flow(name="child-subflow")
async def child_subflow(name: str, delay: float) -> str:
    logger = get_run_logger()
    logger.info(f"[{name}] starting (sleep {delay}s)")
    await asyncio.sleep(delay)  # non-blocking sleep so the subflows can overlap
    msg = f"[{name}] done"
    logger.info(msg)
    return msg

# Parent flow runs the subflows concurrently on the event loop
@flow(name="parent-flow")
async def parent_flow():
    logger = get_run_logger()
    logger.info("Launching child subflows concurrently")

    # gather starts all three subflows and waits for all of them
    results = await asyncio.gather(
        child_subflow("A", 2.0),
        child_subflow("B", 1.0),
        child_subflow("C", 3.0),
    )

    logger.info(f"Results (call order): {results}")
    return results

if __name__ == "__main__":
    asyncio.run(parent_flow())
Notes:
- Concurrency here comes from the event loop, not from a task runner, so avoid blocking calls such as `time.sleep` inside the subflows.
- If your subflows are blocking or CPU-bound, wrap each subflow call in a task and `.submit()` the tasks under `ThreadPoolTaskRunner` or `ProcessPoolTaskRunner` instead.

Docs:
- Task runners: Prefect 3 Task Runners
a
@Marvin ignore above. I want an example of a concurrent subflow in prefect 3 with dependencies between the subflows
m
thought for 38.4 seconds
Here's a minimal Prefect 3 example with subflows that run concurrently and a downstream subflow that depends on the results of two upstream subflows. No `.submit` is used. Because a plain subflow call blocks and returns a result rather than a future, the flows are `async` and dependencies are expressed by awaiting the upstream results before calling the dependent subflow.
Copy code
import asyncio
import random

from prefect import flow, get_run_logger

@flow(name="producer-subflow")
async def producer(name: str, delay: float | None = None) -> int:
    logger = get_run_logger()
    d = delay or random.uniform(0.5, 2.0)
    logger.info(f"[{name}] starting (sleep {d:.2f}s)")
    await asyncio.sleep(d)
    value = len(name) * 10
    logger.info(f"[{name}] done -> {value}")
    return value

@flow(name="dependent-subflow")
async def dependent(name: str, a: int, b: int) -> int:
    logger = get_run_logger()
    result = a + b
    logger.info(f"[{name}] got a={a}, b={b} -> sum={result}")
    return result

@flow(name="parent-flow")
async def parent_flow():
    logger = get_run_logger()
    logger.info("Launching upstream subflows concurrently")

    async def a_and_b_then_d() -> int:
        # A and B run concurrently; D starts only after both have returned
        a, b = await asyncio.gather(producer("A", 2.0), producer("B", 1.0))
        return await dependent("D", a=a, b=b)

    # C is independent, so it overlaps with the whole A/B -> D chain
    d, c = await asyncio.gather(a_and_b_then_d(), producer("C", 3.0))

    results = {"C": c, "D": d}
    logger.info(f"Final results: {results}")
    return results

if __name__ == "__main__":
    asyncio.run(parent_flow())
Key points:
- Concurrency comes from the event loop: subflows awaited together with `asyncio.gather` overlap as long as they don't make blocking calls.
- Dependencies are created by ordering the awaits: `dependent` is only called after `a` and `b` have been awaited, while `producer("C")` keeps running alongside.
- If you prefer futures and `as_completed`/`wait`, wrap each subflow call in a task and `.submit()` the tasks under a task runner; passing a task future to a downstream `.submit()` creates the dependency.

Docs:
- Task runners: Prefect 3 Task Runners
- Futures utilities: prefect.futures source
a
@Marvin Can I run the flows with submit()?
m
thought for 128.4 seconds
Short answer: In Prefect 3.x, flows don't have a `.submit()` method.

How to run flows concurrently instead:
- Inside a flow: calling a subflow like a regular function runs it to completion and returns its result, so plain calls execute one after another. To overlap subflows, either wrap each call in a task and `.submit()` the tasks under a task runner (e.g., ThreadPoolTaskRunner), or define the flows as `async` and await them together with `asyncio.gather`.
  - Docs: Task runners, source for futures: prefect.futures
- Outside a flow (e.g., from a script or service): use `prefect.runner.submit_to_runner(...)` to submit one or many flow runs concurrently, then optionally `wait_for_submitted_runs()` to block until they finish. This relies on a running Prefect runner to accept the submissions, so check that it is available in your setup and Prefect version.
  - Source: prefect.runner module

Example: submit flows concurrently from a plain Python script
Copy code
from prefect import flow
from prefect.runner import submit_to_runner, wait_for_submitted_runs

@flow
def my_flow(n: int) -> int:
    return n * 2

# Submit multiple flow runs concurrently
runs = submit_to_runner(
    my_flow,
    parameters=[{"n": i} for i in range(5)],  # one run per parameter set
)

# Optionally block until all submitted runs complete
wait_for_submitted_runs()

print("Submitted runs:", runs)  # FlowRun objects with IDs/states
If you tell me whether you want to launch flows from inside another flow or from a standalone script/service, I can tailor the example further.
a
@Marvin could I use run_deployment?
m
thought for 136.1 seconds
Yes. In Prefect 3 you can trigger deployments programmatically with `prefect.deployments.run_deployment(...)`.

Key points:
- It creates a new flow run from an existing deployment and returns a FlowRun object.
- Set `as_subflow=True` (default) to show the triggered run as a subflow in the parent's UI; set `False` to keep it independent.
- To run multiple deployments concurrently from a flow, wrap `run_deployment` in a Prefect task and use a task runner so calls happen in parallel.

Example: run multiple deployments concurrently from a parent flow
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.deployments import run_deployment
from prefect.futures import as_completed

@task
def trigger_deployment(name: str, params: dict | None = None) -> str:
    fr = run_deployment(
        name=name,
        parameters=params,
        as_subflow=True,   # show as subflow in UI (optional)
        flow_run_name=f"triggered-{name.replace('/', '-')}"
    )
    # You can inspect fr.state here if needed
    return str(fr.id)

@flow(name="orchestrator", task_runner=ThreadPoolTaskRunner(max_workers=5))
def orchestrator():
    futures = [
        trigger_deployment.submit("flow-a/dev", {"x": 1}),
        trigger_deployment.submit("flow-b/dev", {"x": 2}),
        trigger_deployment.submit("flow-c/dev", {"x": 3}),
    ]

    run_ids = []
    for fut in as_completed(futures):
        run_ids.append(fut.result())

    return run_ids

if __name__ == "__main__":
    orchestrator()
Notes:
- If you call `run_deployment` directly in a loop inside a flow, it will run serially. Wrapping it in a `@task` and using a task runner (threads/processes) enables concurrency.
- Use `work_queue_name`, `idempotency_key`, or `scheduled_time` on `run_deployment` as needed.

References:
- How to trigger deployments: Run deployments
- API (signature): prefect.deployments.run_deployment (source)
- Task runners: Task runners
a
@Marvin Can I set PREFECT_UI_API_URL to be data.example.com/prefect? Or will the prefect path create issues
m
thought for 517.3 seconds
Short answer: yes, you can point PREFECT_UI_API_URL at a URL with a subpath. Just make sure it points to where the API is actually exposed through your proxy.

Recommendations for a /prefect subpath (Prefect 3.x):
- Set the UI's API endpoint for the browser:
  - PREFECT_UI_API_URL=http://data.example.com/prefect/api
- Mount the UI under the subpath so assets/router work:
  - PREFECT_SERVER_UI_SERVE_BASE=/prefect
  - (Alias also supported in many setups: PREFECT_UI_SERVE_BASE=/prefect)
- Make generated links correct:
  - PREFECT_UI_URL=http://data.example.com/prefect
- Point CLI/workers to the same API (if they should use the proxy):
  - PREFECT_API_URL=http://data.example.com/prefect/api

Reverse proxy notes:
- Proxy /prefect to the UI and /prefect/api -> upstream /api on the Prefect server.
- Don't forget WebSocket upgrade on /prefect/api/* for logs/events streaming.

Example Nginx snippet:
Copy code
location /prefect/ {
  proxy_pass http://prefect-server-ui/;   # serves the UI; server mounted with PREFECT_SERVER_UI_SERVE_BASE=/prefect
  proxy_set_header Host $host;
}

location /prefect/api/ {
  proxy_pass http://prefect-server/api/;  # forward to Prefect API root
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
  proxy_set_header Host $host;
}
Docs and references:
- Self-hosted Prefect (3.x)
- Settings reference

If you can share what reverse proxy you're using (Nginx, Traefik, etc.) and how you're running Prefect (docker-compose, k8s), I can tailor the exact config.
a
@Marvin Failing to load assets because they are being requested without the prefix. Like assets/...js instead of prefect/assets/...js
m
thought for 48.7 seconds
Got it. That symptom means the UI isn't aware it's being served under /prefect. Assets will be requested at /assets/... unless you tell the server to mount the UI at a base path.

What to change (Prefect 3.x):
- On the Prefect server process, set the UI serve base and restart:
  - PREFECT_SERVER_UI_SERVE_BASE=/prefect
- Point the UI to the API through your proxy:
  - PREFECT_UI_API_URL=http://data.example.com/prefect/api
- Optional but recommended so generated links are correct:
  - PREFECT_UI_URL=http://data.example.com/prefect
- If your CLI/workers should go through the proxy too:
  - PREFECT_API_URL=http://data.example.com/prefect/api

Reverse-proxy gotchas:
- When you mount the UI at /prefect, preserve the prefix when proxying. In Nginx, use proxy_pass without a trailing slash so /prefect stays in the upstream request:
Copy code
# UI (static assets and SPA routes)
location /prefect/ {
  proxy_pass http://prefect-server;   # note: NO trailing slash
  proxy_set_header Host $host;
}

# API (includes websockets for logs/events)
location /prefect/api/ {
  proxy_pass http://prefect-server/api/;
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
  proxy_set_header Host $host;
}
- If you use a trailing slash like `proxy_pass http://prefect-server/;` under `location /prefect/`, Nginx will strip the /prefect prefix and you'll see exactly what you're seeing (assets requested at /assets/...).

Verify after restart:
- Load the UI at http://data.example.com/prefect and check that network requests for assets are to /prefect/assets/...
- You can also inspect effective settings:
Copy code
prefect config view --show-defaults
Docs and examples: - Self-hosted Prefect (3.x) - Reverse proxy example (community) If you share how you’re running Prefect (docker/k8s/bare metal) and which proxy (Nginx/Traefik/etc.), I can tailor the exact env and proxy rules.
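To see why the trailing slash matters, here is a tiny Python model of how Nginx builds the upstream path for a prefix `location`. It is a simplified sketch, not Nginx itself: `upstream_path` is a hypothetical function, and it ignores regex locations, rewrites, and query strings. The rule it encodes is that when `proxy_pass` includes a URI part (even just `/`), the matched location prefix is replaced by that URI; when it has no URI part, the request path is forwarded unchanged.

```python
from typing import Optional


def upstream_path(location: str, proxy_pass_uri: Optional[str], request_path: str) -> str:
    """Simplified model of Nginx prefix-location handling for proxy_pass.

    proxy_pass_uri is the part of proxy_pass after the host, e.g. "/" for
    "http://prefect-server/" or None for "http://prefect-server".
    """
    if not request_path.startswith(location):
        raise ValueError("request does not match this location")
    if proxy_pass_uri is None:
        # No URI part: the original path is passed through untouched.
        return request_path
    # URI part present: the matched prefix is swapped for the given URI.
    return proxy_pass_uri + request_path[len(location):]
```

With `location /prefect/`, a request for `/prefect/assets/app.js` stays `/prefect/assets/app.js` when there is no URI part, but becomes `/assets/app.js` with a trailing slash, which no longer matches a UI mounted at /prefect. The API block relies on the same replacement on purpose: `/prefect/api/flows` is forwarded as `/api/flows`.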