<@ULVA73B9P> I run flow in kubernetes jobs using a...
# ask-marvin
s
@Marvin I run flows in Kubernetes jobs using a Kubernetes work pool. How can I run a shell script after the flow has executed? I'd like to call the Prefect flow run execution inside the shell script of my job and then do a curl call. Is that possible?
m
thought for 133.4 seconds
Yes, you can wrap Prefect's execution in your own shell script in a Kubernetes work pool job and run whatever you want after the flow finishes. Key points:
- Inside a Kubernetes work-pool job, the container is started with `prefect flow-run execute`. The flow run ID is available in the container as `PREFECT__FLOW_RUN_ID`.
- You can override the container's command/args in your work pool's Job template to run a shell script that first executes the flow and then does your `curl`.
- Be sure your script returns the proper exit code so Prefect correctly records the run as success/failure.

Example `entrypoint.sh` you can bake into your image:
```sh
#!/bin/sh
set -u

status=success
# Execute the Prefect flow run (ID is provided via env)
prefect flow-run execute "$PREFECT__FLOW_RUN_ID" || status=failure

# Your post-run action
curl -sS -X POST "https://example.com/hook" \
  -H "Content-Type: application/json" \
  -d "{\"run_id\":\"$PREFECT__FLOW_RUN_ID\",\"status\":\"$status\"}" || true

# Propagate the flow's status to the container exit code
[ "$status" = "success" ]
```
Then set your work pool's Kubernetes Job template to use your script instead of the default command. For example, set:
- `command: ["/bin/sh", "-c"]`
- `args: ["/entrypoint.sh"]`

If you don't want to build a new image, you can inline the script in the command/args:
```yaml
command:
  - /bin/sh
  - -c
args:
  - |
    set -u
    status=success
    prefect flow-run execute "$PREFECT__FLOW_RUN_ID" || status=failure
    curl -sS "https://example.com/hook?run_id=$PREFECT__FLOW_RUN_ID&status=$status" || true
    [ "$status" = "success" ]
```
Notes:
- You do not need to pass the run ID explicitly; `prefect flow-run execute` will read `PREFECT__FLOW_RUN_ID` if the argument is omitted.
- Make sure any credentials or target URLs needed by your `curl` are provided via env vars or Kubernetes Secrets in the Job template (see the sketch above).

Alternative (often simpler): use Prefect Automations to send a webhook on flow run state changes (success, failure, etc.) and skip modifying your container. This is a good fit if your goal is just to call a URL when the run finishes.

References:
- Kubernetes work pools and job template customization: Prefect 3.x: Kubernetes deployments/work pools
- Workers overview: Prefect 3.x: Workers
- Flow run execution CLI: `prefect flow-run execute --help` (available in the container during execution)

If you share how you're managing your Job template (UI vs. code) and whether you prefer the script or Automation approach, I can provide the exact steps for your setup.
s
@Marvin Ok, that worked, thanks! Now I have jobs running in Kubernetes: it's working fine, but suddenly some sub-tasks stop doing anything while still marked as "running" in the Prefect UI. They run forever and eventually time out. I'm using ProcessTaskRunner for the first level of tasks, but internally I'm only calling tasks with simple function calls. Any idea?
m
thought for 890.8 seconds
Glad it's mostly working; let's figure out the "stuck running" subtasks.

Short version: nesting Prefect tasks or background tasks inside a task that's running under a process-based task runner is a common cause of stalls and hangs. In Prefect 3.x, orchestration should live in flows; tasks should just do work and call plain functions. Mixing `ProcessPoolTaskRunner` with task-in-task or background tasks (`.delay()`/`.result_async()`) can deadlock or orphan state updates, which shows up in the UI as "running" forever.

A few focused checks and fixes:
- Are you calling `@task` from inside another `@task`?
  - Avoid this. Inside a task, call a plain function instead. Reserve `@task` calls (or `.delay()`) for the flow.
  - If you need the inner unit to be a task, lift that orchestration up to the flow.
- Are you scheduling background tasks from inside a task (e.g., `sub.delay()` then `await sub.result_async()` inside a process-run task)?
  - Move the `.delay()`/`.result_async()` calls to the flow level. Process-based runners don't play well with scheduling/awaiting other tasks from inside their workers.
- Consider the top-level task runner.
  - If your goal is to fan out work to multiple Task Workers, prefer letting Prefect background tasks do the parallelism across workers and keep the flow's task runner simple (the default `ThreadPoolTaskRunner`, or `PrefectTaskRunner`). You generally don't need a `ProcessPoolTaskRunner` at the top level if you're distributing work to workers.
  - If you truly need CPU-bound parallelism inside a single pod, avoid calling other tasks from those processes; only call functions.
- Pickling and process safety.
  - Anything executed by `ProcessPoolTaskRunner` must be picklable and must not rely on non-fork-safe globals or event loops. Hidden closures, lambdas, or objects with open connections can cause subtle hangs.
- Infra/resource issues.
  - Check the Kubernetes pod logs for OOMKilled or worker-process exits. An abruptly killed worker process can leave the task state "running" until the runner times out.
  - Ensure CPU/memory requests/limits are adequate; CPU throttling can look like a "stuck" task (a rough resources sketch follows this list).
- Logging and timeouts.
  - Add logging inside the suspected hang points to confirm progress.
  - Set task-level timeouts via `@task(timeout_seconds=...)` to fail fast instead of hanging for a very long time.
  - Enable debug logs to see state transitions: set `PREFECT_LOGGING_LEVEL=DEBUG`.
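For the requests/limits point in the list above, here is a rough sketch of the `resources` block on the flow-run container in the same Job template you customized earlier (the values are illustrative only, not recommendations):

```yaml
# Illustrative resources for the flow-run container; tune to the real workload.
resources:
  requests:
    cpu: "1"
    memory: 2Gi
  limits:
    cpu: "2"
    memory: 4Gi
```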
Recommended pattern for fan-out with background tasks in Prefect 3.x (run this at the flow level, not from within a task):

```python
from prefect import flow, task
import asyncio

@task
def do_work(x):
    # pure function-style work here
    return x * 2

@flow
async def main_flow(items):
    # fan out across task workers
    futures = [do_work.delay(i) for i in items]

    # await all results (non-blocking)
    results = await asyncio.gather(*(f.result_async() for f in futures))
    return results
```
If you currently have something like this (task calling task), refactor:
```python
@task
def outer_task(item):
    # BAD: calling a task like a function or scheduling background tasks here
    return inner_task(item)  # or inner_task.delay(item)
```
To either:
- Make the inner piece a plain function and keep `outer_task` as the only task:
```python
def inner_fn(item):
    ...

@task
def outer_task(item):
    return inner_fn(item)
```
- Or lift the orchestration to a flow and call both as tasks there:
```python
@task
def inner_task(item): ...

@flow
def orchestrate(items):
    return [inner_task(i) for i in items]  # or use .delay()/.result_async() here
```
Helpful references:
- Run background tasks in Prefect 3.x: Run background tasks
- Kubernetes deployment/infrastructure guide: Kubernetes guide
- Task runners (source for available runners): prefect.task_runners

A couple of clarifying questions to pinpoint it:
- Are the "stuck" subtasks being scheduled via `.delay()` from inside another task that's running under `ProcessPoolTaskRunner`?
- Do you have an example snippet of how you're calling the inner tasks today?
- Have you seen any OOM or worker exit messages in the pod logs around the time these tasks stop progressing?

If you share a minimal snippet of your current pattern, I can suggest the smallest change to avoid the hang.