Hi Community - me again. I am reading the tutoria...
# prefect-community
b
Hi Community - me again. I am reading the tutorial here. Just trying to understand why there is a need and demonstration for the
SequentialTaskRunner
with
.submit
. Would this just be the same as not specifying a
task_runner
and just running the tasks without
.submit
or am I missing something?
1
eg what is the difference between:
Copy code
@flow(task_runner=SequentialTaskRunner())
def my_flow():
  some_task.submit(123)
and
Copy code
@flow
def my_flow():
  some_task(123)
?
m
Hey @Ben Muller by default Prefect uses the
ConcurrentTaskRunner
. The
SequentialTaskRunner
with
.submit
is beneficial to ensure multiple tasks will run one after another. I've included some documentation here. https://orion-docs.prefect.io/tutorials/execution/#task-runners
b
ok, thanks @Matt Conger - yeah I read that. Then my question would change to this: what is the difference between:
Copy code
@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
  some_task.submit(123)
and
Copy code
@flow
def my_flow():
  some_task(123)
I just haven't seen why there is this explanation between the two
r
Since
ConcurrentTaskRunner
is the default, it's using that in either case. If you just call the function directly, like
some_task(123)
, the concurrent task runner won't be able to run anything concurrently because the flow is blocked from running anything else until
some_task(123)
finishes running. Calling
some_task.submit(123)
, on the other hand, doesn't wait for
some_task
to finish running. It returns very quickly, giving you a
PrefectFuture
you can use to get `some_task`'s return value later on. This is useful when you want to run a bunch of tasks simultaneously instead of one after the other
🙌 1
b
Ok that makes sense, - thanks for the explanation. Is it fair to then say what I said initially about those being the same in terms of execution?
r
Yes, in that case you'll end up with the same result
b
thanks
r
for your previous question about the concurrent task runner, this example would highlight the difference between the two, and show why you might want to use submit:
Copy code
from prefect import task, flow, get_run_logger
import time
import random

@task()
def printer(n):
    # random delay so some printer calls take longer
    # than others
    delay = random.randrange(50, 100) / 1000.0
    time.sleep(delay)
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"printer {n}")

@flow()
def test_flow():
    for i in range(100):
        printer(i)

@flow()
def test_flow_2():
    for i in range(100):
        printer.submit(i)
b
Oh I totally understand why you'd want to use it - I just was confused in the tutorial why there was no explanation as to the difference, it seems like this
.submit
jumps into the fold and it is not fully explained and the differences of not using it. Just my feedback, was trying to get my head around it.
r
That's great feedback - showing a single task call perhaps doesn't highlight the difference enough
b
ah not really - I think the concurrent running is explained well enough. It is more the fact that the
ConcurrentTaskRunner
is mentioned as the default, but in order to really get any Concurrent runs, we actually need to use
.submit
. So logically ( in my head ) if it is in fact the default and we do not specify the
task_runner
and we do NOT use
.submit
we are really getting execution in the same manor that a
SequentialTaskRunner
would execute?
r
Yes, for regular functions - because if you aren't using submit, you're only giving the task runner one task at a time, so it never has a chance to be concurrent.
b
exactly - just think that is worth explaining to people
r
If your functions are async, it changes the game a bit:
Copy code
import asyncio
from prefect import flow, task

@task
async def some_task(n):
    delay = random.randrange(50, 250) / 1000.0
    await asyncio.sleep(delay)
    print(n)
    return n + 1


@flow(task_runner=SequentialTaskRunner())
async def my_flow():
    result_1 = some_task(1)
    result_2 = some_task(2)
    result_3 = some_task(3)
    result_4 = some_task(4)
    result_5 = some_task(5)

    # wait for all the tasks to finish
    await asyncio.gather(result_1, result_2, result_3, result_4, result_5)
    
asyncio.run(my_flow())
When I run that, I get:
Copy code
21:01:40.863 | INFO    | prefect.engine - Created flow run 'tremendous-ibex' for flow 'my-flow'
21:01:41.034 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-0' for task 'some_task'
21:01:41.035 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-0' immediately...
21:01:41.036 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-4' for task 'some_task'
21:01:41.036 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-4' immediately...
21:01:41.038 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-3' for task 'some_task'
21:01:41.038 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-3' immediately...
21:01:41.039 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-2' for task 'some_task'
21:01:41.040 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-2' immediately...
21:01:41.041 | INFO    | Flow run 'tremendous-ibex' - Created task run 'some_task-c498e59a-1' for task 'some_task'
21:01:41.042 | INFO    | Flow run 'tremendous-ibex' - Executing 'some_task-c498e59a-1' immediately...
4
1
3
21:01:41.267 | INFO    | Task run 'some_task-c498e59a-3' - Finished in state Completed()
21:01:41.280 | INFO    | Task run 'some_task-c498e59a-0' - Finished in state Completed()
21:01:41.292 | INFO    | Task run 'some_task-c498e59a-2' - Finished in state Completed()
5
21:01:41.334 | INFO    | Task run 'some_task-c498e59a-4' - Finished in state Completed()
2
21:01:41.367 | INFO    | Task run 'some_task-c498e59a-1' - Finished in state Completed()
21:01:41.400 | INFO    | Flow run 'tremendous-ibex' - Finished in state Completed('All states completed.')
Thank you for taking the time to give us feedback on this; it is really valuable and helps us improve things
🙌 1
upvote 2
That last example showed an edge case in the Sequential task runner: if everything is async, it can't force things to finish sequentially because they've already started by the time they reach the task runner
That wasn't actually what I meant to show 🤣 I intended to put a concurrent task runner there to show an example of non-blocking task calls without submit
b
haha nice - thanks!
k
@terrence I also saw other users asking the same question: Is there a difference between not using submit and using submit with
SequentialTaskRunner
. Should we remove the example of using
submit
with
SequentialTaskRunner
in getting started?
t
I wrote this specifically to demonstrate how to call a specific task runner. There’s really not any more mystery around this example than demonstrating the syntax to specify a task runner. And it’s my opinion that, if you’re going to call a task runner, you should specify the task runner and not rely on or assume the default. Hence the example.
👍 1
Good feedback, though, @Ben Muller thank you 🙏
There’s one hitch in the discussion of which task runner to use: anything other than SequentialTaskRunner and ConcurrentTaskRunner involves pip installing a collection or having to spin up some external service like Docker. That’s a lot of overhead for the examples in this part of the documentation. We’re really just trying to introduce the concept of specifying and submitting to a task runner before we expose users to more complex environments and use cases.