Chris O'Brien
11/18/2019, 11:12 PM
X1 -> X2 -> A -> B -> C
Y1 -> A -> B -> C
A -> B -> C
Dylan
Dylan
X1 -> X2 ->
A -> B -> C
Dylan
Chris O'Brien
11/19/2019, 1:00 AMChris O'Brien
11/19/2019, 2:00 AM
backup_to_staging
join_subscription_table
Chris O'Brien
11/19/2019, 2:00 AMChris O'Brien
def create_write_flow(dataframe, run_date, schema_name, table_name):
    """Assemble the Prefect flow that rewrites one warehouse table.

    The flow is named ``database_flow`` and wires six tasks together:

    * backup-to-staging runs after ``dataframe``;
    * delete-from-warehouse runs after the backup;
    * write-to-warehouse runs after the delete;
    * restore-from-staging and drop-staging both run after the write;
    * vacuum-and-analyze runs after both restore and drop-staging.

    Args:
        dataframe: Handed to ``write_to_warehouse`` as its first argument and
            registered as the upstream dependency of the backup step.
        run_date: Forwarded to the backup, delete, write and restore tasks.
        schema_name: Forwarded to every task.
        table_name: Forwarded to every task.

    Returns:
        The ``prefect.Flow`` built inside the context manager.
    """
    with prefect.Flow('database_flow') as flow:
        staging_backup = backup_to_staging(schema_name, table_name, run_date)
        warehouse_delete = delete_from_warehouse(schema_name, table_name, run_date)
        warehouse_write = write_to_warehouse(
            dataframe,
            schema_name,
            table_name,
            ['subscription_state'],
            run_date,
        )
        staging_restore = restore_from_staging(schema_name, table_name, run_date)
        staging_drop = drop_staging(schema_name, table_name)
        vacuum = vacuum_and_analyze(schema_name, table_name)

        # Linear chain: dataframe -> backup -> delete -> write.
        staging_backup.set_upstream(dataframe)
        warehouse_delete.set_upstream(staging_backup)
        warehouse_write.set_upstream(warehouse_delete)

        # Both follow-up steps hang off the write and feed the final cleanup.
        # NOTE(review): presumably the task definitions carry triggers that
        # decide whether restore or drop actually runs -- not visible here.
        for follow_up in (staging_restore, staging_drop):
            follow_up.set_upstream(warehouse_write)
            follow_up.set_downstream(vacuum)
    return flow
Dylan
Chris O'Brien
11/19/2019, 2:06 AMDylan
Dylan
Dylan
Dylan
Dylan
Dylan
Dylan
Chris O'Brien
11/19/2019, 2:08 AMDylan
Dylan
Dylan
Dylan
Dylan
Dylan
Chris O'Brien
11/19/2019, 2:12 AMDylan
Dylan
Chris O'Brien
11/19/2019, 2:14 AMDylan
Chris O'Brien
11/19/2019, 2:15 AMDylan
Dylan
Dylan
Dylan
Joe Schmid
11/19/2019, 1:29 PM
from prefect import Client
def get_latest_non_archived_flow(flow_name):
    """Return the highest-version, non-archived flow named ``flow_name``.

    Sends a GraphQL query through a fresh ``Client`` asking for flows whose
    ``name`` equals ``flow_name`` and whose ``archived`` flag is false,
    ordered by ``version`` descending.

    Args:
        flow_name: Exact flow name to match.

    Returns:
        The first entry of the query result (carrying ``id``, ``name``,
        ``archived`` and ``version``), or ``None`` when the response is
        falsy or no flow matches.
    """
    import json  # local import: only used to escape the name below

    client = Client()
    # json.dumps emits a double-quoted literal with quotes, backslashes and
    # control characters escaped, so a name containing `"` or `\` can no
    # longer break out of (or corrupt) the GraphQL string literal.
    # ensure_ascii=False keeps non-ASCII characters verbatim.
    quoted_name = json.dumps(str(flow_name), ensure_ascii=False)
    flows = client.graphql(
        """
             query {
                  flow(where: { _and: [ { name: { _eq: %s } }
                                        { archived: { _eq: false } } ] } 
                       order_by: { version: desc }) {
                    id
                    name
                    archived
                    version
                  }
                }
                """
        % quoted_name
    )
    if not flows:
        return None
    # A successful query with no matching flow yields an empty list; indexing
    # it directly would raise IndexError instead of returning None.
    matches = flows.data.flow
    return matches[0] if matches else None