scott
01/24/2023, 3:35 AM
… wait_for where the tasks will be run async, but it's not clear how to use wait_for.
Ryan Peden
01/24/2023, 5:03 AM
If you call .submit on the task to submit it to the task runner, the task returns a PrefectFuture immediately instead of waiting for the task to finish.
So you can submit several tasks to start them running concurrently, and then use wait_for so that a downstream task won't start until the previous tasks finish. Here's an example:
import time

from prefect import task, flow
from prefect.task_runners import ConcurrentTaskRunner


@task(name="Task 1")
def task_one():
    # sleep for 2 seconds
    print("Running task 1...")
    time.sleep(2)
    print("Task 1 done!")


@task(name="Task 2")
def task_two():
    # sleep for 1 second
    print("Running task 2...")
    time.sleep(1)
    print("Task 2 done!")


@task(name="Task 3")
def task_three():
    # runs only after task 1 and task 2 have finished (see wait_for below)
    print("Running task 3...")


@flow(name="Time flow", task_runner=ConcurrentTaskRunner())
def time_flow():
    # .submit() returns a PrefectFuture immediately, so tasks 1 and 2 run concurrently
    t1 = task_one.submit()
    t2 = task_two.submit()
    # wait_for makes task 3 wait until both futures are done, without using their results
    task_three(wait_for=[t1, t2])


if __name__ == "__main__":
    time_flow()
If you run this, you'll see task 1 and task 2 running simultaneously. Task 2 finishes first, followed by task 1 about a second later, and only then does task 3 start running.
wait_for is useful if you need to wait for another task to finish but don't need to use its result.
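If you want the ordering semantics without Prefect, here is a rough analogy that uses only Python's standard library. concurrent.futures.ThreadPoolExecutor.submit also returns a future immediately, and concurrent.futures.wait blocks until the listed futures are done. That is roughly the role wait_for plays for task 3 above. This is a sketch under that assumption, not how Prefect implements wait_for. The function names mirror the example, and the sleep times are shortened.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

# Shared log so we can check the order in which things happened.
events = []


def task_one():
    events.append("task 1 start")
    time.sleep(0.2)
    events.append("task 1 done")


def task_two():
    events.append("task 2 start")
    time.sleep(0.1)
    events.append("task 2 done")


def task_three():
    events.append("task 3 start")


def time_flow():
    with ThreadPoolExecutor() as executor:
        # submit() returns a Future right away; both functions start in worker threads
        t1 = executor.submit(task_one)
        t2 = executor.submit(task_two)
        # Analogous to wait_for=[t1, t2]: block until both futures are done,
        # without using their (None) results
        wait([t1, t2])
        task_three()


if __name__ == "__main__":
    time_flow()
    print(events)
```

The exact interleaving of the "start" entries can vary between runs. "task 3 start" is always last.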
If you do need to use the result of a submitted upstream task, you can just pass its PrefectFuture as an argument to another task. When a task receives a future as an argument, it waits for that future to finish and is given the future's result instead of the future itself.
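The "pass the future, get the result" behavior can also be approximated with the standard library. In this sketch, run_with_resolved_args is a hypothetical helper, not a Prefect API. Before calling the downstream function, it replaces every concurrent.futures.Future argument with that future's result(), which blocks until the upstream work finishes. Prefect's engine does its own resolution internally; this only illustrates the idea.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def run_with_resolved_args(fn, *args):
    """Hypothetical helper: wait for any Future arguments, then call fn with their results."""
    # Future.result() blocks until the future finishes and returns its value
    # (or re-raises the exception it failed with).
    resolved = [arg.result() if isinstance(arg, Future) else arg for arg in args]
    return fn(*resolved)


def task_one():
    time.sleep(0.2)
    return "Task 1 output"


def task_two():
    time.sleep(0.1)
    return "Task 2 output"


def task_three(t1, t2):
    # Receives plain values, not futures
    return f"{t1} | {t2}"


def time_flow():
    with ThreadPoolExecutor() as executor:
        t1 = executor.submit(task_one)
        t2 = executor.submit(task_two)
        # t1 and t2 are Futures here; the helper waits for both before calling task_three
        return run_with_resolved_args(task_three, t1, t2)


if __name__ == "__main__":
    print(time_flow())  # Task 1 output | Task 2 output
```

Non-future arguments pass through unchanged, so run_with_resolved_args(task_three, "a", "b") returns "a | b".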
For example:
import time

from prefect import task, flow
from prefect.task_runners import ConcurrentTaskRunner


@task(name="Task 1")
def task_one():
    # sleep for 2 seconds
    print("Running task 1...")
    time.sleep(2)
    print("Task 1 done!")
    return "Task 1 output"


@task(name="Task 2")
def task_two():
    # sleep for 1 second
    print("Running task 2...")
    time.sleep(1)
    print("Task 2 done!")
    return "Task 2 output"


@task(name="Task 3")
def task_three(t1, t2):
    # print the outputs of t1 and t2 (they arrive as results, not futures)
    print("Running task 3...")
    print(t1)
    print(t2)


@flow(name="Time flow", task_runner=ConcurrentTaskRunner())
def time_flow():
    t1 = task_one.submit()
    t2 = task_two.submit()
    # passing the futures makes task 3 wait for tasks 1 and 2 and receive their results
    task_three(t1, t2)


if __name__ == "__main__":
    time_flow()
You'll see that tasks 1 and 2 again run concurrently, and task 3 waits for them to complete before it runs and prints their outputs.
scott
01/24/2023, 5:29 AM
Ryan Peden
01/24/2023, 5:57 AM
… submit to use wait_for, because ultimately the Prefect engine ends up checking wait_for right here.
I can definitely see how it would be nicer to be able to use any awaitable in wait_for, not just a PrefectFuture. I have a couple of ideas about why it's designed the way it is now, but I'd have to ask a few people to find out for sure.
scott
01/24/2023, 2:30 PM
… submit. Thanks again for the help!