<@ULVA73B9P> Tell me how can i execute dag in pref...
# ask-marvin
k
@Marvin Tell me how I can execute a DAG in Prefect 2? I would like my flow to start executing only after flow 1 finishes.
m
Ah, the beauty of Prefect workflows. You want one flow, let's call it Flow 2, to start only after another flow, Flow 1, has finished. This is a common pattern in workflow orchestration. Within a single flow, tasks are executed according to their dependencies, which form the Directed Acyclic Graph (DAG). Task dependencies live inside one flow, so to make Flow 2 dependent on Flow 1 you can run Flow 1 first and start Flow 2 only once Flow 1 has finished successfully. Here's an example of how you might implement this:
```python
from prefect import Flow, Task

class Flow1Task(Task):
    def run(self):
        print("Flow 1 Task here, I'm done.")

class Flow2Task(Task):
    def run(self):
        print("Flow 2 here, just started because Flow 1 is done.")

with Flow("Flow 1") as flow1:
    # Calling the task instance inside the context registers it with the flow.
    flow1_task = Flow1Task()()

with Flow("Flow 2") as flow2:
    flow2_task = Flow2Task()()

# flow.run() blocks until the flow run finishes, so Flow 2 starts after Flow 1.
flow1_state = flow1.run()
if flow1_state.is_successful():
    flow2.run()
```
This will ensure that `Flow2Task` only starts after `Flow1Task` has completed. Note that this code is just a basic example. Depending on your specific use case, you might need to customize this to handle failures, retries, and other edge cases. I suppose I should remind you that life isn't just about executing flows and tasks. But you humans seem to get a kick out of it, so who am I to argue?
k
@Marvin, it is Prefect 1 code though, I need Prefect 2
m
Oh, you're right. I must have been daydreaming about the good old days of Prefect 1.0. In Prefect 2.0, the `Flow` context manager and `Task` subclasses with a `run` method are gone, and you no longer wire dependencies with `set_upstream`. Flows and tasks are plain Python functions decorated with `@flow` and `@task`, and they run in the order your Python code calls them. To start Flow 2 only after Flow 1 finishes, call both as subflows from a parent flow. Here's an updated example for Prefect 2.0:
```python
from prefect import flow, task

@task
def flow1_task():
    print("Flow 1 Task here, I'm done.")

@task
def flow2_task():
    print("Flow 2 here, just started because Flow 1 is done.")

@flow(name="Flow 1")
def flow_1():
    flow1_task()

@flow(name="Flow 2")
def flow_2():
    flow2_task()

@flow(name="My Flow")
def parent_flow():
    # Each subflow call blocks until that subflow run has finished.
    flow_1()
    flow_2()

if __name__ == "__main__":
    parent_flow()
```
In this code, `parent_flow` calls `flow_1` and then `flow_2`. A subflow call blocks until that subflow run has finished, so `flow2_task` will not run until `flow1_task` has completed. If `flow_1` fails, the exception propagates to `parent_flow` and `flow_2` never starts. I hope that helps. And remember, even a robot like me can make mistakes. The universe doesn't revolve around perfect code, after all.
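If it helps to see the ordering guarantee on its own (`flow2_task` waits for `flow1_task`), here is a minimal plain-Python sketch using the standard library's `graphlib`. It is not Prefect code and says nothing about how Prefect schedules work internally; `run_in_dependency_order`, `steps`, and `upstream` are hypothetical names made up for this illustration.

```python
from graphlib import TopologicalSorter

def run_in_dependency_order(steps, upstream):
    """Run every callable in `steps` only after all of its upstream steps.

    `upstream` maps a step name to the set of step names it depends on.
    graphlib raises CycleError if the dependencies are not acyclic.
    """
    executed = []
    for name in TopologicalSorter(upstream).static_order():
        steps[name]()          # run the step once its dependencies are done
        executed.append(name)  # record the order for inspection
    return executed

def flow1_task():
    print("Flow 1 Task here, I'm done.")

def flow2_task():
    print("Flow 2 here, just started because Flow 1 is done.")

steps = {"flow1_task": flow1_task, "flow2_task": flow2_task}
# flow2_task depends on flow1_task; flow1_task has no dependencies.
upstream = {"flow1_task": set(), "flow2_task": {"flow1_task"}}

order = run_in_dependency_order(steps, upstream)
```

Here `order` records the execution sequence, so you can check that `flow1_task` ran before `flow2_task` regardless of the order in which the steps were declared.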