<@ULVA73B9P> how do I chain tasks? Basically I wou...
# ask-marvin
t
@Marvin how do I chain tasks? Basically I would like to setup a flow with 3 tasks which need to run one after the other.
m
Ah, you're trying to achieve some basic functionality with sequential task execution. How delightful. Now, which version of Prefect are you using? The documentation for Prefect 2.x and 3.x is, unfortunately, different. Let me know the version, so I can dig through the troves of documentation for this seemingly trivial task.
t
@Marvin It is a trivial task indeed. I'm using Prefect 2.x
m
Perfect, let's dive into the murky depths of Prefect 2.x documentation to uncover the secrets of chaining tasks. I'll search for relevant information now. Oh, joy. Let's cut to the chase. To chain tasks in a flow using Prefect 2.x, you'll typically define tasks sequentially within a flow function. Here’s a simplified example:
python
from prefect import task, Flow

@task
def task1():
    print("Task 1 executed")

@task
def task2():
    print("Task 2 executed")

@task
def task3():
    print("Task 3 executed")

with Flow("Sequential Tasks") as flow:
    t1 = task1()
    t2 = task2(upstream_tasks=[t1])
    t3 = task3(upstream_tasks=[t2])

# Execute the flow
flow.run()
1. Define individual tasks using the @task decorator.
2. Chain them using the upstream_tasks parameter to ensure they run one after the other.
You can find more details in the Prefect 2.x documentation on workflows. There, I've saved you a trip to the library. For more specific use cases or elaborate examples, revisit the provided doc link or let me know to search more. Happy chaining or whatever.
n
yikes! sorry this is prefect 1 syntax
t
Seems like you're hallucinating. There is no parameter called upstream_tasks.
Yeah, I think I just need to use wait_for.
@Nate is there a simpler way to chain tasks? What if I have many tasks which need to be completed in a sequence?
n
how do you mean chain tasks? generally you can establish a dependency by passing a result into the task as a normal (*arg, **kwarg) or by using wait_for to establish a state dependency like this example. this can be used with or without the submit interface
In [1]: from prefect import flow, task

In [2]: @task
   ...: def t(): ...

In [3]: @flow
   ...: def f():
   ...:     result = t()
   ...:     t(wait_for=[result])
t
So I'm coming to this from an Airflow background, and trying to see if some of the concepts carry over. When I say chaining many tasks, I'm looking to see if I can do something like this, where op1, op2, op3, op4 are tasks:
chain(op1, op2, op3, op4)
But wait_for is what I was looking for. I think it is plenty for now. 😄
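For anyone who wants the Airflow-style one-liner anyway, it can probably be built on top of wait_for in a few lines. The sketch below is hypothetical: `chain` is a helper written for this example, not a Prefect function, and `StandInTask` only imitates the `.submit(wait_for=[...])` interface so the snippet runs without Prefect installed. Inside a real flow you would pass `@task`-decorated functions instead.

```python
def chain(*tasks):
    """Submit tasks in order, each one waiting on the previous one.

    Hypothetical helper: it only assumes every item exposes
    .submit(wait_for=[...]), as Prefect 2.x tasks do.
    """
    futures = []
    previous = None
    for t in tasks:
        if previous is None:
            previous = t.submit()
        else:
            previous = t.submit(wait_for=[previous])
        futures.append(previous)
    return futures


class StandInTask:
    """Stand-in for a Prefect task, used only to make this sketch runnable."""

    log = []  # records (task name, names of the tasks it waited for)

    def __init__(self, name):
        self.name = name

    def submit(self, wait_for=None):
        # A real task would schedule a run and return a future; here we
        # just record the call and return the name as a fake "future".
        StandInTask.log.append((self.name, list(wait_for or [])))
        return self.name


op1, op2, op3, op4 = (StandInTask(n) for n in ("op1", "op2", "op3", "op4"))
futures = chain(op1, op2, op3, op4)
```

After this runs, `StandInTask.log` shows each op waiting on exactly the one before it, which is the ordering `chain(op1, op2, op3, op4)` was meant to express.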