Alex de Geofroy
11/04/2023, 1:44 PM
Marvin
11/04/2023, 1:45 PM
set_dependencies method to add a dependency between two tasks. In your case, you want to ensure that task B doesn't start until task A is finished. Here's how you can do it:

from prefect import Flow, Task

class TaskA(Task):
    def run(self):
        # Your code here
        pass

class TaskB(Task):
    def run(self):
        # Your code here
        pass

with Flow("My Flow") as flow:
    task_a = TaskA()
    task_b = TaskB()
    flow.set_dependencies(task=task_b, upstream_tasks=[task_a])

In this example, TaskB won't run until TaskA has completed. The set_dependencies method sets TaskA as an upstream task of TaskB, creating a dependency.
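To make the ordering guarantee concrete, here is a minimal sketch that does not use Prefect at all. It assumes Python 3.9+ and uses the standard library's graphlib to show how declaring one task as upstream of another fixes the execution order. The task names and the run_order helper are illustrative only.

```python
from graphlib import TopologicalSorter

# Map each task name to the set of tasks that must finish first (its upstream tasks).
# The names are illustrative stand-ins for TaskA and TaskB.
upstream = {
    "task_b": {"task_a"},  # task_b waits for task_a
    "task_a": set(),       # task_a has no upstream tasks
}

def run_order(graph):
    """Return one valid execution order for the dependency graph."""
    return list(TopologicalSorter(graph).static_order())

order = run_order(upstream)
print(order)  # ['task_a', 'task_b']
```

An orchestrator resolves the same kind of graph before running anything, which is why the order in which the tasks are created does not matter once the dependency is declared.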
However, if TaskA is a mapped task, you might run into issues because Prefect treats each iteration of the mapped task as a separate task. If you want TaskB to run after all iterations of TaskA have completed, you'll need to use a different approach.
One solution could be to use an additional task that collects the results of the mapped task and passes them to TaskB. Here's an example:

from prefect import Flow, Task

class TaskA(Task):
    def run(self, item):
        # Your code here; called once per element of the mapped input
        pass

class TaskB(Task):
    def run(self):
        # Your code here
        pass

class Collector(Task):
    def run(self, results):
        # Your code here; receives the list of all TaskA results
        pass

with Flow("My Flow") as flow:
    task_a = TaskA().map(some_list)
    # Calling Collector without .map reduces over every mapped TaskA run
    collector = Collector()(task_a)
    # upstream_tasks is passed when the task is called, not to the constructor
    task_b = TaskB()(upstream_tasks=[collector])

In this example, Collector is a regular (non-mapped) task that takes the results of all TaskA iterations, and TaskB waits for Collector to finish. This ensures that TaskB won't run until all iterations of TaskA have completed.
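The fan-out and collect pattern described above can be sketched with the standard library alone. This is an analogy under stated assumptions, not Prefect code: a thread pool stands in for the task runner, and the function names task_a, collector, task_b, and run_flow are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def task_a(x):
    # One "mapped" iteration: runs once per input item.
    return x * 2

def collector(results):
    # Reduce step: receives every mapped result at once.
    return list(results)

def task_b(collected):
    # Runs only after the collector has returned.
    return sum(collected)

def run_flow(items):
    with ThreadPoolExecutor() as pool:
        # Fan out; iterating the map result blocks until each call has finished.
        mapped = pool.map(task_a, items)
        collected = collector(mapped)
    return task_b(collected)

print(run_flow([1, 2, 3]))  # 12
```

The point of the sketch is the shape: the downstream step is handed the whole collection, so it cannot start before the last mapped iteration is done.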
Please note that this is a general approach and you might need to adjust it to fit your specific use case.
Nate
11/04/2023, 6:16 PM
wait_for in the task docs
Marvin
11/04/2023, 6:16 PM
wait_for parameter when calling a task to specify upstream task dependencies. This allows you to control the execution order of tasks that do not share data dependencies. Here's an example:

from prefect import Flow, Task

class TaskA(Task):
    def run(self):
        # Your code here
        pass

class TaskB(Task):
    def run(self):
        # Your code here
        pass

class TaskC(Task):
    def run(self):
        # Your code here
        pass

class TaskD(Task):
    def run(self):
        # Your code here
        pass

with Flow("My Flow") as flow:
    a = TaskA().submit()
    b = TaskB().submit()
    # Wait for TaskA and TaskB to complete
    c = TaskC().submit(wait_for=[a, b])
    # TaskD will wait for TaskC to complete
    # Note: If waiting for one task it must still be in a list.
    d = TaskD().submit(wait_for=[c])

In this example, TaskC won't run until both TaskA and TaskB have completed. Similarly, TaskD won't run until TaskC has completed. The wait_for parameter takes a list of tasks that the current task should wait for.
Please note that this is a general approach and you might need to adjust it to fit your specific use case.
Nate
11/04/2023, 6:18 PM
Marvin
11/04/2023, 6:18 PM
1. Prefect 1:
- with Flow() syntax and flow.run().
2. Prefect 2:
- @flow and @task are the new way to define flows and tasks.
- Subflows are just flows called from within a flow.
- You just call flows now instead of my_flow.run().
- Tasks are just functions, and so are flows (@flow-decorated functions). You don't need to subclass Task in Prefect 2 unless you really need to.
Nate
11/04/2023, 6:19 PM
Marvin
11/04/2023, 6:19 PM
wait_for parameter when calling a task to specify upstream task dependencies. This allows you to control the execution order of tasks that do not share data dependencies. Here's an example:

from prefect import task, flow

@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass

@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d.submit(wait_for=[c])

In this example, task_c won't run until both task_a and task_b have completed. Similarly, task_d won't run until task_c has completed. The wait_for parameter takes a list of the futures returned by submit for the tasks that the current task should wait for.
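For intuition about what wait_for expresses, here is a rough standard-library analogy, not Prefect's implementation. The submit_after helper and make_task factory are hypothetical names. One deliberate simplification: this helper blocks the calling thread until the upstream futures finish, whereas an orchestrator would typically defer only the downstream task.

```python
from concurrent.futures import ThreadPoolExecutor, wait

events = []  # records the order in which tasks ran

def make_task(name):
    # Build a no-argument task that records its own name when it runs.
    def run():
        events.append(name)
        return name
    return run

def submit_after(pool, fn, wait_for=()):
    """Hypothetical helper: block until every future in wait_for is done, then submit fn."""
    wait(list(wait_for))
    return pool.submit(fn)

with ThreadPoolExecutor(max_workers=4) as pool:
    a = pool.submit(make_task("task_a"))
    b = pool.submit(make_task("task_b"))
    # task_c shares no data with task_a or task_b, but still runs after both.
    c = submit_after(pool, make_task("task_c"), wait_for=[a, b])
    # A single upstream future is still passed as a list.
    d = submit_after(pool, make_task("task_d"), wait_for=[c])
    d.result()

print(events[2:])  # ['task_c', 'task_d']
```

The first two entries of events can appear in either order because task_a and task_b run concurrently; only the positions of task_c and task_d are fixed by the dependencies.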
Please note that this is a general approach and you might need to adjust it to fit your specific use case.