Shivam Bhatia
01/11/2022, 11:23 AM
Anna Geller
your_task(upstream_tasks=[t1, t2, t3])
This will ensure that your_task runs after t1, t2 and t3 are completed.
Shivam Bhatia
01/11/2022, 11:35 AM
Anna Geller
Anna Geller
with Flow(...) as flow:
     a = first_task()
     b = second_task()
     c = third_task(task_inputs, upstream_tasks=[a,b])
Shivam Bhatia
01/13/2022, 12:08 PM
Anna Geller
from prefect.triggers import all_finished
from prefect import task
@task(trigger=all_finished)
def third_task():
    pass
Shivam Bhatia
01/13/2022, 12:38 PM
Task 'save_history': Finished task run for task with final state: 'TriggerFailed'
Shivam Bhatia
01/13/2022, 12:40 PM
@task(trigger=all_finished)
def save_history(history_id, history_object, email, pipeline_name):
    logger = prefect.context.get("logger")
    # insert last successful run data in mongodb
    client = MongoClient(MONGO_HOST_STRING)
    db = client["database"]
    collection = db["pipeline_runs_prefect"]
    pipeline_history = collection.find_one({"_id": ObjectId(history_id)})
    pipeline_history["run_history"].append(history_object)
    collection.update_one({"_id": ObjectId(pipeline_history["_id"])}, {"$set": pipeline_history})
    client.close()
    logger.info("History object saved to mongodb")
    logger.info(history_object)
    success_signal = signals.SUCCESS("Success !")
    success_signal.email = email
    success_signal.pipeline_name = pipeline_name
    success_signal.flag = True
    
    raise success_signal
Anna Geller
Anna Geller