Hi all, I'm getting an error in my Prefect run say...
# ask-community
m
Hi all, I'm getting an error in my Prefect run saying a column in my dbt model doesn't exist, but when I run the model locally everything works fine. I recently updated the Docker container so the code matches. Any ideas why I could be getting this?
a
Hi @Madison Schott! Could you share your Flow and give us more details about your Prefect setup? I assume you use Docker storage?
m
Yup, I use Docker, the flow is kinda long to post in here
Here is the part causing trouble
# web session details model
    web_session_visitor_ids_dbt_run = dbt_task(command='dbt run -m web_session_users_mapped web_session_visitor_ids', task_args={"name": "Web Session Visitor Ids"})
    web_session_page_views_visitor_ids_joined_dbt_run = dbt_task(command='dbt run -m web_session_page_views_visitor_ids_joined', task_args={"name": "Web Session Page Views Visitor Ids Joined"})
    users_to_visitor_ids_dbt_run = dbt_task(command='dbt run -m users_to_visitor_ids', task_args={"name": "Users to Visitor Ids"})
    web_session_details_dbt_run = dbt_task(command='dbt run -m web_session_new_sessions_derived web_session_sequences_derived web_session_details', task_args={"name": "Web Session Details"})

    page_view_details_dbt_run = dbt_task(command='dbt run -m page_view_details', task_args={"name": "Page View Details"})
web_session_page_views_visitor_ids_joined_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
    web_session_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
    web_session_details_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
    page_view_details_dbt_run.set_upstream(web_session_details_dbt_run)
    page_view_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
    users_to_visitor_ids_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
a
You could also share a GitHub Gist of your Flow if it's long
m
I don't have it on a public repo
a
You can hide sensitive information and create a public Gist if you want. But really, pasting the entire Flow configuration here would be fine as well.
m
fivetran_task = FivetranSyncTask()

DBT_REPO_PATH = "/dbt_snowflake/"

dbt_task = DbtShellTask(profile_name='winc',
                        helper_script = f"cd {DBT_REPO_PATH}",
                        log_stderr=True,
                        stream_output=True,
                        log_stdout=True,
                        profiles_dir=DBT_REPO_PATH,
                        environment='prod',
                        dbt_kwargs = {
                            "type": "snowflake",
                            "account": 'fi45185.east-us-2.azure',
                            "user": 'dbt_user',
                            "password": PrefectSecret("SNOWFLAKE_PASSWORD"),
                            "role": 'TRANSFORMER_WH',
                            "database": 'data_mart_prod',
                            "warehouse": 'TRANSFORM',
                            "overwrite_profile": True,
                            "profile_name": 'winc_prod',
                            "threads": 200
                        })

with Flow("data_pipeline_prod", storage=STORAGE, run_config=RUN_CONFIG) as data_pipeline_prod:
    # run data syncs with Fivetran
    winc_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WINC_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Winc..."})

    webevent_page_views_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WEBEVENT_PAGE_VIEWS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Webevent Page Views..."})

    webevents_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WEBEVENTS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Webevents..."})

    shopify_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("SHOPIFY_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Shopify..."})

    biz_dev_mapping_winc_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("BIZ_DEV_MAPPING_WINC_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Biz Dev Mapping Winc..."})
    # Media Spend syncs
    google_ads_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("GOOGLE_ADS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Google Ads..."})

    bing_ads_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("BING_ADS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Bing Ads..."})

    facebook_basic_ads_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("FACEBOOK_BASIC_ADS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Facebook basic ads..."})

    impact_radius_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("IMPACT_RADIUS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Impact Radius..."})

    winc_influencers_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WINC_INFLUENCERS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Winc influencer sheet..."})

    wwc_influencers_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WWC_INFLUENCERS_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing WWC influencer sheet..."})

    winc_misc_marketing_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WINC_MISC_MARKETING_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing Winc misc marketing sheet..."})

    wwc_misc_marketing_sheet_sync = fivetran_task(PrefectSecret("FIVETRAN_API_KEY"),
                                   PrefectSecret("FIVETRAN_API_SECRET"),
                                    connector_id=PrefectSecret("WWC_MISC_MARKETING_CONNECTOR_ID"),
                                    poll_status_every_n_seconds=15,
                                    task_args={"name": "Syncing WWC misc marketing sheet..."})

    # run dbt models
    # user brand campaigns
    dim_campaigns_dbt_run = dbt_task(command='dbt run -m dim_campaigns', task_args={"name": "Dim Campaigns"})
    user_brand_campaigns_dbt_run = dbt_task(command='dbt run -m user_brand_campaigns', task_args={"name": "User Brand Campaigns"})

    # web session details model
    web_session_visitor_ids_dbt_run = dbt_task(command='dbt run -m web_session_users_mapped web_session_visitor_ids', task_args={"name": "Web Session Visitor Ids"})
    web_session_page_views_visitor_ids_joined_dbt_run = dbt_task(command='dbt run -m web_session_page_views_visitor_ids_joined', task_args={"name": "Web Session Page Views Visitor Ids Joined"})
    users_to_visitor_ids_dbt_run = dbt_task(command='dbt run -m users_to_visitor_ids', task_args={"name": "Users to Visitor Ids"})
    web_session_details_dbt_run = dbt_task(command='dbt run -m web_session_new_sessions_derived web_session_sequences_derived web_session_details', task_args={"name": "Web Session Details"})

    page_view_details_dbt_run = dbt_task(command='dbt run -m page_view_details', task_args={"name": "Page View Details"})

    # user details model
    user_details_biz_dev_mapping_joined = dbt_task(command='dbt run -m user_details_biz_dev_mapping_joined', task_args={"name": "User Details Biz Dev Mapping Joined"})
    dim_subscriptions_dbt_run = dbt_task(command='dbt run -m dim_subscriptions', task_args={"name": "Dim Subscriptions"})
    user_details_dbt_run = dbt_task(command='dbt run -m user_subscription_details user_details', task_args={"name": "User Details"})

    # media spend model
    media_spend_shared_sources_dbt_run = dbt_task(command='dbt run -m media_spend_facebook_summed media_spend_google_ads_joined media_spend_influencers_joined media_spend_misc_marketing_joined', task_args={"name": "Media Spend"})
    media_spend_cactus_media_dbt_run = dbt_task(command='dbt run -m media_spend_cactus_media_sales_derived', task_args={"name": "Cactus Media"})
    media_spend_wwc_dbt_run = dbt_task(command='dbt run -m media_spend_wwc', task_args={"name": "Media Spend WWC"})
    media_spend_winc_dbt_run = dbt_task(command='dbt run -m media_spend_bing_ads_joined media_spend_impact_radius_summed media_spend_winc', task_args={"name": "Media Spend Winc"})

    #dbt_cleanup_old_models = dbt_task(command='dbt run-operation drop_old_relations', task_args={"name": "Cleaning up old models..."})

    # set dependencies for dbt model runs
    # models that depend on data sources syncing

    dim_campaigns_dbt_run.set_upstream(winc_sync)
    dim_subscriptions_dbt_run.set_upstream(winc_sync)
    user_brand_campaigns_dbt_run.set_upstream(winc_sync)
    web_session_visitor_ids_dbt_run.set_upstream(webevent_page_views_sync)
    user_details_biz_dev_mapping_joined.set_upstream(winc_sync)
    user_details_biz_dev_mapping_joined.set_upstream(biz_dev_mapping_winc_sync)
    user_details_dbt_run.set_upstream(winc_sync)
    media_spend_shared_sources_dbt_run.set_upstream(facebook_basic_ads_sync)
    media_spend_shared_sources_dbt_run.set_upstream(google_ads_sync)
    media_spend_shared_sources_dbt_run.set_upstream(winc_influencers_sheet_sync)
    media_spend_shared_sources_dbt_run.set_upstream(wwc_influencers_sheet_sync)
    media_spend_shared_sources_dbt_run.set_upstream(winc_misc_marketing_sheet_sync)
    media_spend_shared_sources_dbt_run.set_upstream(wwc_misc_marketing_sheet_sync)
    media_spend_winc_dbt_run.set_upstream(bing_ads_sync)
    media_spend_winc_dbt_run.set_upstream(impact_radius_sync)

    # models that depend on other models
    user_brand_campaigns_dbt_run.set_upstream(dim_campaigns_dbt_run)
    web_session_page_views_visitor_ids_joined_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
    web_session_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
    web_session_details_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
    page_view_details_dbt_run.set_upstream(web_session_details_dbt_run)
    page_view_details_dbt_run.set_upstream(web_session_page_views_visitor_ids_joined_dbt_run)
    users_to_visitor_ids_dbt_run.set_upstream(web_session_visitor_ids_dbt_run)
    user_details_biz_dev_mapping_joined.set_upstream(dim_campaigns_dbt_run)
    user_details_dbt_run.set_upstream(user_details_biz_dev_mapping_joined)
    user_details_dbt_run.set_upstream(dim_subscriptions_dbt_run)
    media_spend_wwc_dbt_run.set_upstream(media_spend_shared_sources_dbt_run)
    media_spend_winc_dbt_run.set_upstream(media_spend_shared_sources_dbt_run)
    media_spend_winc_dbt_run.set_upstream(media_spend_cactus_media_dbt_run)
    media_spend_cactus_media_dbt_run.set_upstream(web_session_details_dbt_run)

data_pipeline_prod.register(project_name="Winc_Prod")
a
Thanks @Madison Schott, but the most important things for debugging, STORAGE and RUN_CONFIG, are still missing. Can you add those?
m
well that stuff isn't the issue, it's in the dbt code itself
that has sensitive info
a
I think the issue might be PrefectSecret("SNOWFLAKE_PASSWORD"). Ideally, this should be moved inside the Flow block, because PrefectSecret is a first-class Prefect task.
m
The issue isn't in running the workflow, it's in page_view_details. That has been the same in all of my flows and I've never had an issue
a
@Madison Schott You could either pass the password as a task within the Flow:
with Flow("data_pipeline_prod", storage=STORAGE, run_config=RUN_CONFIG) as data_pipeline_prod:
    # PrefectSecret is called inside the Flow block, so it is resolved when the flow runs
    dbt_secret = PrefectSecret("SNOWFLAKE_PASSWORD")
    dim_campaigns_dbt_run = dbt_task(command='dbt run -m dim_campaigns', task_args={"name": "Dim Campaigns"}, dbt_kwargs={"password": dbt_secret})
Or you could change the flow structure to use the Secret at the module level with the client-level Secret:
from prefect.client import Secret

dbt_task = DbtShellTask(profile_name='winc',
                        helper_script = f"cd {DBT_REPO_PATH}",
                        log_stderr=True,
                        stream_output=True,
                        log_stdout=True,
                        profiles_dir=DBT_REPO_PATH,
                        environment='prod',
                        dbt_kwargs = {
                            "type": "snowflake",
                            "account": 'fi45185.east-us-2.azure',
                            "user": 'dbt_user',
                            "password": Secret("SNOWFLAKE_PASSWORD").get(),
                            "role": 'TRANSFORMER_WH',
                            "database": 'data_mart_prod',
                            "warehouse": 'TRANSFORM',
                            "overwrite_profile": True,
                            "profile_name": 'winc_prod',
                            "threads": 200
                        })
The reason why this happens is that PrefectSecret is a first-class Prefect task, and therefore it's evaluated at runtime (when you run the flow), not at build time (when you register the flow).
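To make the runtime vs. build-time distinction concrete, here is a toy sketch in plain Python. It does not use Prefect at all: DeferredSecret and SECRET_STORE are hypothetical names invented for this illustration, and the password value is made up. It only shows that an object created at build time holds the secret's name, while the actual value appears once something runs it.

```python
# Toy illustration only: DeferredSecret and SECRET_STORE are hypothetical,
# not Prefect classes. They mimic "configure now" vs "resolve when the flow runs".
SECRET_STORE = {}  # stands in for the secret backend


class DeferredSecret:
    """Holds only the secret's name; the value is looked up when run() is called."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return SECRET_STORE[self.name]


# "Build time": the task is configured before any secret value is available.
deferred = DeferredSecret("SNOWFLAKE_PASSWORD")
build_time_kwargs = {"password": deferred}  # an object, not a string

# "Run time": the backend now has the value, so it can be resolved.
SECRET_STORE["SNOWFLAKE_PASSWORD"] = "example-password"  # made-up value
run_time_kwargs = {"password": deferred.run()}  # the actual string

print(type(build_time_kwargs["password"]).__name__)
print(run_time_kwargs["password"])
```

In this sketch the build-time dictionary never contains the password itself, which is the situation both suggestions above try to avoid.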
m
000904 (42000): SQL compilation error: error line 7 at position 4
10:40:12 INFO DbtShellTask   invalid identifier 'PAGE_VIEW_VISITOR_IDS_JOINED.USER_PROFILE_ID'
this is the error
I just don't understand why it is happening since I've run it locally and the code all works and it's the same code being deployed on Docker
I'm also getting this issue with my fivetran sync connector
Error during execution of task: HTTPError('404 Client Error: Not Found for url: https://api.fivetran.com/v1/connectors/%0Alethal_conservation')
a
So, there are a couple of issues here. Let's number them so we can keep track:
1. The "SQL compilation error" seems to be dbt-specific; it's not an error coming from Prefect. Perhaps you could ask about this specific error in the dbt Slack?
2. The 404 Client Error from the Fivetran API could happen because you are trying to access an invalid URL or sending invalid credentials in your API request.
3. If, as you say, the issue is related to Prefect's Docker storage, do you have a specific stack trace from Prefect that indicates Docker storage is the issue? Based on what you provided, nothing shows any Docker issues.
I can't be certain, but I believe it succeeds locally because you have everything set up locally to talk to all these APIs (dbt and Fivetran), and somehow this configuration is not provided correctly when you deploy your Prefect flow. I would expect the issue to be in the way you pass secrets to the tasks. Could you try passing the secrets as I described above?
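On point 2, one detail in the reported URL may be worth checking: %0A is the URL-encoded form of a newline character, so the connector ID may have been stored with a leading line break. The sketch below only decodes the path segment copied from the error message and shows what stripping whitespace would do; whether the stray newline lives in one of your Prefect secrets is an assumption you would need to verify.

```python
from urllib.parse import quote, unquote

# Connector path segment copied from the 404 URL in the error message.
raw_segment = "%0Alethal_conservation"

# Decode the percent-encoding to see the raw characters that were sent.
decoded = unquote(raw_segment)
print(repr(decoded))  # '\nlethal_conservation'

# Hypothetical fix: strip surrounding whitespace from the stored value
# before it is passed as connector_id.
cleaned = decoded.strip()
print(quote(cleaned))  # lethal_conservation
```

If the decoded value does start with a newline, re-saving the secret without the line break would be a reasonable first thing to try.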
m
It's weird though because the same code works locally without an error... so it has to be something with the Prefect deploy specifically
This specific model that is erroring out has been running for the last few months, I just did a few small code changes, so I know it's not related to the tasks
a
There are a couple of things that you could do now:
1. Since something in the Docker image seems to be configured differently than in your local env, you could run your dbt workflow inside a running container to make sure that everything in the container matches your local env.
2. It could be that, due to the Fivetran outage, your dbt task is missing some data in staging. This could be the cause of the initial missing-column error.
3. Again, cross-check the secrets.
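For the first suggestion, one rough way to compare the dbt code in the container with your local copy is to fingerprint the project files in both places. This is only a sketch: project_fingerprint is a hypothetical helper written for this thread, and the file patterns are an assumption about which files matter in your dbt repo.

```python
import hashlib
from pathlib import Path


def project_fingerprint(root, patterns=("*.sql", "*.yml")):
    """Return a SHA-256 digest over relative paths and contents of matching files."""
    root = Path(root)
    # Collect matching files and sort by relative path so the order is stable
    # regardless of where the project lives on disk.
    files = {p for pattern in patterns for p in root.rglob(pattern) if p.is_file()}
    digest = hashlib.sha256()
    for path in sorted(files, key=lambda p: p.relative_to(root).as_posix()):
        digest.update(path.relative_to(root).as_posix().encode())
        digest.update(b"\0")
        digest.update(path.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()
```

You could call project_fingerprint on your local dbt repo and again inside the container on /dbt_snowflake/ (the DBT_REPO_PATH from your flow). Different digests would suggest the image is not running the same dbt code as your local environment.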
btw @Madison Schott I’ve just found out there is a really nice article about running dbt in Prefect - sharing in case this might be useful https://medium.com/slateco-blog/prefect-orchestrating-dbt-10f3ca0baea9