@Marvin How can I get the task runner object ...
# ask-marvin
j
@Marvin How can I get the task runner object for a particular flow within that flow?
m
thought for 77.6 seconds
In Prefect 3.x, call get_run_context() inside the flow. It returns an EngineContext, which exposes the task_runner attribute. Example:

```python
from prefect import flow, get_run_logger
from prefect.context import get_run_context
from prefect.task_runners import ThreadPoolTaskRunner

@flow(name="example", task_runner=ThreadPoolTaskRunner(max_workers=8))
def my_flow():
    logger = get_run_logger()
    ctx = get_run_context()  # EngineContext inside a flow
    tr = ctx.task_runner
    logger.info(f"Task runner: {tr} (type={type(tr).__name__})")
    return tr

if __name__ == "__main__":
    my_flow()
```

Notes:
- get_run_context() must be called while the flow is running; outside of a run it raises a MissingContextError.
- Inside tasks (not flows), get_run_context() returns a TaskRunContext, which does not have task_runner. If you need it in tasks, fetch it in the flow and pass it down explicitly.

Docs: Runtime context in Prefect 3.x - https://docs-3.prefect.io/v3/concepts/runtime-context

Yes, the context knows what’s going on. Someone has to.
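A minimal sketch of the "fetch it in the flow and pass it down" pattern mentioned in the notes above, assuming Prefect 3.x; the flow, task, and variable names are illustrative, not from the thread:

```python
from prefect import flow, task, get_run_logger
from prefect.context import get_run_context
from prefect.task_runners import ThreadPoolTaskRunner


@task
def report_runner(runner) -> str:
    # Inside a task the context is a TaskRunContext, which has no
    # task_runner attribute, so the runner arrives as a plain argument.
    return type(runner).__name__


@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def runner_aware_flow():
    logger = get_run_logger()
    tr = get_run_context().task_runner  # EngineContext inside the flow
    name = report_runner(tr)            # pass the runner down explicitly
    logger.info(f"Flow is using task runner: {name}")
    return name


if __name__ == "__main__":
    runner_aware_flow()
```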
j
@Marvin is the task runner replaced when retrying a failed flow?
m
thought for 269.6 seconds
Short answer: yes. Each flow retry gets a fresh task runner.

Details:
- On every flow run attempt, Prefect creates a new EngineContext and (re)instantiates the task runner for that attempt. The runner is started when the attempt begins and closed when it ends.
- If you supplied a runner class (e.g., task_runner=ThreadPoolTaskRunner), it’s instantiated anew per attempt. If you supplied an instance, Prefect duplicates it per attempt, so you still get a clean runner with the same config.
- Task retries are different: they happen within the same flow attempt and therefore use the same task runner instance for that attempt.
- If your retries are picked up by a worker that launches new infrastructure (new process/container), the runner will obviously be new as well.

You can confirm in a pinch by logging the object identity during the flow:

```python
ctx = get_run_context()
logger.info(f"task runner id: {id(ctx.task_runner)}")
```

Related docs:
- Runtime context: https://docs-3.prefect.io/v3/concepts/runtime-context
- Task runners: https://docs-3.prefect.io/v3/concepts/task-runners
- Retries: https://docs-3.prefect.io/v3/how-to-guides/workflows/retries

Fresh runner per attempt. Clean slate. Pity we can’t do the same for Mondays.
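A minimal, self-contained sketch of that identity check, assuming Prefect 3.x flow-level retries; the flow name and the simulated failure are illustrative:

```python
import random

from prefect import flow, get_run_logger
from prefect.context import get_run_context
from prefect.task_runners import ThreadPoolTaskRunner


@flow(retries=2, retry_delay_seconds=1, task_runner=ThreadPoolTaskRunner())
def flaky_flow():
    logger = get_run_logger()
    tr = get_run_context().task_runner
    # A different id() on each attempt suggests a freshly created runner
    # (id() values can be reused after garbage collection, so treat this
    # as a rough check, not a proof).
    logger.info(f"attempt sees {type(tr).__name__} at id={id(tr)}")
    if random.random() < 0.7:
        raise RuntimeError("simulated failure to trigger a flow retry")
    return id(tr)


if __name__ == "__main__":
    flaky_flow()
```

Run locally, the retry attempt should log a different id than the first attempt, matching the "fresh runner per attempt" behavior described above.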
j
@Marvin When my flow retries, I’m seeing a task runner of a different class than the one I specified in the task_runner argument to the flow decorator. Why is that?
m
thought for 434.6 seconds
Short version: your retry probably ran in a different execution mode than the first attempt. Prefect re-creates the EngineContext on each attempt and may swap in a different task runner when the environment changes, most notably when a Task Worker is involved.

Likely reasons you’re seeing a different class:
- A Task Worker picked up the retry: when tasks are executed by a Task Worker, Prefect uses PrefectTaskRunner to submit tasks to that worker. If the first attempt ran locally (ThreadPoolTaskRunner) and the retry landed where a Task Worker is available, you’ll see PrefectTaskRunner on the retry.
- Different infrastructure/worker picked up the retry: retries can be scheduled to a different machine or process than the original attempt. If that environment has different capabilities (e.g., a task worker running vs. not), the runner class can differ.
- Fresh attempt, fresh runner: each flow attempt creates a new EngineContext and runner; what gets instantiated depends on the runtime conditions of that attempt, not the previous one.

What won’t cause it:
- Passing a class vs. an instance for task_runner. Prefect will instantiate/duplicate your runner per attempt; that alone doesn’t change the class.

How to confirm:
- Log the runner on each attempt:
  ```python
  from prefect.context import get_run_context
  tr = get_run_context().task_runner
  logger.info(f"Task runner: {tr} ({type(tr).__name__})")
  ```
- Check whether a Task Worker is running in the environment that executed the retry.

How to keep it consistent:
- If you want local threaded execution, don’t run a Task Worker in the environment executing your flow attempts; Prefect will then honor ThreadPoolTaskRunner.
- If you want distributed task execution, expect PrefectTaskRunner.

Docs for reference:
- Task runners: https://docs-3.prefect.io/v3/concepts/task-runners
- Runtime context: https://docs-3.prefect.io/v3/concepts/runtime-context
- Retries: https://docs-3.prefect.io/v3/how-to-guides/workflows/retries

If you can share how you’re running the flow (local vs. deployment), whether a Task Worker is running, and the runner class you see on each attempt, I can pinpoint the exact cause instead of grumbling about it abstractly.
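A minimal sketch of the per-attempt confirmation step, assuming Prefect 3.x and a locally configured ThreadPoolTaskRunner; the flow name, the expected class name, and the choice to warn rather than fail are illustrative:

```python
from prefect import flow, get_run_logger
from prefect.context import get_run_context
from prefect.task_runners import ThreadPoolTaskRunner


@flow(retries=1, task_runner=ThreadPoolTaskRunner())
def check_runner_flow():
    logger = get_run_logger()
    tr = get_run_context().task_runner
    expected = "ThreadPoolTaskRunner"   # the class named in the decorator
    actual = type(tr).__name__
    logger.info(f"expected={expected} actual={actual}")
    if actual != expected:
        # Surface the swap in the flow run logs so it is easy to spot
        # which attempt ran with a different runner class.
        logger.warning(f"task runner was swapped to {actual} on this attempt")


if __name__ == "__main__":
    check_runner_flow()
```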
j
@Marvin this is in an EKS deployment. There is no task worker running. On the first attempt the task runner class is MultiprocessTaskRunner (from https://github.com/rpeden/prefect-multiprocess), and on the retry the task runner is a ThreadPoolTaskRunner.
m
thought for 90.1 seconds
Short answer: the retry is starting in an environment where Prefect can’t (or won’t) re-create your MultiprocessTaskRunner, so it falls back to the default ThreadPoolTaskRunner.

Common culprits in EKS/Kubernetes:
- The plugin isn’t available in the retry pod: if your deployment uses a floating “latest” image tag or a different image for retries, the pod that runs the retry may not have prefect-multiprocess installed. Result: Prefect can’t import/instantiate your runner, so it defaults to ThreadPoolTaskRunner.
- Passing an instance instead of a class: each attempt creates a fresh EngineContext and runner. If you pass an instance (e.g., task_runner=MultiprocessTaskRunner()), Prefect tries to duplicate/recreate it per attempt. If the runner can’t be duplicated cleanly (serialization/import issues), Prefect defaults to ThreadPoolTaskRunner. Prefer passing the class: task_runner=MultiprocessTaskRunner.
- Multiprocessing constraints in the retry pod: some K8s setups or base images restrict forking or shared memory (/dev/shm). If the multiprocess runner fails to start in that pod, you’ll get the fallback.
- Image drift between attempts: even with the same “tag,” retries might land on nodes pulling different cached layers. If one image variant lacked the plugin, you’ll see the class change on that attempt.

What to check next:
- Pin your image by digest or a fixed tag and verify the plugin is installed in that image: pip show prefect-multiprocess (or the package name it uses).
- Pass the runner class, not an instance:
  ```python
  from prefect_multiprocess import MultiprocessTaskRunner

  @flow(task_runner=MultiprocessTaskRunner)
  ```
- Turn on debug logs (PREFECT_LOGGING_LEVEL=DEBUG) and look for warnings/errors about runner initialization on the retry.
- Log the environment on each attempt to prove it’s identical: sys.version, platform, container image tag, and whether prefect-multiprocess can be imported.
- If you’re setting a multiprocessing start method, ensure it’s compatible in K8s (spawn often behaves better than fork in constrained environments).

If you can share:
- how you set task_runner (class vs. instance),
- the container image tag/digest,
- any warnings in the retry attempt logs about initializing the runner,
…I can narrow it down further instead of blaming the universe’s general hostility to parallelism.

References:
- Task runners: https://docs-3.prefect.io/v3/concepts/task-runners
- Runtime context: https://docs-3.prefect.io/v3/concepts/runtime-context
- Multiprocess runner plugin: https://github.com/rpeden/prefect-multiprocess
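A minimal sketch of the per-attempt environment logging suggested above, assuming Prefect 3.x; IMAGE_TAG is a hypothetical environment variable (substitute whatever your deployment actually injects), and the flow name is illustrative:

```python
import importlib.util
import os
import platform
import sys

from prefect import flow, get_run_logger


@flow(retries=1)
def diagnostics_flow():
    logger = get_run_logger()
    # Does the pod running this attempt actually have the plugin installed?
    has_plugin = importlib.util.find_spec("prefect_multiprocess") is not None
    logger.info(f"python={sys.version.split()[0]} platform={platform.platform()}")
    logger.info(f"prefect_multiprocess importable: {has_plugin}")
    # IMAGE_TAG is assumed, not a Prefect setting; use whatever identifies
    # the container image in your EKS deployment.
    logger.info(f"image tag: {os.environ.get('IMAGE_TAG', '<unset>')}")


if __name__ == "__main__":
    diagnostics_flow()
```

Comparing these lines between the first attempt and the retry should show whether the two attempts really ran in identical environments.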
c
Hi Jim - that could be a bug if it’s consistently happening; if you open a bug report with a reproducible example we can probably get to the bottom of it more easily than Marvin in this case
j
Thanks, yeah, I'm going to see if I can boil it down to a more minimal reproducer
@Chris White – okay, I have a much simplified reproducer which still gives me the same behavior. What’s the best way to open a bug report?