Jon
01/18/2023, 4:54 PMcreate_flow_run.map()
and it's spinning up too many threads. im running out of threads/memory. i would rather execute sequentiallyEthienne Marcelin
01/18/2023, 4:55 PMSequentialTaskRunner
in your flow ? Or is it just in prefect2 🤔Jon
01/18/2023, 4:58 PMKyle McChesney
01/18/2023, 9:32 PMfrom prefect.executors import LocalExecutor
with Flow(
'my_flow',
executor=LocalExecutor(),
):
...
Luis Gallegos
01/19/2023, 4:59 AMexecutor = LocalDaskExecutor(num_workers=1)
Jon
01/19/2023, 1:44 PMLocalExecutor
by default?Kyle McChesney
01/19/2023, 4:47 PM- executor (prefect.executors.Executor, optional): The executor that the flow
should use. If `None`, the default executor configured in the runtime environment
will be used.
Jon
01/19/2023, 4:53 PMLuis Gallegos
01/19/2023, 4:59 PMfrom prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
executor = LocalDaskExecutor(num_workers=1)
with Flow("you_flow_orchestrator_name", executor=executor, ) as flow:
# run_parameters takes a list of dicts of parameters to run
load_flow = create_flow_run.map(flow_name=unmapped("slave_flow"), project_name=unmapped("slave_project"), parameters=any_list_of_dict_param)
# Wait for workers
wait_for_flow_run = wait_for_flow_run.map(flow_run_id=load_flow)
I hope works for you!Jon
01/19/2023, 5:07 PM