Madison Schott
10/13/2021, 2:54 PMAnna Geller
Madison Schott
10/13/2021, 3:06 PMMadison Schott
10/13/2021, 3:07 PMMadison Schott
10/13/2021, 3:07 PM# web session details model
web_session_visitor_ids_dbt_run = dbt_task(command='dbt run -m web_session_users_mapped web_session_visitor_ids', task_args={"name": "Web Session Visitor Ids"})
web_session_page_views_visitor_ids_joined_dbt_run = dbt_task(command='dbt run -m web_session_page_views_visitor_ids_joined', task_args={"name": "Web Session Page Views Visitor Ids Joined"})
users_to_visitor_ids_dbt_run = dbt_task(command='dbt run -m users_to_visitor_ids', task_args={"name": "Users to Visitor Ids"})
web_session_details_dbt_run = dbt_task(command='dbt run -m web_session_new_sessions_derived web_session_sequences_derived web_session_details', task_args={"name": "Web Session Details"})
page_view_details_dbt_run = dbt_task(command='dbt run -m page_view_details', task_args={"name": "Page View Details"})
Madison Schott
10/13/2021, 3:07 PMweb_session_page_views_visitor_ids_joined_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
web_session_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
web_session_details_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
page_view_details_dbt_run.set_upstream(web_session_details_dbt_run)
page_view_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run))
users_to_visitor_ids_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
Anna Geller
Madison Schott
10/13/2021, 3:09 PMAnna Geller
Madison Schott
10/13/2021, 3:15 PMfivetran_task = FivetranSyncTask()
DBT_REPO_PATH = "/dbt_snowflake/"
dbt_task = DbtShellTask(profile_name='winc',
helper_script = f"cd {DBT_REPO_PATH}",
log_stderr=True,
stream_output=True,
log_stdout=True,
profiles_dir=DBT_REPO_PATH,
environment='prod',
dbt_kwargs = {
"type": "snowflake",
"account": 'fi45185.east-us-2.azure',
"user": 'dbt_user',
"password": PrefectSecret("SNOWFLAKE_PASSWORD"),
"role": 'TRANSFORMER_WH',
"database": 'data_mart_prod',
"warehouse": 'TRANSFORM',
"overwrite_profile": True,
"profile_name": 'winc_prod',
"threads": 200
})
with Flow("data_pipeline_prod", storage=STORAGE, run_config=RUN_CONFIG) as data_pipeline_prod:
# run data syncs with Fivetran
winc_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WINC_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Winc..."})
webevent_page_views_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WEBEVENT_PAGE_VIEWS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Webevent Page Views..."})
webevents_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WEBEVENTS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Webevents..."})
shopify_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("SHOPIFY_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Shopify..."})
biz_dev_mapping_winc_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("BIZ_DEV_MAPPING_WINC_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Biz Dev Mapping Winc..."})
# Media Spend syncs
google_ads_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("GOOGLE_ADS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Google Ads..."})
bing_ads_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("BING_ADS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Bing Ads..."})
facebook_basic_ads_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("FACEBOOK_BASIC_ADS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Facebook basic ads..."})
impact_radius_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("IMPACT_RADIUS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Impact Radius..."})
winc_influencers_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WINC_INFLUENCERS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Winc influencer sheet..."})
wwc_influencers_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WWC_INFLUENCERS_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing WWC influencer sheet..."})
winc_misc_marketing_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WINC_MISC_MARKETING_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing Winc misc marketing sheet..."})
wwc_misc_marketing_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
PrefectSecret("FIVETRAN_API_SECRET"),
connector_id=PrefectSecret("WWC_MISC_MARKETING_CONNECTOR_ID"),
poll_status_every_n_seconds=15,
task_args={"name": "Syncing WWC misc marketing sheet..."})
# run dbt models
# user brand campaigns
dim_campaigns_dbt_run = dbt_task(command='dbt run -m dim_campaigns', task_args={"name": "Dim Campaigns"})
user_brand_campaigns_dbt_run = dbt_task(command='dbt run -m user_brand_campaigns', task_args={"name": "User Brand Campaigns"})
# web session details model
web_session_visitor_ids_dbt_run = dbt_task(command='dbt run -m web_session_users_mapped web_session_visitor_ids', task_args={"name": "Web Session Visitor Ids"})
web_session_page_views_visitor_ids_joined_dbt_run = dbt_task(command='dbt run -m web_session_page_views_visitor_ids_joined', task_args={"name": "Web Session Page Views Visitor Ids Joined"})
users_to_visitor_ids_dbt_run = dbt_task(command='dbt run -m users_to_visitor_ids', task_args={"name": "Users to Visitor Ids"})
web_session_details_dbt_run = dbt_task(command='dbt run -m web_session_new_sessions_derived web_session_sequences_derived web_session_details', task_args={"name": "Web Session Details"})
page_view_details_dbt_run = dbt_task(command='dbt run -m page_view_details', task_args={"name": "Page View Details"})
# user details model
user_details_biz_dev_mapping_joined = dbt_task(command='dbt run -m user_details_biz_dev_mapping_joined', task_args={"name": "User Details Biz Dev Mapping Joined"})
dim_subscriptions_dbt_run = dbt_task(command='dbt run -m dim_subscriptions', task_args={"name": "Dim Subscriptions"})
user_details_dbt_run = dbt_task(command='dbt run -m user_subscription_details user_details', task_args={"name": "User Details"})
# media spend mode
media_spend_shared_sources_dbt_run = dbt_task(command='dbt run -m media_spend_facebook_summed media_spend_google_ads_joined media_spend_influencers_joined media_spend_misc_marketing_joined', task_args={"name": "Media Spend"})
media_spend_cactus_media_dbt_run = dbt_task(command='dbt run -m media_spend_cactus_media_sales_derived', task_args={"name": "Cactus Media"})
media_spend_wwc_dbt_run = dbt_task(command='dbt run -m media_spend_wwc', task_args={"name": "Media Spend WWC"})
media_spend_winc_dbt_run = dbt_task(command='dbt run -m media_spend_bing_ads_joined media_spend_impact_radius_summed media_spend_winc', task_args={"name": "Media Spend Winc"})
#dbt_cleanup_old_models = dbt_task(command='dbt run-operation drop_old_relations', task_args={"name": "Cleaning up old models..."})
# set dependencies for dbt model runs
# models that depend on data sources syncing
dim_campaigns_dbt_run.set_upstream(winc_sync)
dim_subscriptions_dbt_run.set_upstream(winc_sync)
user_brand_campaigns_dbt_run.set_upstream(winc_sync)
web_session_visitor_ids_dbt_run.set_upstream(webevent_page_views_sync)
user_details_biz_dev_mapping_joined.set_upstream(winc_sync)
user_details_biz_dev_mapping_joined.set_upstream(biz_dev_mapping_winc_sync)
user_details_dbt_run.set_upstream(winc_sync)
media_spend_shared_sources_dbt_run.set_upstream(facebook_basic_ads_sync)
media_spend_shared_sources_dbt_run.set_upstream(google_ads_sync)
media_spend_shared_sources_dbt_run.set_upstream(winc_influencers_sheet_sync)
media_spend_shared_sources_dbt_run.set_upstream(wwc_influencers_sheet_sync)
media_spend_shared_sources_dbt_run.set_upstream(winc_misc_marketing_sheet_sync)
media_spend_shared_sources_dbt_run.set_upstream(wwc_misc_marketing_sheet_sync )
media_spend_winc_dbt_run.set_upstream(bing_ads_sync)
media_spend_winc_dbt_run.set_upstream(impact_radius_sync)
# models that depend on other models
user_brand_campaigns_dbt_run.set_upstream(dim_campaigns_dbt_run)
web_session_page_views_visitor_ids_joined_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
web_session_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
web_session_details_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
page_view_details_dbt_run.set_upstream(web_session_details_dbt_run)
page_view_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
users_to_visitor_ids_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
user_details_biz_dev_mapping_joined.set_upstream(dim_campaigns_dbt_run)
user_details_dbt_run.set_upstream(user_details_biz_dev_mapping_joined)
user_details_dbt_run.set_upstream(dim_subscriptions_dbt_run)
media_spend_wwc_dbt_run.set_upstream(media_spend_shared_sources_dbt_run)
media_spend_winc_dbt_run.set_upstream(media_spend_shared_sources_dbt_run)
media_spend_winc_dbt_run.set_upstream(media_spend_cactus_media_dbt_run)
media_spend_cactus_media_dbt_run.set_upstream(web_session_details_dbt_run)
data_pipeline_prod.register(project_name="Winc_Prod")
Anna Geller
Madison Schott
10/13/2021, 3:20 PMMadison Schott
10/13/2021, 3:20 PMAnna Geller
PrefectSecret("SNOWFLAKE_PASSWORD")
- ideally, this should be moved to the Flow constructor because this is a first-class Prefect task.Madison Schott
10/13/2021, 3:40 PMAnna Geller
with Flow("data_pipeline_prod", storage=STORAGE,run_config=RUN_CONFIG) as data_pipeline_prod:
dbt_secret = PrefectSecret("SNOWFLAKE_PASSWORD")
dim_campaigns_dbt_run = dbt_task(command='dbt run -m dim_campaigns',task_args={"name": "Dim Campaigns"},dbt_kwargs={"password": dbt_secret})
Or you could change the flow structure to use the Secret at the module level with the client-level Secret:
from prefect.client import Secret
dbt_task = DbtShellTask(profile_name='winc',
helper_script = f"cd {DBT_REPO_PATH}",
log_stderr=True,
stream_output=True,
log_stdout=True,
profiles_dir=DBT_REPO_PATH,
environment='prod',
dbt_kwargs = {
"type": "snowflake",
"account": 'fi45185.east-us-2.azure',
"user": 'dbt_user',
"password": Secret("SNOWFLAKE_PASSWORD").get(),
"role": 'TRANSFORMER_WH',
"database": 'data_mart_prod',
"warehouse": 'TRANSFORM',
"overwrite_profile": True,
"profile_name": 'winc_prod',
"threads": 200
})
The reason why this happens is that PrefectSecret
is a first-class Prefect task, and therefore it's evaluated at runtime (when you run the flow), not at build time (when you register the flow).Madison Schott
10/13/2021, 3:42 PM000904 (42000): SQL compilation error: error line 7 at position 4
10:40:12
INFO
DbtShellTask
invalid identifier 'PAGE_VIEW_VISITOR_IDS_JOINED.USER_PROFILE_ID'
Madison Schott
10/13/2021, 3:42 PMMadison Schott
10/13/2021, 3:42 PMMadison Schott
10/13/2021, 3:44 PMError during execution of task: HTTPError('404 Client Error: Not Found for url: <https://api.fivetran.com/v1/connectors/%0Alethal_conservation>')
Anna Geller
Madison Schott
10/13/2021, 3:57 PMMadison Schott
10/13/2021, 3:58 PMAnna Geller
Anna Geller