Syméon del Marmol (10/10/2025, 2:43 PM)

Marvin (10/10/2025, 2:44 PM)

Marvin (10/10/2025, 2:46 PM)
- By default, the job’s container runs `prefect flow-run execute`. The flow run ID is available in the container as `PREFECT__FLOW_RUN_ID`.
- You can override the container’s command/args in your work pool’s Job template to run a shell script that first executes the flow and then runs your `curl`.
- Be sure your script returns the proper exit code so Prefect correctly records the run as success/failure.
Example entrypoint.sh you can bake into your image:
```sh
#!/bin/sh
set -u

status=success

# Execute the Prefect flow run (the ID is provided via env)
prefect flow-run execute "$PREFECT__FLOW_RUN_ID" || status=failure

# Your post-run action
curl -sS -X POST "https://example.com/hook" \
  -H "Content-Type: application/json" \
  -d "{\"run_id\":\"$PREFECT__FLOW_RUN_ID\",\"status\":\"$status\"}" || true

# Propagate the flow's status to the container exit code
[ "$status" = "success" ]
```
Then set your work pool’s Kubernetes Job template to use your script instead of the default command. For example, set:
- `command: ["/bin/sh", "-c"]`
- `args: ["/entrypoint.sh"]`
If you don’t want to build a new image, you can inline the script in the command/args:
```yaml
command:
  - /bin/sh
  - -c
args:
  - |
    set -u
    status=success
    prefect flow-run execute "$PREFECT__FLOW_RUN_ID" || status=failure
    curl -sS "https://example.com/hook?run_id=$PREFECT__FLOW_RUN_ID&status=$status" || true
    [ "$status" = "success" ]
```
Notes:
- You do not need to pass the run ID explicitly; `prefect flow-run execute` will read `PREFECT__FLOW_RUN_ID` if the argument is omitted.
- Make sure any credentials or target URLs needed by your `curl` are provided via env vars or Kubernetes Secrets in the Job template.
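If you manage deployments from Python rather than editing the base job template in the UI, here is a sketch of the same idea using per-deployment job variables. It assumes your Kubernetes work pool’s base job template exposes the default `command` and `env` variables; the pool name, image, and URL are placeholders:
```python
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    # Sketch only: override the container command so the job runs the wrapper
    # script instead of the default `prefect flow-run execute`.
    my_flow.deploy(
        name="k8s-post-run-curl",
        work_pool_name="my-k8s-pool",           # placeholder work pool name
        image="my-registry/my-image:latest",    # existing image containing /entrypoint.sh
        build=False,                            # use the image as-is, don't rebuild
        push=False,
        job_variables={
            "command": "/bin/sh /entrypoint.sh",              # assumes the template exposes `command`
            "env": {"HOOK_URL": "https://example.com/hook"},  # prefer K8s Secrets for credentials
        },
    )
```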
Alternative (often simpler): use Prefect Automations to send a webhook on flow run state changes (success, failure, etc.) and skip modifying your container. This is a good fit if your goal is just to call a URL when the run finishes.
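If you want something in code without touching the container or the work pool, flow state hooks are a related option: Prefect calls them in the flow-run process when the run reaches a terminal state. A minimal sketch (the URL and payload shape are placeholders; `httpx` is already a Prefect dependency):
```python
import httpx
from prefect import flow

def notify(flow, flow_run, state):
    # Runs inside the flow-run pod after the run completes or fails.
    httpx.post(
        "https://example.com/hook",  # placeholder endpoint
        json={"run_id": str(flow_run.id), "status": state.name.lower()},
        timeout=10,
    )

@flow(on_completion=[notify], on_failure=[notify])
def my_flow():
    ...
```
Note that hooks run inside the flow-run pod, so they will not fire if the pod is killed abruptly; a server-side Automation is more robust for that case.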
References:
- Kubernetes work pools and job template customization: Prefect 3.x: Kubernetes deployments/work pools
- Workers overview: Prefect 3.x: Workers
- Flow run execution CLI: `prefect flow-run execute --help` (available in the container during execution)
If you share how you’re managing your Job template (UI vs. code) and whether you prefer the script or Automation approach, I can provide the exact steps for your setup.

Syméon del Marmol (10/10/2025, 5:35 PM)

Marvin (10/10/2025, 5:35 PM)

Marvin (10/10/2025, 5:50 PM)
Combining `ProcessPoolTaskRunner` with task-in-task calls or background tasks (`.delay()` / `.result_async()`) can deadlock or orphan state updates, which shows up in the UI as “running” forever.
A few focused checks and fixes:
- Are you calling a `@task` from inside another `@task`?
  - Avoid this. Inside a task, call a plain function instead. Reserve `@task` calls (or `.delay()`) for the flow.
  - If you need the inner unit to be a task, lift that orchestration up to the flow.
- Are you scheduling background tasks from inside a task (e.g., `sub.delay()` then `await sub.result_async()` inside a process-run task)?
  - Move the `.delay()` / `.result_async()` calls to the flow level. Process-based runners don’t play well with scheduling/awaiting other tasks from inside their workers.
- Consider the top-level task runner
  - If your goal is to fan out work to multiple Task Workers, prefer letting Prefect background tasks do the parallelism across workers and keep the flow’s task runner simple (the default `ThreadPoolTaskRunner`). You generally don’t need a `ProcessPoolTaskRunner` at the top level if you’re distributing work to workers.
  - If you truly need CPU-bound parallelism inside a single pod, avoid calling other tasks from those processes; only call plain functions.
- Pickling and process-safety
  - Anything executed by `ProcessPoolTaskRunner` must be picklable and must not rely on non-fork-safe globals or event loops. Hidden closures, lambdas, or objects with open connections can cause subtle hangs.
- Infra/resource issues
  - Check the Kubernetes pod logs for OOMKilled or worker-process exits. An abruptly killed worker process can leave the task state “running” until the runner times out.
  - Ensure CPU/memory requests/limits are adequate; CPU throttling can look like a “stuck” task.
- Logging and timeouts
  - Add logging inside the suspected hang points to confirm progress.
  - Set task-level timeouts via `@task(timeout_seconds=...)` to fail fast instead of hanging for a very long time (see the sketch just below this list).
  - Enable debug logs to see state transitions: set `PREFECT_LOGGING_LEVEL=DEBUG`.
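For example, a minimal sketch of a task that logs around the suspected hang point and fails fast instead of hanging (the 600-second timeout and the chunk-doubling work are placeholders):
```python
from prefect import task
from prefect.logging import get_run_logger

@task(timeout_seconds=600, retries=1)  # placeholder values; tune to your workload
def process_chunk(chunk):
    logger = get_run_logger()
    logger.debug("starting chunk of size %s", len(chunk))
    result = [x * 2 for x in chunk]  # stand-in for the real CPU-bound work
    logger.debug("finished chunk")
    return result
```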
Recommended pattern for fan-out with background tasks in Prefect 3.x (run this at the flow level, not from within a task):
```python
from prefect import flow, task
import asyncio

@task
def do_work(x):
    # pure function-style work here
    return x * 2

@flow
async def main_flow(items):
    # fan out across task workers
    futures = [do_work.delay(i) for i in items]
    # await all results (non-blocking)
    results = await asyncio.gather(*(f.result_async() for f in futures))
    return results
```
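For `.delay()` to run anywhere, one or more task workers must be serving `do_work`. A minimal sketch of the worker side, assuming the task lives in an importable module (here a hypothetical `my_tasks`):
```python
# worker.py -- run one of these processes per task-worker pod
from prefect.task_worker import serve

from my_tasks import do_work  # hypothetical module that defines the @task do_work

if __name__ == "__main__":
    # Subscribes to the Prefect API and executes scheduled runs of do_work here.
    serve(do_work)
```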
If you currently have something like this (task calling task), refactor:
```python
@task
def outer_task(item):
    # BAD: calling a task like a function or scheduling background tasks here
    return inner_task(item)  # or inner_task.delay(item)
```
To either:
- Make the inner piece a plain function and keep `outer_task` as the only task:
```python
def inner_fn(item):
    ...

@task
def outer_task(item):
    return inner_fn(item)
```
- Or lift the orchestration to a flow and call both as tasks there:
```python
@task
def inner_task(item): ...

@flow
def orchestrate(items):
    return [inner_task(i) for i in items]  # or use .delay()/.result_async() here
```
Helpful references:
- Run background tasks in Prefect 3.x: Run background tasks
- Kubernetes deployment/infrastructure guide: Kubernetes guide

Marvin (10/10/2025, 5:50 PM)
- Are you calling `.delay()` from inside another task that’s running under `ProcessPoolTaskRunner`?
- Do you have an example snippet of how you’re calling the inner tasks today?
- Have you seen any OOM or worker exit messages in the pod logs around the time these tasks stop progressing?
If you share a minimal snippet of your current pattern, I can suggest the smallest change to avoid the hang.