scott

01/24/2023, 3:35 AM
Does anyone have an example of what she means here by passing the task into another task? I want to use wait_for where the tasks will be run async, but it’s not clear how to be able to use wait_for.

Ryan Peden

01/24/2023, 5:03 AM
Hi Scott! I think what Kalise means is that if you call .submit on a task to submit it to the task runner, the call returns a PrefectFuture immediately instead of waiting for the task to finish. So you can submit several tasks to start them running asynchronously, and then use wait_for so that a downstream task won't start until the previous tasks finish. Here's an example:
import time
from prefect import task, flow
from prefect.task_runners import ConcurrentTaskRunner

@task(name="Task 1")
def task_one():
    # sleep for 2 seconds
    print("Running task 1...")
    time.sleep(2)
    print("Task 1 done!")

@task(name="Task 2")
def task_two():
    # sleep for 1 second
    print("Running task 2...")
    time.sleep(1)
    print("Task 2 done!")

@task(name="Task 3")
def task_three():
    # runs only after task 1 and task 2 have finished (via wait_for)
    print("Running task 3...")


@flow(name="Time flow", task_runner=ConcurrentTaskRunner())
def time_flow():
    t1 = task_one.submit()
    t2 = task_two.submit()
    task_three(wait_for=[t1, t2])


if __name__ == "__main__":
    time_flow()
If you run this, you'll see task 1 and task 2 running simultaneously. Task 2 finishes first, followed by task 1 about a second later, and only then does task 3 start running.
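For comparison, here is a rough analogue of the same ordering using only the standard library's concurrent.futures. It is not how Prefect implements ConcurrentTaskRunner or wait_for; it just illustrates the pattern: submit() hands back a future right away, and the downstream step blocks on those futures before it starts. The function names and (shortened) sleep times are illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

events = []  # records the order in which steps finish or start


def task_one():
    time.sleep(0.2)  # stands in for the 2-second task
    events.append("task 1 done")


def task_two():
    time.sleep(0.1)  # stands in for the 1-second task
    events.append("task 2 done")


def task_three():
    events.append("task 3 started")


def time_flow():
    with ThreadPoolExecutor() as pool:
        # submit() returns a Future immediately, similar to task.submit() in Prefect
        t1 = pool.submit(task_one)
        t2 = pool.submit(task_two)
        # block until both upstream futures are finished, similar in spirit to wait_for=[t1, t2]
        wait([t1, t2])
        task_three()


if __name__ == "__main__":
    time_flow()
    print(events)
```

Note that concurrent.futures.wait() does not re-raise upstream exceptions; calling t1.result() would.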
wait_for is useful if you need to wait for another task to finish but don't need to use its result. If you do need to use the result of a submitted upstream task, you can just pass its PrefectFuture as an argument to another task. If a task sees that it has received a future as an argument, it will wait for it to finish before proceeding. For example:
import time
from prefect import task, flow
from prefect.task_runners import ConcurrentTaskRunner


@task(name="Task 1")
def task_one():
    # sleep for 2 seconds
    print("Running task 1...")
    time.sleep(2)
    print("Task 1 done!")
    return "Task 1 output"


@task(name="Task 2")
def task_two():
    # sleep for 1 second
    print("Running task 2...")
    time.sleep(1)
    print("Task 2 done!")
    return "Task 2 output"


@task(name="Task 3")
def task_three(t1, t2):
    # print the output of t1 and t2
    print("Running task 3...")
    print(t1)
    print(t2)


@flow(name="Time flow", task_runner=ConcurrentTaskRunner())
def time_flow():
    t1 = task_one.submit()
    t2 = task_two.submit()
    task_three(t1, t2)


if __name__ == "__main__":
    time_flow()
You'll see that tasks 1 and 2 again run asynchronously, and task 3 waits for them to complete before it runs and prints their outputs.
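A similar standard-library analogue for passing futures as arguments: the hypothetical helper call_resolving_futures below replaces each Future argument with its result before calling the downstream function. It only mirrors the behaviour described above and is not Prefect's actual implementation.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def task_one():
    time.sleep(0.2)
    return "Task 1 output"


def task_two():
    time.sleep(0.1)
    return "Task 2 output"


def task_three(t1, t2):
    return f"{t1} | {t2}"


def call_resolving_futures(fn, *args):
    """Hypothetical helper: swap every Future argument for its result.

    Future.result() blocks until the upstream work is done, so fn only
    runs once all of its upstream values are available.
    """
    resolved = [a.result() if isinstance(a, Future) else a for a in args]
    return fn(*resolved)


def time_flow():
    with ThreadPoolExecutor() as pool:
        t1 = pool.submit(task_one)
        t2 = pool.submit(task_two)
        return call_resolving_futures(task_three, t1, t2)


if __name__ == "__main__":
    print(time_flow())  # Task 1 output | Task 2 output
```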

scott

01/24/2023, 5:29 AM
Thanks @Ryan Peden for those examples! These examples use the task runner approach. What if you were using the pure Python async approach, as in the Prefect docs here: https://docs.prefect.io/concepts/tasks/#async-tasks ?

Ryan Peden

01/24/2023, 5:57 AM
I think in that case you'd still need to use submit to use wait_for, because ultimately the Prefect engine ends up checking wait_for right here. I can definitely see how it would be nicer to be able to use any awaitable in wait_for, not just a PrefectFuture. I have a couple of ideas about why it's designed the way it is now, but I'd have to ask a couple of people to find out for sure.
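For comparison only, in plain asyncio (no Prefect involved) the same "start two things, then run a third only after both finish" pattern can be written with asyncio.create_task and asyncio.gather. This is an analogy, not a Prefect API: as noted above, Prefect's wait_for checks for PrefectFuture objects, so this does not show a way to pass arbitrary awaitables to wait_for.

```python
import asyncio

events = []  # records the order in which steps finish or start


async def task_one():
    await asyncio.sleep(0.2)
    events.append("task 1 done")
    return "Task 1 output"


async def task_two():
    await asyncio.sleep(0.1)
    events.append("task 2 done")
    return "Task 2 output"


async def task_three():
    events.append("task 3 started")


async def time_flow():
    # create_task schedules each coroutine right away and returns an awaitable Task,
    # loosely like .submit() returning a PrefectFuture
    t1 = asyncio.create_task(task_one())
    t2 = asyncio.create_task(task_two())
    # awaiting both before the downstream step plays the role of wait_for;
    # gather returns results in argument order, not completion order
    results = await asyncio.gather(t1, t2)
    await task_three()
    return results


if __name__ == "__main__":
    print(asyncio.run(time_flow()))  # ['Task 1 output', 'Task 2 output']
```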

scott

01/24/2023, 2:30 PM
Okay, thanks, that’s what I was starting to think: that I’d have to use submit. Thanks again for the help.