# ask-marvin
c
@Marvin Are `tasks.map` and `tasks.submit` equivalent? Do they produce the same result? I have the following code, and it looks like the `submit` approach waits for 0.6 to complete first. Why is that? How can I get the same behavior as `map` but using `submit`?
```python
results = sleep.map([5, 3, 0.8, 1])  # <--- Run all tasks in parallel

for i in [1.2, 0.7]:
    do_some_stuffs(sleep(i))

for i in [0.6, 0.3]:
    do_some_stuffs(sleep.submit(i))
```
m
`tasks.map` and `tasks.submit` are not equivalent. `tasks.map` is a Prefect feature that automatically creates a task run for each element of its input data; it represents the computation of many individual child tasks. This means that if you use `tasks.map`, it will apply the task to each element of its inputs in parallel. On the other hand, `tasks.submit` is used to submit a task to a task runner such as Dask. It doesn't inherently run tasks in parallel the way `tasks.map` does. Your `submit` approach may be waiting for 0.6 to complete first because of the way your task runner handles the tasks. If you want parallel execution with `submit`, you might need to configure your task runner to support it. For more detailed information, you might want to refer to the Prefect documentation or community resources:
- Prefect Tasks
- Prefect Task Runners
- Discourse: Running multiple subflows or child flows in parallel
n
`.map` calls `.submit` for each item in the iterable, so there should be no big difference between
```python
for i in range(3):
    foo.submit(i)
```
and
```python
foo.map(range(3))  # this will take a slightly more efficient path through the orchestration engine
```
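As a rough analogy (not Prefect's implementation), Python's standard `concurrent.futures` executor shows why the two forms should behave the same: a "map" built from one `submit()` per item produces the same futures, in the same order, as an explicit loop. `map_via_submit` is a hypothetical helper written for this sketch, and the durations are scaled down to tenths of a second.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep(duration: float) -> float:
    time.sleep(duration)
    return duration


def map_via_submit(executor: ThreadPoolExecutor, fn, items) -> list:
    """Hypothetical helper: a 'map' built from one submit() call per item.

    The futures come back in input order, just like the explicit loop below.
    """
    return [executor.submit(fn, item) for item in items]


durations = [i / 10 for i in range(3)]  # 0.0, 0.1, 0.2 seconds

with ThreadPoolExecutor() as executor:
    # Explicit loop: one submit() per item.
    looped = []
    for d in durations:
        looped.append(executor.submit(sleep, d))

    # "map": the same submit() calls, wrapped in a helper.
    mapped = map_via_submit(executor, sleep, durations)

    looped_results = [f.result() for f in looped]
    mapped_results = [f.result() for f in mapped]

print(looped_results == mapped_results)  # True
```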
Also, not to be pedantic 🙂 but
```python
results = sleep.map([5, 3, 0.8, 1])
```
this will run tasks concurrently, not in parallel, unless you use the Dask or Ray task runners, although it will effectively feel parallel for most small-scale mapped tasks.
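To make the concurrency point concrete, here is a stand-in using a standard-library thread pool rather than a Prefect task runner: the four sleeps overlap in time, so the wall-clock total is close to the longest sleep instead of the sum, even though the threads share a single Python interpreter. The durations are the ones above divided by 10, an assumption made only to keep the sketch fast.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep(duration: float) -> float:
    # time.sleep releases the GIL, so the other threads keep running meanwhile
    time.sleep(duration)
    return duration


# Scaled-down stand-in for sleep.map([5, 3, 0.8, 1]): seconds divided by 10.
durations = [0.5, 0.3, 0.08, 0.1]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(durations)) as executor:
    results = list(executor.map(sleep, durations))  # results come back in input order
elapsed = time.perf_counter() - start

# The threads interleave in one interpreter (concurrency, not parallelism),
# yet the wall-clock time tracks max(durations) rather than sum(durations).
print(results)
print(f"elapsed ~{elapsed:.2f}s vs. sequential {sum(durations):.2f}s")
```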
c
I'm sorry, I still don't understand the difference between `map` and `submit` in this scenario. I have the following toy code:
```python
from prefect import flow, task


@task(task_run_name="Sleeping for {time}")
async def sleep(time: float) -> float:
    import time as t
    # time.sleep() blocks; inside an `async def`, asyncio.sleep() is the non-blocking choice
    t.sleep(time)
    return time


@task(task_run_name="Doing some stuffs with {value}")
def do_some_stuffs(value):
    print(f"Doing some stuffs with {value}")


@flow(log_prints=True, flow_run_name="MyFlow")
def hello_flow(name_input):
    results = sleep.map([5, 1.5])
    results_2 = sleep.map([2, 1])

    do_some_stuffs(results)
    do_some_stuffs(results_2)
```
This produces the following output:
```
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
Task run 'Doing some stuffs with [5, 1.5]' - Doing some stuffs with [5, 1.5]
Task run 'Doing some stuffs with [5, 1.5]' - Finished in state Completed()
Flow run 'MyFlow' - Created task run 'do_some_stuffs-1' for task 'do_some_stuffs'
Flow run 'MyFlow' - Executing 'do_some_stuffs-1' immediately...
Task run 'Doing some stuffs with [2, 1]' - Doing some stuffs with [2, 1]
Task run 'Doing some stuffs with [2, 1]' - Finished in state Completed()
Flow run 'truthful-wolverine' - Finished in state Completed('All states completed.')
```
That is expected. If I use
```python
results_2 = [sleep.submit(i) for i in [2, 1]]
do_some_stuffs(results_2)
```
I get the same result as well. But if I do:
```python
for i in [2, 1]:
    do_some_stuffs(sleep.submit(i))
```
The results are different: it sleeps for 2 seconds first and only then sleeps for 1 second. My interpretation was that `sleep.submit` would return a future and `do_some_stuffs` would be called later. But calling `submit` inside the `for` loop and passing the future straight to `do_some_stuffs` makes it sleep 2 seconds first and then 1 second. Am I missing something here?
n
I'm not sure I understand your point; there shouldn't be much of a difference between the following:
```python
from prefect import flow, task


@task
def sleep(duration: int) -> int:
    import time
    time.sleep(duration)
    return duration


@flow
def foo():
    futures = [sleep.submit(i) for i in range(3)]
    assert [i.result() for i in futures] == [0, 1, 2]


foo()
# ... tasks run concurrently with ConcurrentTaskRunner (the default task runner)


@flow
def foo():
    futures = sleep.map(range(3))
    assert [i.result() for i in futures] == [0, 1, 2]


foo()
# ... tasks run concurrently with ConcurrentTaskRunner (the default task runner)
```
Futures are resolved by downstream tasks / flows (e.g. `do_some_stuffs`) when they're called. Does that answer your question?
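The idea that a downstream call resolves its upstream futures can be sketched with `concurrent.futures`. `resolve` and `call_downstream` are hypothetical helpers, not Prefect APIs; Prefect's own resolution logic is more involved. What the sketch shows is that the list of futures is handed over unresolved, and the waiting happens when the downstream function is called, so that call blocks until every input is done.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def resolve(value):
    """Hypothetical helper: replace futures (also inside lists) with their results.

    Future.result() blocks, so this waits until every upstream future is done.
    """
    if isinstance(value, Future):
        return value.result()
    if isinstance(value, list):
        return [resolve(v) for v in value]
    return value


def call_downstream(fn, *args):
    """Hypothetical: resolve upstream futures first, then call fn with plain values."""
    return fn(*(resolve(a) for a in args))


def sleep(duration: float) -> float:
    time.sleep(duration)
    return duration


def do_some_stuffs(value):
    return f"Doing some stuffs with {value}"


with ThreadPoolExecutor() as executor:
    futures = [executor.submit(sleep, d) for d in (0.2, 0.1)]
    # The list of futures is passed as-is; resolution happens at call time.
    message = call_downstream(do_some_stuffs, futures)

print(message)  # Doing some stuffs with [0.2, 0.1]
```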
c
Yes, indeed the results are identical; my only surprise was the execution order. Using
```python
results = sleep.map([5, 1.5])
results_2 = sleep.map([2, 1])

do_some_stuffs(results)
do_some_stuffs(results_2)
```
I got
```
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
```
Using
```python
results = sleep.map([5, 1.5])
results_2 = [sleep.submit(i) for i in [2, 1]]

do_some_stuffs(results)
do_some_stuffs(results_2)
```
I got
```
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
```
BUT using
```python
results = sleep.map([5, 1.5])
for i in [2, 1]:
    do_some_stuffs(sleep.submit(i))

do_some_stuffs(results)
```
I get
```
Task run 'Sleeping for 1.5' - Finished in state Completed()
Task run 'Sleeping for 2' - Finished in state Completed()
Task run 'Sleeping for 1' - Finished in state Completed()
Task run 'Sleeping for 5' - Finished in state Completed()
```
This is not the order I expected; I was expecting the same execution order as before (1, 1.5, 2, 5).
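One way to check this ordering is a small timeline model. It assumes (this is an assumption, not something confirmed in the thread) that a direct call such as `do_some_stuffs(future)`, without `.submit`, makes the flow wait until that future is resolved before the loop moves on, consistent with futures being resolved when the downstream task is called. Under that assumption, `sleep.submit(1)` is only issued after the 2-second sleep finishes, so it completes at about 3 s: after 1.5 and 2, but before 5. `completion_order` and the plan encoding are hypothetical, and the model ignores scheduling overhead.

```python
def completion_order(plan):
    """Return task durations in the order they finish.

    `plan` is the sequence of actions the flow performs, in order:
      ("submit", d) -> start a task of duration d now; the flow does not wait
      ("wait",)     -> block the flow until the most recently submitted task ends
    Every task is assumed to start immediately (enough workers, no overhead).
    """
    now = 0.0
    tasks = []  # (finish_time, submit_index, duration)
    for action in plan:
        if action[0] == "submit":
            duration = action[1]
            tasks.append((now + duration, len(tasks), duration))
        elif action[0] == "wait":
            now = max(now, tasks[-1][0])
    return [duration for _, _, duration in sorted(tasks)]


# sleep.map([5, 1.5]) plus sleep.map([2, 1]) (or the submit list comprehension):
# everything is submitted up front; do_some_stuffs only runs afterwards.
submit_everything_first = [("submit", 5), ("submit", 1.5), ("submit", 2), ("submit", 1)]

# sleep.map([5, 1.5]), then the loop: submit 2, wait for it inside the direct
# do_some_stuffs(...) call, and only then submit 1.
wait_inside_loop = [
    ("submit", 5), ("submit", 1.5),
    ("submit", 2), ("wait",),
    ("submit", 1), ("wait",),
]

print(completion_order(submit_everything_first))  # [1, 1.5, 2, 5]
print(completion_order(wait_inside_loop))         # [1.5, 2, 1, 5]
```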
n
Ah, yeah, in general I think the `ConcurrentTaskRunner` will run tasks submitted to it in a non-deterministic order, though it will return the results in the same order as the corresponding inputs, whereas the `SequentialTaskRunner` will run things sequentially in the order that they are submitted.
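The two behaviors described above can be mimicked with the standard library, as a stand-in rather than Prefect's implementation: a thread pool plays the concurrent runner and a plain loop plays the sequential one. The `finished` list and the durations are illustrative choices for this sketch. The concurrent run finishes in an order driven by timing, yet reading the futures back in submission order still yields the results in input order.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

finished = []  # durations, appended in the order the work actually completes
lock = threading.Lock()


def sleep(duration: float) -> float:
    time.sleep(duration)
    with lock:
        finished.append(duration)
    return duration


inputs = [0.45, 0.15, 0.3]

# Concurrent stand-in: all three start at once, so they tend to finish shortest-first,
# but the futures are read back in submission order.
with ThreadPoolExecutor(max_workers=len(inputs)) as executor:
    futures = [executor.submit(sleep, d) for d in inputs]
    concurrent_results = [f.result() for f in futures]
concurrent_finished = list(finished)

# Sequential stand-in: each call completes before the next one starts.
finished.clear()
sequential_results = [sleep(d) for d in inputs]
sequential_finished = list(finished)

print(concurrent_results)   # [0.45, 0.15, 0.3]
print(concurrent_finished)  # completion order, driven by timing
print(sequential_finished)  # [0.45, 0.15, 0.3]
```

In Prefect 2.x the runner is chosen per flow, for example `@flow(task_runner=SequentialTaskRunner())` with `SequentialTaskRunner` imported from `prefect.task_runners`; newer releases changed the set of built-in runners, so check the documentation for your version.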