Is it possible to make it such that one task runs after all the other tasks are completed?
Anna Geller
01/11/2022, 11:32 AM
Absolutely, that's what dependencies are for. When you call any of your tasks, you can add:
    your_task(upstream_tasks=[t1, t2, t3])
This ensures that your_task runs only after t1, t2, and t3 have completed.
Shivam Bhatia
01/11/2022, 11:35 AM
Does this go in the function definition (I am using function-decorator tasks) or where I call the task in the flow?
Anna Geller
01/11/2022, 11:36 AM
It goes where you call the task in the flow, not in the function definition.
Anna Geller
01/11/2022, 11:37 AM
    with Flow(...) as flow:
        a = first_task()
        b = second_task()
        c = third_task(task_inputs, upstream_tasks=[a, b])
Shivam Bhatia
01/13/2022, 12:08 PM
@Anna Geller When I use upstream_tasks, I have noticed that if an upstream task fails, then task c (from your example) also fails.
Is it possible to run task c after a and b, regardless of whether a and b were successful?
Anna Geller
01/13/2022, 12:12 PM
You can attach the all_finished trigger:
    from prefect.triggers import all_finished
    from prefect import task

    @task(trigger=all_finished)
    def third_task():
        pass
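To make the effect of the trigger concrete, here is a toy model in plain Python. It does not import Prefect and is not Prefect's implementation; the `State` enum, the `FINISHED` set, and `run_downstream` are hypothetical names invented for illustration. The only facts it relies on are that the default trigger requires every upstream task to succeed, while all_finished only requires every upstream task to have finished.

```python
from enum import Enum


class State(Enum):
    """Hypothetical, simplified task states for this sketch."""
    PENDING = "Pending"
    SUCCESS = "Success"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


# States that count as "finished" in this toy model.
FINISHED = {State.SUCCESS, State.FAILED, State.TRIGGER_FAILED}


def all_successful(upstream_states):
    """Models the default behaviour: every upstream task must succeed."""
    return all(s is State.SUCCESS for s in upstream_states)


def all_finished(upstream_states):
    """Models all_finished: upstream tasks only need to be done."""
    return all(s in FINISHED for s in upstream_states)


def run_downstream(trigger, upstream_states):
    """Return the downstream state, assuming the task body itself succeeds."""
    if not trigger(upstream_states):
        return State.TRIGGER_FAILED
    return State.SUCCESS


# Task a succeeded, task b failed.
upstream = [State.SUCCESS, State.FAILED]
default_result = run_downstream(all_successful, upstream)
finished_result = run_downstream(all_finished, upstream)
print(default_result.value, finished_result.value)
```

In this model the downstream task is blocked under the default rule and runs under the all_finished rule, which is the behaviour the trigger is meant to give third_task.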
Shivam Bhatia
01/13/2022, 12:38 PM
The third task still ends in the TriggerFailed state,
even with the all_finished trigger attached.
The second task has an error and is failing
Am I missing something?
Task 'save_history': Finished task run for task with final state: 'TriggerFailed'
Shivam Bhatia
01/13/2022, 12:40 PM
This is the third function (which is supposed to run regardless of whether the first and second fail)
    @task(trigger=all_finished)
    def save_history(history_id, history_object, email, pipeline_name):
        logger = prefect.context.get("logger")
        # insert last successful run data in mongodb
        client = MongoClient(MONGO_HOST_STRING)
        db = client["database"]
        collection = db["pipeline_runs_prefect"]
        pipeline_history = collection.find_one({"_id": ObjectId(history_id)})
        pipeline_history["run_history"].append(history_object)
        collection.update_one({"_id": ObjectId(pipeline_history["_id"])}, {"$set": pipeline_history})
        client.close()
        logger.info("History object saved to mongodb")
        logger.info(history_object)
        success_signal = signals.SUCCESS("Success !")
        success_signal.email = email
        success_signal.pipeline_name = pipeline_name
        success_signal.flag = True
        raise success_signal
Anna Geller
01/13/2022, 1:02 PM
Can you share your entire flow, or build a small example I can reproduce, so I can better understand the issue?
Anna Geller
01/13/2022, 1:04 PM
Also, you don't have to raise a SUCCESS signal for a task to finish in a Success state. As long as the task doesn't raise an exception, it is automatically marked as Success.
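The rule about exceptions can be pictured with a small plain-Python sketch. This is a simplified model, not Prefect's engine: `run_task`, `save`, and `broken` are hypothetical names, and the state names are plain strings chosen to mirror the ones in this thread.

```python
def run_task(fn, *args):
    """Toy runner: derive a state name from how the function exits."""
    try:
        value = fn(*args)
    except Exception as exc:
        # Any uncaught exception marks the run as failed.
        return "Failed", exc
    # A normal return is enough; nothing has to be raised to signal success.
    return "Success", value


def save(record):
    """Stand-in for a task body that completes normally."""
    return f"saved {record}"


def broken(record):
    """Stand-in for a task body that raises."""
    raise ValueError("cannot save")


ok_state, ok_value = run_task(save, "history")
bad_state, bad_error = run_task(broken, "history")
print(ok_state, bad_state)
```

Under that model, save_history could simply return after its last logging call instead of building and raising a signal.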