
Ben Muller

10/31/2022, 11:31 PM
Hi Community - me again. I am reading the tutorial here. Just trying to understand why there is a need and demonstration for the `SequentialTaskRunner` with `.submit`. Would this just be the same as not specifying a `task_runner` and just running the tasks without `.submit`, or am I missing something?
eg what is the difference between:
@flow(task_runner=SequentialTaskRunner())
def my_flow():
  some_task.submit(123)
and
@flow
def my_flow():
  some_task(123)
?

Matt Conger

10/31/2022, 11:50 PM
Hey @Ben Muller, by default Prefect uses the `ConcurrentTaskRunner`. The `SequentialTaskRunner` with `.submit` is beneficial to ensure multiple tasks will run one after another. I've included some documentation here: https://orion-docs.prefect.io/tutorials/execution/#task-runners

Ben Muller

10/31/2022, 11:52 PM
ok, thanks @Matt Conger - yeah I read that. Then my question would change to this: what is the difference between:
@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
  some_task.submit(123)
and
@flow
def my_flow():
  some_task(123)
I just haven't seen an explanation of the difference between the two.

Ryan Peden

11/01/2022, 12:11 AM
Since `ConcurrentTaskRunner` is the default, it's using that in either case. If you just call the function directly, like `some_task(123)`, the concurrent task runner won't be able to run anything concurrently because the flow is blocked from running anything else until `some_task(123)` finishes running. Calling `some_task.submit(123)`, on the other hand, doesn't wait for `some_task` to finish running. It returns very quickly, giving you a `PrefectFuture` you can use to get `some_task`'s return value later on. This is useful when you want to run a bunch of tasks simultaneously instead of one after the other.
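To make the "returns very quickly and hands you a future" idea concrete, here is a minimal plain-Python sketch. It uses the standard library's `ThreadPoolExecutor` as a stand-in, not Prefect's task runner, and `some_task` and the `release` event are hypothetical names made up for the illustration. A `PrefectFuture` is a different class from the `concurrent.futures.Future` used here, so treat this as an analogy only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Event used to hold the task open so we can inspect the future mid-flight.
release = threading.Event()

def some_task(n):
    # Block until the main thread allows the task to finish (5 s safety cap).
    release.wait(timeout=5)
    return n + 1

with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() hands the work to the pool and returns a future right away.
    future = pool.submit(some_task, 123)
    done_right_after_submit = future.done()

    # Let the task finish, then collect its return value from the future.
    release.set()
    value = future.result(timeout=5)
    done_after_result = future.done()

print(done_right_after_submit, value, done_after_result)
```

The point of the sketch is only that submitting and collecting the result are two separate steps, which is what leaves room for other work in between.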

Ben Muller

11/01/2022, 12:16 AM
OK, that makes sense - thanks for the explanation. Is it fair to say, then, that the two snippets in my initial question are the same in terms of execution?

Ryan Peden

11/01/2022, 12:38 AM
Yes, in that case you'll end up with the same result

Ben Muller

11/01/2022, 12:38 AM
thanks

Ryan Peden

11/01/2022, 12:39 AM
for your previous question about the concurrent task runner, this example would highlight the difference between the two, and show why you might want to use submit:
from prefect import task, flow, get_run_logger
import time
import random

@task()
def printer(n):
    # random delay so some printer calls take longer
    # than others
    delay = random.randrange(50, 100) / 1000.0
    time.sleep(delay)
    logger = get_run_logger()
    logger.info(f"printer {n}")

@flow()
def test_flow():
    for i in range(100):
        printer(i)

@flow()
def test_flow_2():
    for i in range(100):
        printer.submit(i)
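For anyone without a Prefect environment handy, the same contrast can be sketched with the standard library alone. This is an analogy under stated assumptions, not Prefect code: `ThreadPoolExecutor` stands in for a concurrent task runner, and the fixed `DELAYS` list (a hypothetical replacement for the random delay above) makes the interleaving easy to see.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

finished = []            # records the order in which calls complete
lock = threading.Lock()  # protects the shared list across threads

# Later calls get shorter delays, so overlapping runs finish out of call order.
DELAYS = [0.20, 0.15, 0.10, 0.05]

def printer(n, delay):
    time.sleep(delay)
    with lock:
        finished.append(n)

def run_direct():
    # Each call blocks until it returns, like calling printer(i) directly.
    finished.clear()
    for n, delay in enumerate(DELAYS):
        printer(n, delay)
    return list(finished)

def run_submitted():
    # Every call is handed to the pool first, like printer.submit(i).
    finished.clear()
    with ThreadPoolExecutor(max_workers=len(DELAYS)) as pool:
        for n, delay in enumerate(DELAYS):
            pool.submit(printer, n, delay)
    # Leaving the with-block waits for all submitted calls to complete.
    return list(finished)

direct_order = run_direct()
submitted_order = run_submitted()
print("direct:   ", direct_order)
print("submitted:", submitted_order)
```

With direct calls the completion order matches the call order. With submitted calls the completion order follows the delays instead, because the calls overlap.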

Ben Muller

11/01/2022, 12:41 AM
Oh, I totally understand why you'd want to use it - I was just confused that the tutorial doesn't explain the difference. It seems like `.submit` jumps into the fold without a full explanation of what it does or what changes when you don't use it. Just my feedback, was trying to get my head around it.

Ryan Peden

11/01/2022, 12:42 AM
That's great feedback - showing a single task call perhaps doesn't highlight the difference enough

Ben Muller

11/01/2022, 12:45 AM
Ah, not really - I think the concurrent running is explained well enough. It is more the fact that the `ConcurrentTaskRunner` is mentioned as the default, but in order to really get any concurrent runs, we actually need to use `.submit`. So logically (in my head), if it is in fact the default and we do not specify the `task_runner` and we do NOT use `.submit`, are we really getting execution in the same manner that a `SequentialTaskRunner` would execute?

Ryan Peden

11/01/2022, 1:02 AM
Yes, for regular functions - because if you aren't using submit, you're only giving the task runner one task at a time, so it never has a chance to be concurrent.
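The "one task at a time" point can be checked with a small standard-library sketch. It is an analogy, not Prefect's implementation: `ThreadPoolExecutor` stands in for a runner that is capable of concurrency, and `InFlight`, `some_task`, and the two `peak_when_...` helpers are hypothetical names invented for the illustration. The sketch counts how many calls are ever running at the same moment.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class InFlight:
    """Context manager that tracks how many calls are running at once."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)

    def __exit__(self, *exc):
        with self._lock:
            self.current -= 1

def some_task(tracker, n):
    with tracker:
        time.sleep(0.05)
        return n + 1

def peak_when_waiting_each_time(count):
    # Hand over one call and wait for it before handing over the next.
    tracker = InFlight()
    with ThreadPoolExecutor(max_workers=count) as pool:
        for n in range(count):
            pool.submit(some_task, tracker, n).result()
    return tracker.peak

def peak_when_submitting_first(count):
    # Hand over every call up front, then wait for all of them.
    tracker = InFlight()
    with ThreadPoolExecutor(max_workers=count) as pool:
        futures = [pool.submit(some_task, tracker, n) for n in range(count)]
        for future in futures:
            future.result()
    return tracker.peak

blocking_peak = peak_when_waiting_each_time(4)
submitted_peak = peak_when_submitting_first(4)
print(blocking_peak, submitted_peak)
```

Even though the pool has four workers in both cases, the first helper never has more than one call in flight, which mirrors why a concurrent runner behaves sequentially when it only ever sees one task at a time.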

Ben Muller

11/01/2022, 1:03 AM
exactly - just think that is worth explaining to people

Ryan Peden

11/01/2022, 1:04 AM
If your functions are async, it changes the game a bit:
import asyncio
import random

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
async def some_task(n):
    delay = random.randrange(50, 250) / 1000.0
    await asyncio.sleep(delay)
    print(n)
    return n + 1


@flow(task_runner=SequentialTaskRunner())
async def my_flow():
    result_1 = some_task(1)
    result_2 = some_task(2)
    result_3 = some_task(3)
    result_4 = some_task(4)
    result_5 = some_task(5)

    # wait for all the tasks to finish
    await asyncio.gather(result_1, result_2, result_3, result_4, result_5)
    
asyncio.run(my_flow())
When I run that, I get:
21:01:40.863 | INFO    | prefect.engine - Created flow run 'tremendous-ibex' for flow 'my-flow'
21:01:41.034 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-0' for task 'some_task'
21:01:41.035 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-0' immediately...
21:01:41.036 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-4' for task 'some_task'
21:01:41.036 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-4' immediately...
21:01:41.038 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-3' for task 'some_task'
21:01:41.038 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-3' immediately...
21:01:41.039 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-2' for task 'some_task'
21:01:41.040 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-2' immediately...
21:01:41.041 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-1' for task 'some_task'
21:01:41.042 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-1' immediately...
4
1
3
21:01:41.267 | INFO    | Task run 'some_task-c498e59a-3' - Finished in state Completed()
21:01:41.280 | INFO    | Task run 'some_task-c498e59a-0' - Finished in state Completed()
21:01:41.292 | INFO    | Task run 'some_task-c498e59a-2' - Finished in state Completed()
5
21:01:41.334 | INFO    | Task run 'some_task-c498e59a-4' - Finished in state Completed()
2
21:01:41.367 | INFO    | Task run 'some_task-c498e59a-1' - Finished in state Completed()
21:01:41.400 | INFO    | Flow run 'tremendous-ibex' - Finished in state Completed('All states completed.')
Thank you for taking the time to give us feedback on this; it is really valuable and helps us improve things
That last example showed an edge case in the Sequential task runner: if everything is async, it can't force things to finish sequentially because they've already started by the time they reach the task runner
That wasn't actually what I meant to show 🤣 I intended to put a concurrent task runner there to show an example of non-blocking task calls without submit
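The overlapping output above can be reproduced in miniature with plain `asyncio`, no Prefect involved. This is a sketch under stated assumptions: `some_task`, the fixed `DELAYS` mapping, and the two helper coroutines are hypothetical, and it says nothing about how Prefect's task runners treat coroutines internally. It only shows that awaitables gathered together overlap, while awaiting them one at a time does not.

```python
import asyncio

# Fixed delays (seconds) per task number, chosen so completion order is obvious.
DELAYS = {1: 0.15, 2: 0.05, 3: 0.10}

async def some_task(n, delay, finished):
    await asyncio.sleep(delay)
    finished.append(n)  # record completion order
    return n + 1

async def awaited_one_by_one():
    # Each await finishes before the next coroutine is even created.
    finished = []
    for n, delay in DELAYS.items():
        await some_task(n, delay, finished)
    return finished

async def gathered():
    # All coroutines are created first, then run together on the event loop.
    finished = []
    coros = [some_task(n, delay, finished) for n, delay in DELAYS.items()]
    results = await asyncio.gather(*coros)
    return finished, results

one_by_one_order = asyncio.run(awaited_one_by_one())
gathered_order, gathered_results = asyncio.run(gathered())
print(one_by_one_order, gathered_order, gathered_results)
```

Note that `asyncio.gather` returns results in the order the awaitables were passed in, even though they complete in a different order.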

Ben Muller

11/01/2022, 1:15 AM
haha nice - thanks!

Khuyen Tran

11/01/2022, 4:20 PM
@terrence I also saw other users asking the same question: Is there a difference between not using submit and using submit with `SequentialTaskRunner`? Should we remove the example of using `submit` with `SequentialTaskRunner` in getting started?

terrence

11/01/2022, 4:41 PM
I wrote this specifically to demonstrate how to call a specific task runner. There’s really not any more mystery around this example than demonstrating the syntax to specify a task runner. And it’s my opinion that, if you’re going to call a task runner, you should specify the task runner and not rely on or assume the default. Hence the example.
Good feedback, though, @Ben Muller thank you 🙏
There’s one hitch in the discussion of which task runner to use: anything other than SequentialTaskRunner and ConcurrentTaskRunner involves pip installing a collection or having to spin up some external service like Docker. That’s a lot of overhead for the examples in this part of the documentation. We’re really just trying to introduce the concept of specifying and submitting to a task runner before we expose users to more complex environments and use cases.