Chris O'Brien
11/18/2019, 11:12 PM
X1 -> X2 -> A -> B -> C and Y1 -> A -> B -> C. The idea is that A -> B -> C are the methods for loading data into the database, so they take the transformed data from upstream and only run if they were successful.
Does this make any sense or is there a more Prefect way to attack this?
Dylan
11/19/2019, 12:38 AM
X1 -> X2 -> would return a list that A -> B -> C can map over, and tasks would only be created if upstream tasks succeed
Chris O'Brien
11/19/2019, 1:00 AM
backup_to_staging is a flow I want to create from the upstream flow AND that only executes once join_subscription_table is complete, if that helps with more context?
Essentially I want to be able to use that flow in other ETLs

    import prefect

    # backup_to_staging, delete_from_warehouse, etc. are tasks defined elsewhere
    def create_write_flow(dataframe, run_date, schema_name, table_name):
        with prefect.Flow('database_flow') as db_flow:
            backup = backup_to_staging(schema_name, table_name, run_date)
            delete = delete_from_warehouse(schema_name, table_name, run_date)
            insert = write_to_warehouse(dataframe, schema_name, table_name, ['subscription_state'], run_date)
            restore = restore_from_staging(schema_name, table_name, run_date)
            drop_backup = drop_staging(schema_name, table_name)
            cleanup = vacuum_and_analyze(schema_name, table_name)

            # backup -> delete -> insert -> (restore, drop_backup) -> cleanup
            backup.set_upstream(dataframe)
            delete.set_upstream(backup)
            insert.set_upstream(delete)
            restore.set_upstream(insert)
            restore.set_downstream(cleanup)
            drop_backup.set_upstream(insert)
            drop_backup.set_downstream(cleanup)
        return db_flow
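For anyone reading along, here is a rough plain-Python sketch of the "only run if upstream succeeded" behaviour being asked about. It does not use Prefect at all: the step names are copied from the flow above, while `run_steps`, the `UPSTREAM` dict, the status strings, and the simulated failure are all made up for illustration. It needs Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps that must succeed before it may run,
# mirroring the set_upstream / set_downstream calls in the flow above.
# (The dataframe dependency is left out because it is data, not a step.)
UPSTREAM = {
    "backup": set(),
    "delete": {"backup"},
    "insert": {"delete"},
    "restore": {"insert"},
    "drop_backup": {"insert"},
    "cleanup": {"restore", "drop_backup"},
}


def run_steps(upstream, actions):
    """Run steps in dependency order; skip any step whose upstream did not succeed.

    Returns a dict mapping step name to "success", "failed", or "skipped".
    """
    status = {}
    for step in TopologicalSorter(upstream).static_order():
        if any(status[dep] != "success" for dep in upstream[step]):
            status[step] = "skipped"
            continue
        try:
            actions[step]()
            status[step] = "success"
        except Exception:
            status[step] = "failed"
    return status


def fail():
    raise RuntimeError("simulated failure")


# Hypothetical run: every step is a no-op except insert, which fails.
actions = {name: (lambda: None) for name in UPSTREAM}
actions["insert"] = fail
result = run_steps(UPSTREAM, actions)
print(result)
```

In this simulated run everything downstream of insert is skipped, which is the same idea as a downstream task not running when its upstream task fails.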
Dylan
11/19/2019, 2:03 AM
Chris O'Brien
11/19/2019, 2:06 AM
Dylan
11/19/2019, 2:06 AM
Chris O'Brien
11/19/2019, 2:08 AM
Dylan
11/19/2019, 2:09 AM
Chris O'Brien
11/19/2019, 2:12 AM
Dylan
11/19/2019, 2:12 AM
Chris O'Brien
11/19/2019, 2:14 AM
Dylan
11/19/2019, 2:14 AM
Chris O'Brien
11/19/2019, 2:15 AM
Dylan
11/19/2019, 2:18 AM
Joe Schmid
11/19/2019, 1:29 PM

    from prefect import Client

    def get_latest_non_archived_flow(flow_name):
        # Look up the highest-version flow with this name that is not archived.
        client = Client()
        result = client.graphql(
            """
            query {
              flow(where: { _and: [ { name: { _eq: "%s" } }
                                    { archived: { _eq: false } } ] }
                   order_by: { version: desc }) {
                id
                name
                archived
                version
              }
            }
            """
            % flow_name
        )
        # The query can match nothing, so check for an empty list before indexing.
        flows = result.data.flow
        return flows[0] if flows else None
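One thing to watch with the `"%s" % flow_name` interpolation: a flow name containing a double quote or backslash would break the query text. A possible workaround, sketched here with only the standard library, is to let `json.dumps` produce the quoted and escaped string, on the assumption that JSON string escaping is acceptable for a GraphQL string literal. The helper name `build_latest_flow_query` and the sample flow name are hypothetical; the query body is the one from the snippet above.

```python
import json

# Same query as above, but %s now receives an already-quoted string,
# so there are no hard-coded double quotes around the placeholder.
QUERY_TEMPLATE = """
query {
  flow(where: { _and: [ { name: { _eq: %s } }
                        { archived: { _eq: false } } ] }
       order_by: { version: desc }) {
    id
    name
    archived
    version
  }
}
"""


def build_latest_flow_query(flow_name):
    """Return the query text with flow_name embedded as a quoted, escaped string."""
    return QUERY_TEMPLATE % json.dumps(flow_name)


# Hypothetical flow name containing double quotes.
query = build_latest_flow_query('my "etl" flow')
print(query)
```

The resulting string could then be passed to `client.graphql(...)` in place of the inline template.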