Lina Carmona
07/01/2025, 11:30 AM...
@flow(
    name="Scrapers sub-flow",
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "processes": False, "threads_per_worker": 20}
    ),
)
def scraper(splitted_files: list[str], run_timestamp: str, success_threshold: float):
    logger = get_run_logger()
    ...
Nate
07/01/2025, 12:59 PM
ThreadPoolTaskRunner, which will run each submitted task run in a single thread (per ThreadPoolExecutor from the Python standard library). So if you were using DaskTaskRunner and things were working, all you should have to do to move to the default thread pool task runner is remove the task_runner keyword argument from @flow
...
@flow(
    name="Scrapers sub-flow"
)
def scraper(splitted_files: list[str], run_timestamp: str, success_threshold: float):
    logger = get_run_logger()
    ...
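
For intuition on "each submitted task run in a single thread", here is a minimal standard-library sketch of the same pattern. It is not Prefect's implementation: scrape_one and run_scrapers are hypothetical names made up for illustration, the file names are placeholders, and the 20-thread limit only mirrors the threads_per_worker value from the original Dask config.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def scrape_one(path: str) -> tuple[str, str]:
    """Hypothetical stand-in for one scraper task; reports the thread it ran on."""
    time.sleep(0.05)  # simulate I/O-bound work such as an HTTP request
    return path, threading.current_thread().name


def run_scrapers(paths: list[str], max_workers: int = 20) -> list[tuple[str, str]]:
    """Submit one job per path to a thread pool and collect results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="scraper") as pool:
        # Each submit() call hands one job to a pool thread, much like task.submit() in a flow.
        futures = [pool.submit(scrape_one, p) for p in paths]
        # result() blocks until that particular job has finished.
        return [f.result() for f in futures]


if __name__ == "__main__":
    for path, thread_name in run_scrapers(["a.csv", "b.csv", "c.csv"]):
        print(path, "ran on", thread_name)
```

Threads share one process, so this suits I/O-bound scraping, which is also what the original "processes": False Dask setting implied.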