I have 4 tasks: t1, t2, t3, t4 How to start t3 and...
# prefect-server
r
I have 4 tasks: t1, t2, t3, t4 How to start t3 and t4 after t1 and t2 successfully finished?
a
It sounds like the usual Prefect core approach will do: set t3 and t4 to each have both t1 and t2 as upstream tasks.
n
Hi @Ruslan - @Amanda Wee is correct! To expand a little further, you can set task ordering using the imperative API like this:
Copy code
from prefect import Task, Flow
from prefect.tasks.notifications import SlackTask

class GetValue(Task):
    def run(self):
        return "Test SlackTask!"

flow = Flow("slack-test")

value = GetValue()
slack_task = SlackTask()

slack_task.set_upstream(value, key="message", flow=flow)
or with the functional API like this (both with a data dependency and without shown):
Copy code
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask

@task
def get_value():
    return "Test SlackTask!"

slack_task = SlackTask()

with Flow("slack-test") as flow:
    value = get_value()
    slack_task(message=value) # data dependencies will automatically create upstream/downstream relationships
    slack_task(upstream_tasks=[value]) # if there isn't a data dependency, but you still need to run tasks in a specific order, you can use the `upstream_tasks` kwarg, which takes a list of task references
You can read more about dependencies here 🙂
r
@nicholas thank you for reply! but I don’t see answer on question there, I saw this doc, it is hard to understand and too poor examples there. In addition your examples are about 2 tasks too
https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#additional-functionality this is example of ready for understanding doc with normal examples and graph pictures
this is something what I’m looking for, where “some-other-task” is dummy
n
Sorry you feel that way @Ruslan - you're welcome to PR the docs (or open a ticket) for any examples you feel could be improved. You should be able to do the same for 4 tasks as you would for 2 (or any number).
Copy code
with Flow("some flow") as flow:
  task_1 = t1()
  task_2 = t2()
  t3(upstream_tasks=[task_1, task_2])
  t4(upstream_tasks=[task_1, task_2])
r
yeah thank you, like this! but what if I need to start t3 if t1 succeded and t2 finished (failed or succeded)? (not any_successful)
n
You'll probably want to use the
all_finished
trigger for that.
r
no t1 have to be only succeded
a
What I'd do is introduce a dummy task t5 that runs if t2 finishes, and then t3 depends on both t1 and t5 succeeding.
upvote 1
r
yeah thats it, but what is the best way to make dummy? def dummy(): None or how?
a
You would need the
@prefect.task(trigger=prefect.triggers.all_finished)
decorator too, and probably just
return
in the function.
r
thank you very much Amanda!