Chen
12/05/2023, 9:05 PM
results = sleep.map([5, 3, 0.8, 1])  # <--- Run all tasks in parallel
for i in [1.2, 0.7]:
    do_some_stuffs(sleep(i))
for i in [0.6, 0.3]:
    do_some_stuffs(sleep.submit(i))
Marvin
12/05/2023, 9:06 PM
tasks.map and tasks.submit are not equivalent.
tasks.map is a Prefect feature that automatically creates a task run for each element of its input data. It represents the computations of many individual children tasks. This means that if you use tasks.map, it will apply the task to each element of its inputs in parallel.
On the other hand, tasks.submit is used to submit a task to a task runner like Dask. It doesn't inherently run tasks in parallel like tasks.map does.
The reason why your submit approach is waiting for 0.6 to be completed first might be due to the way your task runner is handling the tasks. If you want to achieve parallel execution with submit, you might need to configure your task runner to support parallel execution.
For more detailed information, you might want to refer to the Prefect documentation or community resources:
- Prefect Tasks
- Prefect Task Runners
- Discourse: Running multiple subflows or child flows in parallel
Nate
12/05/2023, 9:08 PM
.map will call .submit for each item in the iterable, so there should be no big difference between
for i in range(3):
    foo.submit(i)
and
foo.map(range(3))  # this will take a slightly more efficient path through the orchestration engine
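As a rough analogy using only the standard library (this is concurrent.futures, not Prefect, and the helper names via_submit and via_map are made up for illustration), submitting in a loop and mapping over the same inputs hand back the same results in input order:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep(duration):
    """Stand-in for the task in the thread: wait, then return the input."""
    time.sleep(duration)
    return duration


def via_submit(durations):
    """Submit each item in a loop, then collect results in submission order."""
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        futures = [pool.submit(sleep, d) for d in durations]
        return [f.result() for f in futures]


def via_map(durations):
    """Let the executor fan out over the iterable in one call."""
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        return list(pool.map(sleep, durations))


if __name__ == "__main__":
    inputs = [0.02, 0.01, 0.0]
    print(via_submit(inputs) == via_map(inputs))
```

This only mirrors the shape of the two calls; how Prefect routes them through its orchestration engine is not modelled here.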
Nate
12/05/2023, 9:10 PM
results = sleep.map([5, 3, 0.8, 1])
this will run tasks concurrently, not in parallel, unless you use Dask or Ray task runners - although it will effectively feel parallel for most small-scale mapped tasks
Chen
12/05/2023, 9:23 PM
@task(task_run_name="Sleeping for {time}")
async def sleep(time: int) -> int:
    import time as t
    t.sleep(time)
    return time
@task(task_run_name="Doing some stuffs with {value}")
def do_some_stuffs(value):
    print(f"Doing some stuffs with {value}")
@flow(log_prints=True, flow_run_name="MyFlow")
def hello_flow(name_input):
    results = sleep.map([5, 1.5])
    results_2 = sleep.map([2, 1])
    do_some_stuffs(results)
    do_some_stuffs(results_2)
Which produces the following results
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
Task run 'Doing some stuffs with [5, 1.5]' - Doing some stuffs with [5, 1.5]
Task run 'Doing some stuffs with [5, 1.5]' - Finished in state Completed()
Flow run 'MyFlow' - Created task run 'do_some_stuffs-1' for task 'do_some_stuffs'
Flow run 'MyFlow' - Executing 'do_some_stuffs-1' immediately...
Task run 'Doing some stuffs with [2, 1]' - Doing some stuffs with [2, 1]
Task run 'Doing some stuffs with [2, 1]' - Finished in state Completed()
Flow run 'truthful-wolverine' - Finished in state Completed('All states completed.')
Which is expected
If I use
results_2 = [sleep.submit(i) for i in [2, 1]]
do_some_stuffs(results_2)
I got the same result as well
But if I do:
for i in [2, 1]:
    do_some_stuffs(sleep.submit(i))
The results are different: it sleeps for 2 seconds first, and only afterwards sleeps for 1 second.
My interpretation was that sleep.submit would return a future, and do_some_stuffs would be called later.
But calling submit inside the for loop and passing the result straight to do_some_stuffs makes it sleep for 2 seconds first, and then sleep for 1 second.
Am I missing something here?
Nate
12/05/2023, 9:43 PM
In [1]: from prefect import flow, task
In [2]: @task
   ...: def sleep(duration: int) -> int:
   ...:     import time
   ...:     time.sleep(duration)
   ...:     return duration
   ...:
In [3]: @flow
   ...: def foo():
   ...:     futures = [sleep.submit(i) for i in range(3)]
   ...:     assert [i.result() for i in futures] == [0, 1, 2]
   ...:
In [4]: foo()
... tasks run concurrently with ConcurrentTaskRunner (the default task runner)
In [5]: @flow
   ...: def foo():
   ...:     futures = sleep.map(range(3))
   ...:     assert [i.result() for i in futures] == [0, 1, 2]
In [6]: foo()
... tasks run concurrently with ConcurrentTaskRunner (the default task runner)
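The for-loop case from the question can be sketched with a standard-library analogy. This is concurrent.futures rather than Prefect, and consume, blocking_loop and submit_first are hypothetical names. The assumption, consistent with the "Executing 'do_some_stuffs-1' immediately..." log line above, is that a direct call to the downstream step has to wait for its future before the loop can move on and submit the next item:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep(duration, finished):
    """Wait, record when this item finished, and return the input."""
    time.sleep(duration)
    finished.append(duration)  # list.append is atomic in CPython
    return duration


def consume(future):
    """Stand-in for a downstream step called directly: it blocks on the future."""
    return future.result()


def blocking_loop(durations):
    """Submit and consume inside the same iteration, so items run one by one."""
    finished = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for d in durations:
            consume(pool.submit(sleep, d, finished))
    return finished


def submit_first(durations):
    """Submit everything up front, then consume, so items overlap."""
    finished = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(sleep, d, finished) for d in durations]
        for f in futures:
            consume(f)
    return finished


if __name__ == "__main__":
    print(blocking_loop([0.3, 0.05]))  # finish order follows submission order
    print(submit_first([0.3, 0.05]))   # shorter sleep is expected to finish first
```

In this sketch, building the list of futures before touching any of them is what lets the sleeps overlap, which matches the list-comprehension variant in the question.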
futures are resolved by downstream tasks / flows (e.g. do_some_stuffs) when they're called - does that answer your question?
Chen
12/05/2023, 9:53 PM
results = sleep.map([5, 1.5])
results_2 = sleep.map([2, 1])
do_some_stuffs(results)
do_some_stuffs(results_2)
I got
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
Using
results = sleep.map([5, 1.5])
results_2 = [sleep.submit(i) for i in [2, 1]]
do_some_stuffs(results)
do_some_stuffs(results_2)
I got
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
BUT using
results = sleep.map([5, 1.5])
for i in [2, 1]:
    do_some_stuffs(sleep.submit(i))
do_some_stuffs(results)
I get
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
Which is not the order that I expected; I was expecting the same order of execution as before (1, 1.5, 2, 5)
Nate
12/05/2023, 9:57 PM
ConcurrentTaskRunner will run tasks submitted to it in a non-deterministic order, though it will return the results in the same order as the corresponding inputs
Nate
12/05/2023, 9:57 PM
SequentialTaskRunner will run things sequentially in the order that they are submitted
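The difference between the two runners can be approximated with a thread pool, purely as an illustration. This is not Prefect code: a pool with several workers stands in for the concurrent runner, a pool with one worker stands in for the sequential one, and run is a made-up helper. Results come back in input order either way, while the order in which items finish depends on the pool:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run(durations, max_workers):
    """Return (results in input order, durations in the order they finished)."""
    finished = []

    def sleep(duration):
        time.sleep(duration)
        finished.append(duration)  # list.append is atomic in CPython
        return duration

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(sleep, durations))
    return results, finished


if __name__ == "__main__":
    # Several workers: items overlap, so the shorter sleep is expected to finish first.
    print(run([0.3, 0.05], max_workers=2))
    # One worker: items run one at a time, in the order they were submitted.
    print(run([0.3, 0.05], max_workers=1))
```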