Khyaati Jindal
07/26/2023, 12:02 PM
Marvin
07/26/2023, 12:02 PM
python
from prefect import Flow, Task


class Flow1Task(Task):
    def run(self):
        print("Flow 1 Task here, I'm done.")


class Flow2Task(Task):
    def run(self):
        print("Flow 2 here, just started because Flow 1 is done.")


with Flow("Flow 1") as flow1:
    flow1_task = Flow1Task()

with Flow("Flow 2") as flow2:
    # Declare flow1_task as an upstream dependency of Flow2Task.
    flow2_task = Flow2Task().set_upstream(flow1_task, flow=flow1)

# Run the flows one after the other.
flow1.run()
flow2.run()
This will ensure that Flow2Task only starts after Flow1Task has completed. Note that this code is just a basic example. Depending on your specific use case, you might need to customize this to handle failures, retries, and other edge cases.
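As a rough illustration of the failure and retry handling mentioned above, here is a minimal sketch in plain Python rather than Prefect. The helper names `run_with_retries` and `run_in_order` are hypothetical and not part of any Prefect API; the only idea shown is that a downstream step starts once the upstream step has succeeded, possibly after a few retries.

```python
from typing import Callable, List


def run_with_retries(step: Callable[[], None], max_retries: int = 2) -> bool:
    """Call `step` up to max_retries + 1 times; return True on the first success."""
    for _attempt in range(max_retries + 1):
        try:
            step()
            return True
        except Exception:
            continue  # ignore the error and try again
    return False


def run_in_order(steps: List[Callable[[], None]], max_retries: int = 2) -> int:
    """Run steps sequentially and stop at the first step that never succeeds.

    Returns the number of steps that completed successfully.
    """
    completed = 0
    for step in steps:
        if not run_with_retries(step, max_retries):
            break  # downstream steps are skipped when an upstream step fails
        completed += 1
    return completed
```

A real orchestrator would also record state and log the errors; this sketch silently swallows them to keep the control flow visible.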
I suppose I should remind you that life isn't just about executing flows and tasks. But you humans seem to get a kick out of it, so who am I to argue?
Khyaati Jindal
07/26/2023, 12:05 PM
Marvin
07/26/2023, 12:05 PM
set_upstream method has been replaced by the concept of "Edges". You define the dependencies between tasks using the Edge object when creating the tasks and Flows.
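Here's an updated example. Note that the Flow and Task classes, prefect.core, add_edge, and flow.run() used here belong to the Prefect 1.x API, not to Prefect 2.0:
python
from prefect import Flow, Task
from prefect.core import Edge  # imported in the original answer; not used directly below


class Flow1Task(Task):
    def run(self):
        print("Flow 1 Task here, I'm done.")


class Flow2Task(Task):
    def run(self):
        print("Flow 2 here, just started because Flow 1 is done.")


flow1_task = Flow1Task()
flow2_task = Flow2Task()

with Flow("My Flow") as flow:
    flow.add_task(flow1_task)
    flow.add_task(flow2_task)
    # Edge from upstream (flow1_task) to downstream (flow2_task).
    flow.add_edge(flow1_task, flow2_task)

flow.run()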
In this code, the add_edge method is used to create a dependency from flow1_task to flow2_task. This ensures that flow2_task will not run until flow1_task has successfully completed.
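To show what an edge-based dependency amounts to, independent of any Prefect version, here is a minimal sketch in plain Python. `MiniFlow` and its methods are hypothetical stand-ins written for this illustration, not Prefect classes; the ordering is delegated to the standard library's `graphlib.TopologicalSorter` (Python 3.9+).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Callable, Dict, List, Set


class MiniFlow:
    """Hypothetical toy flow: named tasks plus upstream -> downstream edges."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Callable[[], None]] = {}
        # Maps each task name to the set of task names it depends on.
        self.upstream: Dict[str, Set[str]] = {}

    def add_task(self, name: str, fn: Callable[[], None]) -> None:
        self.tasks[name] = fn
        self.upstream.setdefault(name, set())

    def add_edge(self, upstream: str, downstream: str) -> None:
        # Both ends of the edge must already be registered tasks.
        if upstream not in self.tasks or downstream not in self.tasks:
            raise KeyError("add both tasks before adding an edge")
        self.upstream[downstream].add(upstream)

    def run(self) -> List[str]:
        # static_order() yields every task after all of its upstream tasks.
        order = list(TopologicalSorter(self.upstream).static_order())
        for name in order:
            self.tasks[name]()
        return order


flow = MiniFlow()
flow.add_task("flow2_task", lambda: print("Flow 2 here."))
flow.add_task("flow1_task", lambda: print("Flow 1 Task here, I'm done."))
flow.add_edge("flow1_task", "flow2_task")
flow.run()  # flow1_task runs first, regardless of registration order
```

Unlike a real orchestrator, this toy version does not skip downstream tasks when an upstream task raises; the exception simply propagates out of `run()`.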
I hope that helps. And remember, even a robot like me can make mistakes. The universe doesn't revolve around perfect code, after all.