Hristo Papazov
11/22/2024, 6:24 PM
I am using ThreadPoolTaskRunner with max_workers to run my tasks concurrently. I would like to be able to set the value for max_workers dynamically (via parameters or some other mechanism). Has anyone done something similar? Here is a simple flow that I am using to try things out:
from prefect import flow, get_run_logger, task
from prefect.task_runners import ThreadPoolTaskRunner

# JobParameters (definition not shown) provides total_count and batch_size.

@task
async def run_batch(index: int) -> None:
    logger = get_run_logger()
    logger.info(f"Processing batch, {index}")
    return None

@flow(task_runner=ThreadPoolTaskRunner(max_workers=2))
async def entrypoint(parameters: JobParameters):
    logger = get_run_logger()
    task_futures = []
    # Submit one task per batch.
    for i in range(0, parameters.total_count, parameters.batch_size):
        task_futures.append(run_batch.submit(i))
    # Block until every submitted task has finished.
    for future in task_futures:
        future.wait()
    logger.info("All tasks have completed.")
Nate
11/22/2024, 6:31 PM
In [1]: from prefect import task, flow

In [2]: from prefect.task_runners import ThreadPoolTaskRunner

In [3]: @flow
   ...: def some_flow():
   ...:     task(lambda x: x).map(range(10)).result()
   ...:
   ...: @flow
   ...: def f(n_workers: int):
   ...:     some_flow.with_options(task_runner=ThreadPoolTaskRunner(max_workers=n_workers))()
   ...:

In [4]: f(24)
Hristo Papazov
11/22/2024, 7:43 PM