Ihor Ramskyi
08/13/2024, 1:32 PMMihir Thatte
08/13/2024, 1:33 PMIhor Ramskyi
08/13/2024, 3:01 PMimport time
from prefect import task, flow
from prefect.states import Completed
@task
def task_a(x):
time.sleep(3)
print("A"*x)
return Completed()
@flow
def generic_flow():
lst = []
for i in range(1, 6):
lst.append(task_a.submit(i))
for f in lst:
f.wait()
if __name__ == "__main__":
generic_flow()
and it produced such log:
17:58:25.150 | INFO | prefect.engine - Created flow run 'warping-hoatzin' for flow 'generic-flow'
17:58:25.158 | INFO | prefect.engine - View at <http://127.0.0.1:4200/runs/flow-run/ea1484fa-4d54-4d4e-b74c-843ef61fa75d>
17:58:25.380 | INFO | Flow run 'warping-hoatzin' - Submitting task task_a to thread pool executor...
17:58:25.398 | INFO | Flow run 'warping-hoatzin' - Submitting task task_a to thread pool executor...
17:58:25.423 | INFO | Flow run 'warping-hoatzin' - Submitting task task_a to thread pool executor...
17:58:25.442 | INFO | Flow run 'warping-hoatzin' - Submitting task task_a to thread pool executor...
17:58:25.455 | INFO | Flow run 'warping-hoatzin' - Submitting task task_a to thread pool executor...
17:58:26.001 | INFO | Task run 'task_a-2' - Created task run 'task_a-2' for task 'task_a'
17:58:26.104 | INFO | Task run 'task_a-3' - Created task run 'task_a-3' for task 'task_a'
17:58:26.273 | INFO | Task run 'task_a-0' - Created task run 'task_a-0' for task 'task_a'
17:58:26.393 | INFO | Task run 'task_a-4' - Created task run 'task_a-4' for task 'task_a'
17:58:26.472 | INFO | Task run 'task_a-1' - Created task run 'task_a-1' for task 'task_a'
AAA
A
17:58:30.052 | INFO | Task run 'task_a-2' - Finished in state Completed()
AA
17:58:30.118 | INFO | Task run 'task_a-0' - Finished in state Completed()
AAAAA
17:58:30.216 | INFO | Task run 'task_a-1' - Finished in state Completed()
17:58:30.301 | INFO | Task run 'task_a-4' - Finished in state Completed()
AAAA
17:58:30.444 | INFO | Task run 'task_a-3' - Finished in state Completed()
17:58:30.609 | INFO | Flow run 'warping-hoatzin' - Finished in state Completed()
It seems that 5 task submits with time.sleep(3) each (so 15 seconds at least) are done in 5 seconds. That shows code is executed concurrently.
Also visualizer shows this:Ihor Ramskyi
08/13/2024, 3:02 PMNate
08/13/2024, 6:14 PMfrom prefect import flow, task
@task
def foo(): ...
@flow
def some_flow():
foo()
foo()
foo()
some_flow()
by using submit
, you're opting into the ThreadPoolTaskRunner
(called ConcurrentTaskRunner
in 2.x) where you submit task runs to the task runner in a non-blocking fashion, which the task runner will execute concurrently, subject to any state dependencies (set with wait_for
)
so this is not actually different from 2.x at all, and tldr:
• to get simple sequential behavior, just call your tasks, no .submit()
• to get concurrent behavior, using .submit()
or .map()
to send work to task runnersNate
08/13/2024, 6:14 PMIhor Ramskyi
08/14/2024, 12:38 PM