Shivam Bhatia
01/11/2022, 11:23 AM
Anna Geller
# List the tasks that must complete first in the upstream_tasks keyword.
your_task(upstream_tasks=[t1, t2, t3])
This will ensure that your_task runs after t1, t2 and t3 are completed.
Shivam Bhatia
01/11/2022, 11:35 AM
Anna Geller
Anna Geller
# Build the flow: the statements inside the ``with`` block register tasks on it.
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    # task_inputs is passed as a regular argument; upstream_tasks additionally
    # makes third_task wait for a and b without consuming their results.
    c = third_task(task_inputs, upstream_tasks=[a, b])
Shivam Bhatia
01/13/2022, 12:08 PM
Anna Geller
from prefect.triggers import all_finished
from prefect import task
@task(trigger=all_finished)
def third_task():
    """Placeholder task that runs once all of its upstream tasks have finished.

    The ``all_finished`` trigger lets the task run whether the upstream tasks
    succeeded or failed, instead of the default behaviour of requiring every
    upstream task to succeed.
    """
Shivam Bhatia
01/13/2022, 12:38 PM
Task 'save_history': Finished task run for task with final state: 'TriggerFailed'
Shivam Bhatia
01/13/2022, 12:40 PM
@task(trigger=all_finished)
def save_history(history_id, history_object, email, pipeline_name):
    """Append ``history_object`` to a pipeline's run history in MongoDB.

    Looks up the document whose ``_id`` is ``history_id`` in the
    ``pipeline_runs_prefect`` collection, appends ``history_object`` to its
    ``run_history`` list and writes the document back.

    Args:
        history_id: Identifier of the pipeline-history document; converted
            with ``ObjectId`` before the lookup.
        history_object: Entry to append to the document's ``run_history``.
        email: Attached to the raised SUCCESS signal as ``email``.
        pipeline_name: Attached to the raised SUCCESS signal as
            ``pipeline_name``.

    Raises:
        signals.SUCCESS: Always raised once the document has been saved. The
            signal carries ``email``, ``pipeline_name`` and ``flag=True`` as
            attributes (presumably read by a state handler elsewhere --
            verify against the flow definition).
    """
    logger = prefect.context.get("logger")

    # insert last successful run data in mongodb
    client = MongoClient(MONGO_HOST_STRING)
    try:
        db = client["database"]
        collection = db["pipeline_runs_prefect"]
        # NOTE(review): find_one returns None when no document matches, in
        # which case the append below raises TypeError -- confirm that
        # history_id always refers to an existing document.
        pipeline_history = collection.find_one({"_id": ObjectId(history_id)})
        pipeline_history["run_history"].append(history_object)
        collection.update_one(
            {"_id": ObjectId(pipeline_history["_id"])},
            {"$set": pipeline_history},
        )
    finally:
        # Release the connection even if the lookup or the update fails.
        client.close()

    logger.info("History object saved to mongodb")
    logger.info(history_object)

    # End the task run explicitly in a Success state, attaching extra
    # context to the signal object.
    success_signal = signals.SUCCESS("Success !")
    success_signal.email = email
    success_signal.pipeline_name = pipeline_name
    success_signal.flag = True
    raise success_signal
Anna Geller
Anna Geller