Jon

01/18/2023, 4:54 PM
in prefect 1, is there a way to disable parallel execution of tasks? i'm doing create_flow_run.map() and it's spinning up too many threads. i'm running out of threads/memory. i would rather execute sequentially

Ethienne Marcelin

01/18/2023, 4:55 PM
Can't you specify a SequentialTaskRunner in your flow? Or is it just in prefect2 🤔
Jon
bump

Kyle McChesney

01/18/2023, 9:32 PM
what executor are you using?
from prefect import Flow
from prefect.executors import LocalExecutor

# LocalExecutor runs tasks one at a time in the main process
with Flow(
    'my_flow',
    executor=LocalExecutor(),
) as flow:
    ...
should force serial execution

Luis Gallegos

01/19/2023, 4:59 AM
Maybe you can try
executor = LocalDaskExecutor(num_workers=1)
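
For intuition on why capping the worker count serializes mapped tasks, here is a minimal sketch using only the Python standard library. It does not use Prefect: `peak_concurrency` and the sleep-based task are hypothetical stand-ins, and `ThreadPoolExecutor(max_workers=...)` is only an analogy for `LocalDaskExecutor(num_workers=...)`, not a description of how Prefect schedules work internally.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_workers, num_tasks=5):
    """Run num_tasks short tasks on a thread pool and return the most that overlapped."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task(_):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for real work (hypothetical, e.g. one mapped task)
        with lock:
            running -= 1

    # The pool size caps how many tasks can be in flight at once
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(task, range(num_tasks)))
    return peak


print(peak_concurrency(num_workers=1))  # one worker: tasks never overlap
print(peak_concurrency(num_workers=4))  # several workers: tasks can overlap
```

With a single worker the pool has nothing to run a second task on, so the mapped calls execute one after another in a single worker thread instead of fanning out across many.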

Jon

01/19/2023, 1:44 PM
thank you @Kyle McChesney and @Luis Gallegos. i will give them both a shot!
@Kyle McChesney we are not explicitly setting an executor, so it seems we are running with LocalExecutor by default?

Kyle McChesney

01/19/2023, 4:47 PM
Not 100% sure what the default would be, as it varies by execution environment
- executor (prefect.executors.Executor, optional): The executor that the flow
           should use. If `None`, the default executor configured in the runtime environment
           will be used.
Looking more closely at your example, it seems like you are running multiple flows at once, is that the case?

Jon

01/19/2023, 4:53 PM
yeah, i am running a series of dependent flows: https://docs-v1.prefect.io/core/idioms/flow-to-flow

Luis Gallegos

01/19/2023, 4:59 PM
I have similar code:
from prefect import Flow, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Limit the executor to a single worker
executor = LocalDaskExecutor(num_workers=1)

with Flow("you_flow_orchestrator_name", executor=executor) as flow:

    # any_list_of_dict_param is a placeholder: a list of parameter dicts, one per flow run
    load_flow = create_flow_run.map(flow_name=unmapped("slave_flow"), project_name=unmapped("slave_project"), parameters=any_list_of_dict_param)

    # Wait for the created flow runs; a new name avoids shadowing the imported task
    flow_run_results = wait_for_flow_run.map(flow_run_id=load_flow)
I hope it works for you!

Jon

01/19/2023, 5:07 PM
@Luis Gallegos thank you! :thank-you: checking that out