Khyaati Jindal
07/26/2023, 12:02 PM
Marvin
07/26/2023, 12:02 PM
```python
from prefect import Flow, Task


class Flow1Task(Task):
    def run(self):
        print("Flow 1 Task here, I'm done.")


class Flow2Task(Task):
    def run(self):
        print("Flow 2 here, just started because Flow 1 is done.")


with Flow("Flow 1") as flow1:
    flow1_task = Flow1Task()

with Flow("Flow 2") as flow2:
    # Declare Flow1Task as upstream of Flow2Task; flow=flow1 registers the dependency on flow1.
    flow2_task = Flow2Task().set_upstream(flow1_task, flow=flow1)

flow1.run()
flow2.run()
```
This will ensure that `Flow2Task` only starts after `Flow1Task` has completed. Note that this code is just a basic example. Depending on your specific use case, you might need to customize it to handle failures, retries, and other edge cases.
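For the failure and retry handling mentioned above, here is a minimal plain-Python sketch, not Prefect code. `run_with_retries` and `run_in_order` are hypothetical helper names, and the attempt count and delay are arbitrary assumptions. It only illustrates the idea that the second step runs if and only if the first one eventually succeeds.

```python
import time
from typing import Callable


def run_with_retries(
    step: Callable[[], None], max_attempts: int = 3, delay_seconds: float = 0.0
) -> bool:
    """Call `step` until it succeeds or attempts run out; return True on success."""
    for attempt in range(1, max_attempts + 1):
        try:
            step()
            return True
        except Exception as exc:
            print(f"attempt {attempt}/{max_attempts} failed: {exc}")
            if attempt < max_attempts:
                time.sleep(delay_seconds)
    return False


def run_in_order(
    first: Callable[[], None], second: Callable[[], None], max_attempts: int = 3
) -> bool:
    """Run `second` only if `first` eventually succeeded."""
    if not run_with_retries(first, max_attempts=max_attempts):
        print("first step never succeeded; skipping second step")
        return False
    return run_with_retries(second, max_attempts=max_attempts)


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky_first_step() -> None:
        # Hypothetical step that fails once before succeeding.
        calls["n"] += 1
        if calls["n"] < 2:
            raise RuntimeError("transient failure")
        print("Flow 1 Task here, I'm done.")

    def second_step() -> None:
        print("Flow 2 here, just started because Flow 1 is done.")

    run_in_order(flaky_first_step, second_step)
```

A workflow framework would normally provide retries and state tracking for you; the sketch is only meant to make the control flow explicit.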
I suppose I should remind you that life isn't just about executing flows and tasks. But you humans seem to get a kick out of it, so who am I to argue?
Khyaati Jindal
07/26/2023, 12:05 PM
Marvin
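07/26/2023, 12:05 PM
The `set_upstream` method can be replaced by the concept of "Edges": you define the dependencies between tasks by adding an edge (an `Edge` object) to the flow that contains them. Note that `Edge`, `add_edge`, and the `with Flow(...)` context manager belong to the Prefect 1.x API; Prefect 2 defines flows and tasks with the `@flow` and `@task` decorators, so this code will not run unchanged there.
Here's an updated example using `add_edge`: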
```python
from prefect import Flow, Task
from prefect.core import Edge


class Flow1Task(Task):
    def run(self):
        print("Flow 1 Task here, I'm done.")


class Flow2Task(Task):
    def run(self):
        print("Flow 2 here, just started because Flow 1 is done.")


flow1_task = Flow1Task()
flow2_task = Flow2Task()

with Flow("My Flow") as flow:
    flow.add_task(flow1_task)
    flow.add_task(flow2_task)
    flow.add_edge(flow1_task, flow2_task)

flow.run()
```
In this code, the `add_edge` method creates a dependency from `flow1_task` to `flow2_task`. This ensures that `flow2_task` will not run until `flow1_task` has successfully completed.
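To make the idea of an edge concrete, here is a small framework-independent sketch using Python's standard `graphlib` module (Python 3.9+). `run_with_edges` is a hypothetical helper, not part of Prefect; it treats each `(upstream, downstream)` pair as an edge and runs the tasks in an order that respects every edge.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Callable, Dict, List, Tuple


def run_with_edges(
    tasks: Dict[str, Callable[[], None]], edges: List[Tuple[str, str]]
) -> List[str]:
    """Run every task after all of its upstream tasks; return the execution order."""
    # Start with every task as a node that has no predecessors.
    sorter = TopologicalSorter({name: set() for name in tasks})
    for upstream, downstream in edges:
        # Record that `downstream` depends on `upstream`.
        sorter.add(downstream, upstream)
    order = list(sorter.static_order())
    for name in order:
        # An exception here stops the loop, so later tasks never run.
        tasks[name]()
    return order


if __name__ == "__main__":
    order = run_with_edges(
        {
            "flow2_task": lambda: print("Flow 2 here, just started because Flow 1 is done."),
            "flow1_task": lambda: print("Flow 1 Task here, I'm done."),
        },
        edges=[("flow1_task", "flow2_task")],
    )
    print(order)
```

Even though `flow2_task` is listed first in the dictionary, the single edge forces `flow1_task` to run before it.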
I hope that helps. And remember, even a robot like me can make mistakes. The universe doesn't revolve around perfect code, after all.