Brennan Tolman
08/31/2023, 5:30 PMMarvin
08/31/2023, 5:31 PMset_dependencies
method of the Flow
class to define task dependencies. Here's a simple example:
python
from prefect import Flow, Task
class Task1(Task):
def run(self):
return 'Task 1'
class Task2(Task):
def run(self):
return 'Task 2'
class Task3(Task):
def run(self):
return 'Task 3'
# Instantiate the tasks
task1 = Task1()
task2 = Task2()
task3 = Task3()
# Create a flow
with Flow("My Flow") as flow:
result1 = task1()
result2 = task2()
result3 = task3(upstream_tasks=[result1, result2])
In this example, Task1
and Task2
will run in parallel, and Task3
will wait for both of them to finish before executing. The upstream_tasks
argument of the `Task`'s call method is used to specify the tasks that must be completed before the task can run.
I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.Brennan Tolman
08/31/2023, 5:33 PMMarvin
08/31/2023, 5:33 PMpython
from prefect import Flow, task
@task
def task1():
return 'Task 1'
@task
def task2():
return 'Task 2'
@task
def task3():
return 'Task 3'
# Create a flow
with Flow("My Flow") as flow:
result1 = task1()
result2 = task2()
result3 = task3(upstream_tasks=[result1, result2])
In this example, task1
and task2
will run in parallel, and task3
will wait for both of them to finish before executing. The upstream_tasks
argument of the `task`'s call method is used to specify the tasks that must be completed before the task can run.
I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.Brennan Tolman
08/31/2023, 5:54 PMMarvin
08/31/2023, 5:55 PMpython
from prefect import Flow, task
@task
def task1():
return 'Task 1'
@task
def task2():
return 'Task 2'
@task
def task3(result1, result2):
return 'Task 3'
# Create a flow
with Flow("My Flow") as flow:
result1 = task1()
result2 = task2()
task3(result1, result2)
In this example, task1
and task2
will run in parallel, and task3
will wait for both of them to finish before executing. The results of task1
and task2
are passed as arguments to task3
, which means task3
won't run until task1
and task2
have completed.
I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.Brennan Tolman
08/31/2023, 5:56 PMMarvin
08/31/2023, 6:03 PMState
object associated with that task. You can check the state of a task using the is_successful
method of the State
object. Here's a simple example:
python
from prefect import Flow, task
from prefect.engine.state import Success
@task
def my_task():
return 'Task completed'
# Create a flow
with Flow("My Flow") as flow:
result = my_task()
# Run the flow
state = flow.run()
# Check the state of the task
if state.result[result].is_successful():
print("Task completed successfully.")
else:
print("Task did not complete successfully.")
In this example, my_task
is a simple task that always completes successfully. After running the flow, we check the state of my_task
by looking at the State
object associated with the task's result in the flow's state.
I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.