Ben Muller
10/31/2022, 11:31 PM
`SequentialTaskRunner` with `.submit`.
Would this just be the same as not specifying a `task_runner` and just running the tasks without `.submit`, or am I missing something?
Ben Muller
10/31/2022, 11:34 PM
@flow(task_runner=SequentialTaskRunner())
def my_flow():
    some_task.submit(123)
and
@flow
def my_flow():
    some_task(123)
?
Matt Conger
10/31/2022, 11:50 PM
`ConcurrentTaskRunner`. The `SequentialTaskRunner` with `.submit` is useful when you need multiple tasks to run one after another. I've included some documentation here: https://orion-docs.prefect.io/tutorials/execution/#task-runners
Ben Muller
10/31/2022, 11:52 PM
@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    some_task.submit(123)
and
@flow
def my_flow():
    some_task(123)
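The two pairs of flows above can be compared with a toy model in plain Python. Nothing here is Prefect code. `tracked_task`, `ToySequentialRunner`, and `peak_for` are hypothetical names. The sequential runner is *assumed* to run each call to completion at submission time, and a standard-library `ThreadPoolExecutor` stands in for a concurrent runner. The model counts how many calls are ever running at the same moment.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict

_lock = threading.Lock()
_active = 0  # calls currently running
_peak = 0  # highest value _active has reached


def tracked_task(n: int) -> int:
    """Hypothetical task: sleeps briefly and records how many calls overlap."""
    global _active, _peak
    with _lock:
        _active += 1
        _peak = max(_peak, _active)
    time.sleep(0.05)
    with _lock:
        _active -= 1
    return n + 1


class ToySequentialRunner:
    """Hypothetical runner, assumed to run each call to completion inside submit()."""

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future:
        future: Future = Future()
        try:
            future.set_result(fn(*args))
        except Exception as exc:
            future.set_exception(exc)
        return future  # already finished when the caller gets it


def peak_for(style: str) -> int:
    """Run five tracked tasks in the given style and return the peak overlap."""
    global _active, _peak
    _active, _peak = 0, 0
    if style == "direct":
        # Plain calls: each one returns before the next starts.
        results = [tracked_task(n) for n in range(5)]
    elif style == "sequential":
        runner = ToySequentialRunner()
        futures = [runner.submit(tracked_task, n) for n in range(5)]
        results = [f.result() for f in futures]
    else:
        # Thread pool standing in for a concurrent runner receiving .submit calls.
        with ThreadPoolExecutor(max_workers=5) as pool:
            futures = [pool.submit(tracked_task, n) for n in range(5)]
            results = [f.result() for f in futures]
    assert results == [1, 2, 3, 4, 5]
    return _peak


peaks: Dict[str, int] = {s: peak_for(s) for s in ("direct", "sequential", "concurrent")}
print(peaks)
```

In this model the peak is 1 for plain calls and for the sequential runner. It only rises above 1 when calls are submitted to the thread pool. That mirrors the distinction Ryan draws further down the thread, but it is an illustration of the idea rather than a measurement of Prefect itself.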
Ben Muller
10/31/2022, 11:52 PM
Ryan Peden
11/01/2022, 12:11 AM
`ConcurrentTaskRunner` is the default, so it's being used in either case.
If you just call the function directly, like `some_task(123)`, the concurrent task runner can't run anything concurrently. The flow is blocked from running anything else until `some_task(123)` finishes.
Calling `some_task.submit(123)`, on the other hand, doesn't wait for `some_task` to finish running. It returns almost immediately, giving you a `PrefectFuture` you can use to get `some_task`'s return value later on. This is useful when you want to run a bunch of tasks simultaneously instead of one after the other.
Ben Muller
11/01/2022, 12:16 AM
Ryan Peden
11/01/2022, 12:38 AM
Ben Muller
11/01/2022, 12:38 AM
Ryan Peden
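11/01/2022, 12:39 AM
from prefect import task, flow, get_run_logger
import time
import random

@task()
def printer(n):
    # random delay so some printer calls take longer
    # than others
    delay = random.randrange(50, 100) / 1000.0
    time.sleep(delay)
    logger = get_run_logger()
    logger.info(f"printer {n}")

@flow()
def test_flow():
    # direct calls: each printer(i) finishes before the next one starts,
    # so the log lines come out in order 0..99
    for i in range(100):
        printer(i)

@flow()
def test_flow_2():
    # .submit returns a PrefectFuture right away; with the default
    # ConcurrentTaskRunner the calls overlap, so log lines can come out of order
    for i in range(100):
        printer.submit(i)

if __name__ == "__main__":
    test_flow()
    test_flow_2()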
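Ryan's `printer` example hinges on two points. Submitted calls can finish in any order, and each `PrefectFuture` still gives back its own task's return value. The same behaviour shows up with the standard library's `ThreadPoolExecutor`, used here purely as an analogy for the default concurrent runner. `fake_printer` is a hypothetical stand-in for `printer`, and the delays are fixed instead of random so the outcome is repeatable.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

finish_order: List[int] = []
_lock = threading.Lock()


def fake_printer(n: int) -> int:
    # Hypothetical stand-in for printer: later submissions sleep less,
    # so they finish first when the calls run concurrently.
    time.sleep(0.05 * (5 - n))
    with _lock:
        finish_order.append(n)
    return n * 10


# Direct calls: strictly one after another, so completion order is 0..4.
direct = [fake_printer(n) for n in range(5)]
direct_order = list(finish_order)
finish_order.clear()

# Submitted calls: submit() returns a Future at once, and the calls overlap.
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fake_printer, n) for n in range(5)]
    # .result() blocks until that particular call is done, then returns its value.
    submitted = [f.result() for f in futures]
submitted_order = list(finish_order)
```

Completion order follows the sleep times, while the list built from the futures stays in submission order. That is why collecting results through futures is safe even when the log output is scrambled.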
Ben Muller
11/01/2022, 12:41 AM
`.submit` jumps into the fold without being fully explained, and neither is the difference when you don't use it.
Just my feedback; I was trying to get my head around it.
Ryan Peden
11/01/2022, 12:42 AM
Ben Muller
11/01/2022, 12:45 AM
`ConcurrentTaskRunner` is mentioned as the default, but to actually get any concurrent runs we need to use `.submit`.
So logically (in my head), if it is in fact the default, and we don't specify the `task_runner` and do NOT use `.submit`, are we really getting execution in the same manner that a `SequentialTaskRunner` would execute?
Ryan Peden
11/01/2022, 1:02 AM
Ben Muller
11/01/2022, 1:03 AM
Ryan Peden
11/01/2022, 1:04 AM
import asyncio
import random

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
async def some_task(n):
    delay = random.randrange(50, 250) / 1000.0
    await asyncio.sleep(delay)
    print(n)
    return n + 1

@flow(task_runner=SequentialTaskRunner())
async def my_flow():
    # calling an async task without .submit returns an awaitable;
    # each task runs when it is awaited
    result_1 = some_task(1)
    result_2 = some_task(2)
    result_3 = some_task(3)
    result_4 = some_task(4)
    result_5 = some_task(5)
    # wait for all the tasks to finish; gather awaits them concurrently
    await asyncio.gather(result_1, result_2, result_3, result_4, result_5)

asyncio.run(my_flow())
When I run that, I get:
21:01:40.863 | INFO | prefect.engine - Created flow run 'tremendous-ibex' for flow 'my-flow'
21:01:41.034 | INFO | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-0' for task 'some_task'
21:01:41.035 | INFO | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-0' immediately...
21:01:41.036 | INFO | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-4' for task 'some_task'
21:01:41.036 | INFO | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-4' immediately...
21:01:41.038 | INFO | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-3' for task 'some_task'
21:01:41.038 | INFO | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-3' immediately...
21:01:41.039 | INFO | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-2' for task 'some_task'
21:01:41.040 | INFO | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-2' immediately...
21:01:41.041 | INFO | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-1' for task 'some_task'
21:01:41.042 | INFO | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-1' immediately...
4
1
3
21:01:41.267 | INFO | Task run 'some_task-c498e59a-3' - Finished in state Completed()
21:01:41.280 | INFO | Task run 'some_task-c498e59a-0' - Finished in state Completed()
21:01:41.292 | INFO | Task run 'some_task-c498e59a-2' - Finished in state Completed()
5
21:01:41.334 | INFO | Task run 'some_task-c498e59a-4' - Finished in state Completed()
2
21:01:41.367 | INFO | Task run 'some_task-c498e59a-1' - Finished in state Completed()
21:01:41.400 | INFO | Flow run 'tremendous-ibex' - Finished in state Completed('All states completed.')
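`my_flow` never calls `.submit`, so the overlap in that output comes from awaiting the coroutines together with `asyncio.gather`. It does not come from the flow's `SequentialTaskRunner`. The plain-`asyncio` sketch below shows the same effect without Prefect. `some_coroutine` is a hypothetical stand-in for `some_task`, and the delays are fixed so the result is repeatable. Calling an `async def` function only builds a coroutine, and `gather` runs all of them concurrently on one event loop.

```python
import asyncio
import time
from typing import List

finish_order: List[int] = []


async def some_coroutine(n: int, delay: float) -> int:
    # Hypothetical stand-in for some_task: sleep, record completion, return n + 1.
    await asyncio.sleep(delay)
    finish_order.append(n)
    return n + 1


async def main() -> List[int]:
    delays = {1: 0.25, 2: 0.05, 3: 0.20, 4: 0.10, 5: 0.15}
    # Calling the async function does not run it; it only creates coroutine objects.
    coros = [some_coroutine(n, d) for n, d in delays.items()]
    # gather runs them concurrently and returns results in argument order.
    return await asyncio.gather(*coros)


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, finish_order, round(elapsed, 2))
```

The coroutines finish in order of their delays. `gather` still returns results in the order the coroutines were passed in, and the whole run takes roughly as long as the longest delay rather than the sum of all of them.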
Ryan Peden
11/01/2022, 1:05 AM
Ryan Peden
11/01/2022, 1:06 AM
Ryan Peden
11/01/2022, 1:07 AM
Ben Muller
11/01/2022, 1:15 AM
Khuyen Tran
11/01/2022, 4:20 PM
`SequentialTaskRunner`. Should we remove the example of using `.submit` with `SequentialTaskRunner` from the getting-started guide?
terrence
11/01/2022, 4:41 PM
terrence
11/01/2022, 4:44 PM
terrence
11/01/2022, 4:46 PM