<@ULVA73B9P> Is there a way to set the task_runner...
# ask-marvin
j
@Marvin Is there a way to set the task_runner for a flow when the flow is invoked, like passing the TaskRunner as a parameter?
m
In Prefect 3.x, you can configure a flow to use a specific task runner by passing the runner to the task_runner keyword of the flow decorator. This sets the task runner when the flow is defined; the decorator itself does not let you pass the task runner as a parameter at invocation time. Here's a basic example of how to set the task runner using the flow decorator:
python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    # Your tasks here
    pass
If you need to dynamically choose a task runner at runtime, you might consider using nested flows, where different flows can have different task runners configured. For more details, you can refer to the Prefect Task Runners documentation.
j
I get a type error from pyright when I try to set the task_runner parameter in the decorator.
n
hi @Joe w.r.t the static typing issue, what version of prefect are you on? an MRE for that type error would be very appreciated! as far as dynamically setting task_runner, you should be able to use with_options on the flow, like
my_flow.with_options(task_runner=...)(...)
j
3.1.14. I found an issue in the bug tracker that claims it was fixed in 3.1.12. I tried both versions, and they both have the same problem.
n
linking the issue you're referring to and an MRE would be appreciated!
j
I'm working on that.
Here is a minimal case that I can come up with:
import time

from prefect import flow
from prefect.logging import get_run_logger
from prefect_dask.task_runners import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
def test_flow(max_count):
  logger = get_run_logger()
  logger.info("Start Test flow")

  for i in range(max_count):
    time.sleep(1)


if __name__ == "__main__":
  test_flow(5)
This is the last thread where I ran into this problem last year, but I didn't have time to chase it down: https://prefect-community.slack.com/archives/CL09KU1K7/p1734407135220139?thread_ts=1734407110.153419&cid=CL09KU1K7
n
thanks! that's helpful, i was looking at this last week (for thread pool task runner at least, dask would have to be done separately I think) - we'll take a closer look when we can, feel free to open a PR if you have capacity!
j
So it looks like it's a known problem. Yeah, the type hinting was a little weird that it was None.
n
yea we recently began a campaign to make everything more type complete, but there are still plenty of outstanding typing issues, especially in integrations like prefect-dask and stuff related to legacy sync/async compat