Joe
01/28/2025, 6:22 PM
Marvin
01/28/2025, 6:22 PM
The task runner is set with the task_runner keyword of the flow decorator. This means you can set the task runner when defining the flow, but you can't directly pass a task runner as a parameter at invocation time.
Here's a basic example of how to set the task runner using the flow decorator:
    from prefect import flow
    from prefect.task_runners import ThreadPoolTaskRunner

    @flow(task_runner=ThreadPoolTaskRunner())
    def my_flow():
        # Your tasks here
        pass
If you need to dynamically choose a task runner at runtime, you might consider using nested flows, where different flows can have different task runners configured.
For more details, you can refer to the Prefect Task Runners documentation.
Joe
01/28/2025, 6:27 PM
Nate
01/28/2025, 6:31 PM
For task_runner, you should be able to use with_options, like my_task.with_options(task_runner=...).submit(...)
Joe
01/28/2025, 6:32 PM
Nate
01/28/2025, 6:34 PM
Joe
01/28/2025, 6:35 PM
Joe
01/28/2025, 6:46 PM
    import time

    from prefect import flow
    from prefect.logging import get_run_logger
    from prefect_dask.task_runners import DaskTaskRunner

    @flow(task_runner=DaskTaskRunner())
    def test_flow(max_count):
        logger = get_run_logger()
        logger.info("Start Test flow")
        for i in range(max_count):
            time.sleep(1)

    if __name__ == "__main__":
        test_flow(5)
Joe
01/28/2025, 6:47 PM
Nate
01/28/2025, 6:53 PM
Joe
01/28/2025, 6:55 PM
Nate
01/28/2025, 6:57 PM
prefect-dask and stuff related to legacy sync/async compat