Neil Natarajan
08/11/2022, 8:59 PM
Khuyen Tran
08/11/2022, 9:21 PM
Add .submit() to each task to make the tasks inside your flow run asynchronously: https://docs.prefect.io/concepts/task-runners/#using-a-task-runner
Neil Natarajan
08/11/2022, 9:26 PM
Neil Natarajan
08/11/2022, 10:01 PM
Jeff Hale
08/11/2022, 10:34 PM
Nate
08/11/2022, 11:28 PM
from prefect import flow, task

@task
def square(x: int) -> int:
    return x**2

@flow
def my_flow():
    numbers = [i for i in range(10)]
    # .map creates one task run per element of numbers
    squared_numbers = square.map(numbers)

my_flow()
will give logs like
18:27:44.809 | INFO | prefect.engine - Created flow run 'elegant-coua' for flow 'my-flow'
...
18:27:45.846 | INFO | Task run 'square-2b9b4656-0' - Finished in state Completed()
18:27:45.904 | INFO | Task run 'square-2b9b4656-2' - Finished in state Completed()
18:27:45.912 | INFO | Task run 'square-2b9b4656-8' - Finished in state Completed()
18:27:45.915 | INFO | Task run 'square-2b9b4656-5' - Finished in state Completed()
18:27:45.925 | INFO | Task run 'square-2b9b4656-3' - Finished in state Completed()
18:27:45.936 | INFO | Task run 'square-2b9b4656-6' - Finished in state Completed()
18:27:45.936 | INFO | Task run 'square-2b9b4656-7' - Finished in state Completed()
18:27:45.957 | INFO | Task run 'square-2b9b4656-4' - Finished in state Completed()
18:27:46.015 | INFO | Task run 'square-2b9b4656-9' - Finished in state Completed()
18:27:46.140 | INFO | Task run 'square-2b9b4656-1' - Finished in state Completed()
18:27:46.252 | INFO | Flow run 'elegant-coua' - Finished in state Completed('All states completed.')
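The task runs in these logs finish in the order 0, 2, 8, 5, ... rather than in input order, which is what concurrent execution looks like. As a rough standard-library analogy only (this is not Prefect's implementation; the thread pool and its size are assumptions made for illustration), `ThreadPoolExecutor.map` also runs the calls concurrently while still handing results back in input order:

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    return x**2


numbers = list(range(10))

# The calls may finish in any order, but map() yields results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    squared_numbers = list(pool.map(square, numbers))

print(squared_numbers)
```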
Neil Natarajan
08/12/2022, 3:28 PM
Neil Natarajan
08/12/2022, 3:29 PM
import time

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task(name="map_task")
def map_task(x, y):
    # Fail on purpose for one input
    if x == 2:
        raise Exception
    return x + y

@task(name="map task 2")
def map_task_2(nr):
    print(f"Map Task 2: {time.time()}: {nr}")
    time.sleep(5)
    print(f"Map Task 2: {time.time()} : {nr}")

@flow(name="outer flow", task_runner=DaskTaskRunner())
def outer_flow(numbers, static_num):
    futures = map_task.map(x=numbers, y=[static_num] * len(numbers))
    map_task_2.map(nr=futures)

if __name__ == "__main__":
    workflow_result = outer_flow([1, 2, 3, 4, 5], 2)
Neil Natarajan
08/12/2022, 3:30 PM
Khuyen Tran
08/12/2022, 3:33 PM
map_task_2.map(nr=futures) is blocked by futures = map_task.map(x=numbers, y=[static_num] * len(numbers)).
For example,
from prefect import flow, task
from time import sleep

@task(name="Task 1")
def task_1():
    sleep(5)

@task(name="Task 2")
def task_2():
    sleep(2)

@flow
def my_flow():
    task_1()
    task_2()

if __name__ == "__main__":
    my_flow()
Output:
09:37:44.528 | INFO | prefect.engine - Created flow run 'prehistoric-hyrax' for flow 'my-flow'
09:37:45.275 | INFO | Flow run 'prehistoric-hyrax' - Created task run 'Task 1-1619c4ff-0' for task 'Task 1'
09:37:45.276 | INFO | Flow run 'prehistoric-hyrax' - Executing 'Task 1-1619c4ff-0' immediately...
09:37:50.594 | INFO | Task run 'Task 1-1619c4ff-0' - Finished in state Completed()
09:37:50.679 | INFO | Flow run 'prehistoric-hyrax' - Created task run 'Task 2-49f2f021-0' for task 'Task 2'
09:37:50.680 | INFO | Flow run 'prehistoric-hyrax' - Executing 'Task 2-49f2f021-0' immediately...
09:37:52.938 | INFO | Task run 'Task 2-49f2f021-0' - Finished in state Completed()
09:37:53.041 | INFO | Flow run 'prehistoric-hyrax' - Finished in state Completed('All states completed.')
Task 2 will wait for Task 1 to finish executing.
Khuyen Tran
08/12/2022, 3:33 PM
Using submit will make the tasks run concurrently:
from prefect import flow, task
from time import sleep

@task(name="Task 1")
def task_1():
    sleep(5)

@task(name="Task 2")
def task_2():
    sleep(2)

@flow
def my_flow():
    task_1.submit()
    task_2.submit()

if __name__ == "__main__":
    my_flow()
09:38:35.746 | INFO | prefect.engine - Created flow run 'amiable-marmot' for flow 'my-flow'
09:38:36.462 | INFO | Flow run 'amiable-marmot' - Created task run 'Task 1-1619c4ff-0' for task 'Task 1'
09:38:36.463 | INFO | Flow run 'amiable-marmot' - Submitted task run 'Task 1-1619c4ff-0' for execution.
09:38:36.603 | INFO | Flow run 'amiable-marmot' - Created task run 'Task 2-49f2f021-0' for task 'Task 2'
09:38:36.603 | INFO | Flow run 'amiable-marmot' - Submitted task run 'Task 2-49f2f021-0' for execution.
09:38:38.850 | INFO | Task run 'Task 2-49f2f021-0' - Finished in state Completed()
09:38:41.711 | INFO | Task run 'Task 1-1619c4ff-0' - Finished in state Completed()
09:38:41.810 | INFO | Flow run 'amiable-marmot' - Finished in state Completed('All states completed.')
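In the second set of logs, Task 2 (2 s) finishes before Task 1 (5 s) and the whole flow takes roughly as long as the slower task, not the sum of both. The same effect can be sketched with the standard library. This is only an analogy and not how Prefect runs tasks internally; the thread pool and the shortened sleep times are assumptions made to keep the example quick:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task_1():
    time.sleep(0.5)  # stands in for the 5 s task
    return "Task 1"


def task_2():
    time.sleep(0.2)  # stands in for the 2 s task
    return "Task 2"


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a future, so both calls overlap.
    futures = [pool.submit(task_1), pool.submit(task_2)]
    finish_order = [f.result() for f in as_completed(futures)]
elapsed = time.perf_counter() - start

print(finish_order, round(elapsed, 1))
```

Calling `task_1()` and then `task_2()` directly would instead take about the sum of both sleeps, which matches the first set of logs.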
Khuyen Tran
08/12/2022, 3:39 PM
import time

from prefect import flow, task

@task(name="map_task")
def map_task(x, y):
    if x == 2:
        raise Exception
    return x + y

@task(name="map task 2")
def map_task_2(nr):
    print(f"Map Task 2: {time.time()}: {nr}")
    time.sleep(5)
    print(f"Map Task 2: {time.time()} : {nr}")

@flow(name="outer flow")
def outer_flow(numbers, static_num):
    futures = [map_task.submit(x=num, y=static_num) for num in numbers]
    # Each downstream run receives the matching upstream future
    for future in futures:
        map_task_2.submit(nr=future)

if __name__ == "__main__":
    workflow_result = outer_flow([1, 2, 3, 4, 5], 2)
Neil Natarajan
08/12/2022, 3:44 PM
Neil Natarajan
08/12/2022, 3:44 PM
Zanie
Neil Natarajan
08/12/2022, 7:15 PM
Neil Natarajan
08/12/2022, 7:15 PM