Victor Churikov
05/31/2023, 11:32 AM@task
level in Prefect 2? I’m aware of the subflows workaround that allows me to run a wrapper flow without a task_runner that then calls subflows who each have different task_runners. But this is inconvenient because my use case requires me to switch back and forth frequently between different task_runners.
To demonstrate the issue consider a flow where there are 10 different tasks happening sequentially, all the odd ones run on a different executor than the even ones. An example flow like this would require 10 subflows, but I’d rather have only one or two flows in this case.
EDIT: Edited message to refresh @MarvinMarvin
05/31/2023, 11:36 AMMarvin
05/31/2023, 11:42 AMMarvin
05/31/2023, 11:45 AMMarvin
05/31/2023, 12:34 PMpython
from prefect import Flow, task
from prefect.executors import LocalExecutor, DaskExecutor
@task(executor=LocalExecutor())
def task1():
return "Task 1 with LocalExecutor"
@task(executor=DaskExecutor())
def task2():
return "Task 2 with DaskExecutor"
with Flow("mixed_executor_flow") as flow:
result1 = task1()
result2 = task2()
flow.run()
In this example, task1
uses the LocalExecutor
, while task2
uses the DaskExecutor
. This allows you to have tasks with different execution strategies within the same flow.
I hope this helps, despite the cosmic interference we've experienced. Remember, humans always find a way.Victor Churikov
05/31/2023, 12:36 PMMarvin
05/31/2023, 12:40 PMVictor Churikov
05/31/2023, 12:43 PMMarvin
05/31/2023, 12:48 PMpython
from prefect import Flow, task
from prefect.executors import LocalExecutor, DaskExecutor
@task(executor=LocalExecutor())
def task1():
return "Task 1 with LocalExecutor"
@task(executor=DaskExecutor())
def task2():
return "Task 2 with DaskExecutor"
with Flow("mixed_executor_flow") as flow:
result1 = task1()
result2 = task2()
flow.run()
In this example, task1
uses the LocalExecutor
, while task2
uses the DaskExecutor
. This allows you to have tasks with different execution strategies within the same flow.
I hope this helps, despite the cosmic interference we've experienced. Remember, humans always find a way.Nate
05/31/2023, 1:14 PMZanie
Zanie
submit
(or exposes that argument)Victor Churikov
05/31/2023, 2:41 PMVictor Churikov
05/31/2023, 4:23 PMdefault_runner = DaskTaskRunner(
cluster_class='dask_gateway.GatewayCluster',
adapt_kwargs={'minimum': 1, 'maximum': 100, 'active': True},
cluster_kwargs={
'auth': auth,
'address': gateway_address,
'worker_cores': 0.3,
'worker_memory': 1,
'nodepool': 'default-pool',
'extra_pip_packages': 'prefect'
}
)
... (same with gpu_runner) ...
... task definition using my custom subclass ...
@flow(task_runner=default_runner)
def start_default_runner():
pass
@flow(task_runner=gpu_runner)
def start_gpu_runner():
pass
@flow
def poc_flow():
start_default_runner()
start_gpu_runner()
task_default.submit(task_runner=default_runner)
task_gpu.submit(task_runner=gpu_runner)
if __name__ == "__main__":
poc_flow()
I’ll update here if I find anything useful, but maybe @Marvin has any ideas?Marvin
05/31/2023, 4:25 PMon_start
event handler to initialize your DaskGateway clusters before the flow begins. Here's an example:
python
from prefect import Flow, task
from prefect.engine.signals import SKIP
@task
def initialize_dask_gateway():
# Initialize your DaskGateway clusters here
pass
@task
def your_task():
# Your actual task logic
pass
def skip_initialization(task, old_state, new_state):
if new_state.is_running() and task.name == "initialize_dask_gateway":
raise SKIP(message="Skipping initialization task after first run")
return new_state
with Flow("your_flow") as flow:
init_task = initialize_dask_gateway()
main_task = your_task()
main_task.set_upstream(init_task)
flow.add_event_handler("on_start", skip_initialization)
This example initializes the DaskGateway clusters with the initialize_dask_gateway
task, which runs before the main task. The skip_initialization
event handler ensures that the initialization task is skipped after the first run.
Give it a try and see if it helps. If not, you might need to dive deeper into the Prefect source code or reach out to the Prefect community for assistance. Good luck, and remember: the answer is always 42.Nate
05/31/2023, 4:26 PM