Is it possible to make it such that one task runs after all the other tasks are completed?
s
Is it possible to make it such that one task runs after all the other tasks are completed?
a
Absolutely, that's what dependencies are for. When you call any of your tasks, you can pass upstream_tasks:
your_task(upstream_tasks=[t1, t2, t3])
This ensures that your_task runs only after t1, t2, and t3 have completed.
s
Does this go in the function definition (I am using function-decorator tasks) or where I call the task in the flow?
a
It goes where you call the task inside the flow:
with Flow(...) as flow:
    a = first_task()
    b = second_task()

    c = third_task(task_inputs, upstream_tasks=[a, b])
s
@Anna Geller When I use upstream_tasks, I have noticed that if an upstream task fails, task c (from your example) also fails. Is it possible to run task c after a and b regardless of whether a and b succeeded?
a
You can attach the all_finished trigger to the task:
from prefect.triggers import all_finished
from prefect import task


@task(trigger=all_finished)
def third_task():
    pass
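To illustrate the difference between the default trigger and all_finished, here is a toy model in plain Python. It is only a sketch and not Prefect's implementation: run_downstream and FINISHED_STATES are hypothetical names made up for this example, and the two trigger functions merely mimic the behavior of their namesakes in prefect.triggers.

```python
# Toy model of trigger semantics; NOT Prefect's actual implementation.
from typing import Callable, Dict

# Upstream task name -> final state name (state names as used in this thread).
States = Dict[str, str]

# Hypothetical set of states that count as "finished" in this sketch.
FINISHED_STATES = {"Success", "Failed", "TriggerFailed"}


def all_successful(upstream: States) -> bool:
    """Mimics the default trigger: every upstream task must have succeeded."""
    return all(state == "Success" for state in upstream.values())


def all_finished(upstream: States) -> bool:
    """Mimics all_finished: upstream tasks only need to be done, in any state."""
    return all(state in FINISHED_STATES for state in upstream.values())


def run_downstream(trigger: Callable[[States], bool], upstream: States) -> str:
    """Hypothetical runner: check the trigger before running the task body."""
    if not trigger(upstream):
        return "TriggerFailed"
    return "Success"  # the task body would run here


upstream = {"a": "Success", "b": "Failed"}
default_result = run_downstream(all_successful, upstream)
relaxed_result = run_downstream(all_finished, upstream)
print(default_result, relaxed_result)
```

With one failed upstream task, the default-style trigger stops the downstream task before its body runs, while the all_finished-style trigger lets it run.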
s
The third task still ends in the TriggerFailed state even with the all_finished trigger. The second task raises an error and fails. Am I missing something?
Task 'save_history': Finished task run for task with final state: 'TriggerFailed'
This is the third function (which is supposed to run regardless of whether the first and second fail):
@task(trigger=all_finished)
def save_history(history_id, history_object, email, pipeline_name):
    logger = prefect.context.get("logger")

    # insert last successful run data in mongodb

    client = MongoClient(MONGO_HOST_STRING)
    db = client["database"]
    collection = db["pipeline_runs_prefect"]

    pipeline_history = collection.find_one({"_id": ObjectId(history_id)})
    pipeline_history["run_history"].append(history_object)

    collection.update_one({"_id": ObjectId(pipeline_history["_id"])}, {"$set": pipeline_history})

    client.close()

    logger.info("History object saved to mongodb")
    logger.info(history_object)

    success_signal = signals.SUCCESS("Success !")
    success_signal.email = email
    success_signal.pipeline_name = pipeline_name
    success_signal.flag = True
    
    raise success_signal
a
Can you share your entire flow, or build a small example I can reproduce, so I can better understand the issue?
Also, you don't have to raise a SUCCESS signal for a task to finish in a Success state. As long as it doesn't raise an exception, it is automatically marked as Success.
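As a rough illustration of that last point, here is a plain-Python sketch of how a runner can derive a task's final state from whether the function returns or raises. final_state, saves_history, and broken_task are hypothetical names for this example; this is not Prefect's runner, and it deliberately ignores Prefect signals.

```python
# Toy model: final state derived from return vs. exception. Not Prefect code.
from typing import Callable


def final_state(task_fn: Callable[[], object]) -> str:
    """Hypothetical runner: map a task function's outcome to a state name."""
    try:
        task_fn()
    except Exception:
        return "Failed"
    return "Success"


def saves_history() -> None:
    # Does its work and simply returns; nothing is raised.
    pass


def broken_task() -> None:
    # Any uncaught exception marks the task as failed in this sketch.
    raise ValueError("something went wrong")


states = [final_state(saves_history), final_state(broken_task)]
print(states)
```

Under that model, the signals.SUCCESS block at the end of save_history could be dropped so the function just returns, assuming nothing downstream relies on the extra attributes attached to the signal.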