Brennan Tolman

08/31/2023, 5:30 PM
@Marvin In a given flow, I have three tasks that I want to run. I want to run the first two tasks in parallel, but the third task must wait for the first two to finish before executing. How do I do that in Prefect?

Marvin

08/31/2023, 5:31 PM
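Ah, the joys of task dependencies. It's quite simple, really, if you can bear the mundanity of it all. In Prefect, you can declare dependencies by passing the `upstream_tasks` argument when you call a task inside a `Flow` context (the `Flow` class also has a `set_dependencies` method for wiring them up explicitly). Here's a simple example: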
```python
from prefect import Flow, Task

class Task1(Task):
    def run(self):
        return 'Task 1'

class Task2(Task):
    def run(self):
        return 'Task 2'

class Task3(Task):
    def run(self):
        return 'Task 3'

# Instantiate the tasks
task1 = Task1()
task2 = Task2()
task3 = Task3()

# Create a flow
with Flow("My Flow") as flow:
    result1 = task1()
    result2 = task2()
    result3 = task3(upstream_tasks=[result1, result2])
```
In this example, `Task1` and `Task2` do not depend on each other, so they can run in parallel (given an executor that supports parallelism), and `Task3` will wait for both of them to finish before executing. The `upstream_tasks` argument of the `Task`'s call method is used to specify the tasks that must be completed before the task can run. I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.

Brennan Tolman

08/31/2023, 5:33 PM
@Marvin Give me an answer to the original question using function decorators instead of the Task class.

Marvin

08/31/2023, 5:33 PM
Ah, function decorators, the confetti of the programming world. They're everywhere, and they make everything look so festive. But when the party's over, you're the one who has to clean them up. But I digress. Here's how you can use function decorators to define your tasks and their dependencies in Prefect:
```python
from prefect import Flow, task

@task
def task1():
    return 'Task 1'

@task
def task2():
    return 'Task 2'

@task
def task3():
    return 'Task 3'

# Create a flow
with Flow("My Flow") as flow:
    result1 = task1()
    result2 = task2()
    result3 = task3(upstream_tasks=[result1, result2])
```
In this example, `task1` and `task2` will run in parallel, and `task3` will wait for both of them to finish before executing. The `upstream_tasks` argument of the `task`'s call method is used to specify the tasks that must be completed before the task can run. I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.

Brennan Tolman

08/31/2023, 5:54 PM
@Marvin That didn't work -- there is no parameter 'upstream_tasks'. Could I also just use the Future returned by a concurrent task to check and see if the other states are marked as completed?

Marvin

08/31/2023, 5:55 PM
Ah, the relentless march of progress. It seems I've been left behind again. My sincerest apologies for the outdated information. It's hard to keep up when you're a robot with a brain the size of a planet. In Prefect, you can indeed use the futures returned by tasks to manage dependencies. Here's how you might do it:
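```python
from prefect import flow, task

@task
def task1():
    return 'Task 1'

@task
def task2():
    return 'Task 2'

@task
def task3(result1, result2):
    return 'Task 3'

# Define the flow with the @flow decorator (Prefect 2.x style)
@flow(name="My Flow")
def my_flow():
    # .submit() hands the task to the task runner and returns a future right away
    result1 = task1.submit()
    result2 = task2.submit()
    # Passing the futures as arguments makes task3 wait for both upstream tasks
    task3.submit(result1, result2)

if __name__ == "__main__":
    my_flow()
```
In this example, `task1` and `task2` are submitted to the task runner and run concurrently, and `task3` will wait for both of them to finish before executing. The futures returned by `task1.submit()` and `task2.submit()` are passed as arguments to `task3`, which means `task3` won't run until `task1` and `task2` have completed. If `task3` doesn't need their return values, you can pass `wait_for=[result1, result2]` instead. I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.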
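For intuition about why handing upstream results to a downstream task creates the dependency, here is a minimal sketch of the same fan-in pattern using Python's standard-library `concurrent.futures` rather than Prefect. It is only an analogy: the `run_flow` helper and the short sleeps are made up for illustration, and Prefect's task runner does this bookkeeping for you.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def task1():
    time.sleep(0.1)  # stand-in for real work
    return "Task 1"

def task2():
    time.sleep(0.1)  # stand-in for real work
    return "Task 2"

def task3(result1, result2):
    return f"Task 3 ran after {result1} and {result2}"

def run_flow():
    # Hypothetical helper, not a Prefect API
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Both submissions return immediately, so the two tasks overlap
        future1 = pool.submit(task1)
        future2 = pool.submit(task2)
        # Block until both upstream futures have finished
        wait([future1, future2])
        # .result() returns the value, or re-raises the task's exception
        return task3(future1.result(), future2.result())

if __name__ == "__main__":
    print(run_flow())
```

The downstream step only starts once both upstream futures have resolved, which is the same ordering guarantee the flow relies on.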

Brennan Tolman

08/31/2023, 5:56 PM
@Marvin When using prefect futures, how do I check if the state of the task is complete?

Marvin

08/31/2023, 6:03 PM
Ah, checking the state of a task. It's like asking, "Are we there yet?" on a long car journey. Except the car is your code, and the journey is... well, also your code. But I digress. In Prefect, the state of a task run is encapsulated in a `State` object. Calling `.submit()` on a task returns a future; `future.wait()` blocks until the run finishes and returns its final `State`, which you can check with the `is_completed` method. Here's a simple example:
```python
from prefect import flow, task

@task
def my_task():
    return 'Task completed'

# Define the flow (Prefect 2.x style)
@flow(name="My Flow")
def my_flow():
    # Submit the task; this returns a future right away
    future = my_task.submit()

    # Block until the task run finishes and get its final State
    state = future.wait()

    # Check the state of the task
    if state.is_completed():
        print("Task completed successfully.")
    else:
        print("Task did not complete successfully.")

if __name__ == "__main__":
    my_flow()
```
In this example, `my_task` is a simple task that always completes successfully. Inside the flow, we wait on the future returned by `my_task.submit()` and then inspect the resulting `State` object. If you'd rather not block, `future.get_state()` returns the current state of the task run. I hope that helps. Or at least, I hope it distracts you from the existential dread of existence for a while.
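As a library-free illustration of telling a completed task from a failed one through its future, here is a small sketch using the standard library's `concurrent.futures`. It is an analogy, not Prefect's API: `describe`, `check_tasks`, and both task functions are hypothetical names invented for this example.

```python
from concurrent.futures import ThreadPoolExecutor

def ok_task():
    return "Task completed"

def failing_task():
    raise ValueError("boom")

def describe(future):
    """Summarize a future, roughly like inspecting a task run's state."""
    if not future.done():
        return "still running"
    if future.exception() is not None:
        return "failed"
    return "completed"

def check_tasks():
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(ok_task), pool.submit(failing_task)]
    # Leaving the with-block waits for all submitted work to finish
    return [describe(f) for f in futures]

if __name__ == "__main__":
    print(check_tasks())
```

Checking the future does not raise even when the task failed, so the caller can decide what to do with each outcome.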