Gage Toschlog
04/20/2021, 9:14 PM
@task
def run_DBT_task():
    task = DbtShellTask(
        profile_name='snowflake',
        environment='sandbox',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    task.run(command="cd ~/src/prefect && dbt run")

with Flow(name="Local Test") as flow:
    run_DBT_task()

flowID = flow.register(project_name="51", labels=["localtest"])
client.create_flow_run(flow_id = flowID)
Gage Toschlog
04/20/2021, 9:15 PM
Gage Toschlog
04/20/2021, 9:18 PM
nicholas
task variable in your run_DBT_task method, since that scoping may interfere with your imported task decorator.
Gage Toschlog
04/20/2021, 9:33 PM
nicholas
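For reference, a minimal sketch of the scoping concern raised above, using only the standard library. The `task` decorator here is a hypothetical stand-in, not Prefect's. The point it illustrates: assigning to `task` inside a function makes that name local for the whole function body, so any use of the decorator inside the same function before the assignment fails, while a distinct name such as `dbt_task` sidesteps the question.

```python
import functools


def task(fn):
    """Hypothetical stand-in for an imported `task` decorator (not Prefect's)."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    wrapper.is_task = True
    return wrapper


@task
def shadows_locally():
    # Rebinding `task` here only creates a local variable; the module-level
    # decorator is unaffected once the function returns.
    task = "local object"
    return task


@task
def uses_before_assignment():
    # Because `task` is assigned later in this body, Python treats it as local
    # throughout, so this earlier reference raises UnboundLocalError.
    try:
        task(lambda: None)
    except UnboundLocalError:
        return "UnboundLocalError"
    task = "local object"
    return task


@task
def uses_distinct_name():
    # A distinct name avoids the shadowing question altogether.
    dbt_task = "local object"
    return dbt_task
```

In the snippet posted earlier in the thread the local assignment would fall into the first case, so renaming is mostly about removing ambiguity.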
Gage Toschlog
04/20/2021, 9:37 PM
@task(name = "Chizzle Task",log_stdout = True)
def chizzle_execute(cmd):
    chizzle_dbt_task = DbtShellTask(
        profile_name='snowflake',
        environment='eng',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    #logger.info('Starting task!')
    #result = task.run(command=cmd)
    chizzle_dbt_task.run(command=cmd)
    #logger.info(result)

def chizzle(pid, integration_type_list, lookback, is_full_refresh, batch_load_type):
    stats = statsd.StatsClient(ENV['STATSD_HOST'], 8125, prefix="snowflake_etl")
    rand_int = random.randint(1,5)
    print(f"Sleeping {rand_int} seconds...")
    time.sleep(rand_int)
    variables = json.dumps({
        "pid":pid,
        "batch_load_type":batch_load_type,
        "lookback":lookback
    })
    #cd /usr/local/bin/ &&
    run_cmd = f"dbt run --vars '{variables}' --model {integration_type_list}"
    if is_full_refresh:
        stats.incr(f'work.is_full_refresh')
        stats.incr(f'work.{pid}.is_full_refresh')
        ENV['ACTIVE_WAREHOUSE'] = ENV['SF_FULL_REFRESH_WAREHOUSE']
        run_cmd = f'{run_cmd} --full-refresh'
    print(run_cmd)
    client = Client()
    with Flow(name="Chizzle DBT Flow Mark 9", storage=Local(add_default_labels=False)) as chizzle_flow:
        chizzle_execute(run_cmd)
    flow_id = chizzle_flow.register(project_name='51', labels=['eng','chizzle'])
    client.create_flow_run(flow_id = flow_id)
nicholas
log_stdout=True to the DbtShellTask constructor?
Gage Toschlog
04/20/2021, 11:16 PM
nicholas
log_to_cloud=True like this:
[logging]
log_to_cloud=True
You can find that config at ~/.prefect/config.toml
Gage Toschlog
04/20/2021, 11:39 PM
nicholas
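As a sketch of supplying the same setting without editing the file: Prefect 0.x maps environment variables of the form PREFECT__SECTION__KEY onto config keys, so the [logging] entry above can also come from the environment, for example inside a container. The helper name `prefect_logging_env` is hypothetical, and the exact key names are an assumption that should be checked against the installed Prefect version.

```python
import os


def prefect_logging_env(level="INFO", log_to_cloud=True):
    """Build environment overrides for Prefect's [logging] config section.

    Hypothetical helper; assumes the PREFECT__<SECTION>__<KEY> convention.
    """
    return {
        "PREFECT__LOGGING__LEVEL": level,
        "PREFECT__LOGGING__LOG_TO_CLOUD": str(log_to_cloud).lower(),
    }


# Apply the overrides before Prefect is imported; the config is assumed to be
# read when the package is first loaded.
overrides = prefect_logging_env(level="DEBUG")
os.environ.update(overrides)
```

Setting the variables in the container definition itself would have the same effect and keeps the flow code unchanged.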
Gage Toschlog
04/20/2021, 11:42 PM
Gage Toschlog
04/21/2021, 12:30 AM
nicholas
stream_output=True, I believe the log level defaults to INFO; is your logging level set appropriately in the container?
Gage Toschlog
04/21/2021, 5:12 PM
@task(name = "Chizzle Task")
def chizzle_execute(cmd):
    chizzle_dbt_task = DbtShellTask(
        profile_name='snowflake',
        environment='eng',
        profiles_dir='.',
        stream_output=True,
        log_stdout=True,
        return_all=True
    )
    logger.info('Starting task!')
    result = chizzle_dbt_task.run(command=cmd)
    #chizzle_dbt_task.run(command=cmd)
    logger.info(result)
Gage Toschlog
04/21/2021, 5:19 PM
Sean Talia
04/21/2021, 8:40 PM
Gage Toschlog
04/21/2021, 8:44 PM
Sean Talia
04/21/2021, 8:48 PM
Gage Toschlog
04/21/2021, 8:51 PM
This flow was built using Prefect '0.14.16', but you currently have Prefect '0.14.14' installed. We recommend loading flows with the same Prefect version they were built with, failure to do so may result in errors.
Sean Talia
04/21/2021, 8:52 PM
Gage Toschlog
04/21/2021, 8:52 PM
Gage Toschlog
04/21/2021, 8:52 PM
Sean Talia
04/21/2021, 8:53 PM
Gage Toschlog
04/21/2021, 8:57 PM
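The warning quoted above points at a mismatch between the Prefect version that built the flow and the one installed where it runs. A minimal sketch of checking for that up front, assuming plain dotted numeric version strings; `parse_version` and `versions_match` are hypothetical helpers, and in practice the installed version could be read from `prefect.__version__`.

```python
def parse_version(version):
    """Split a dotted numeric version string such as '0.14.16' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def versions_match(built_with, installed):
    """Return True when both version strings name the same release."""
    return parse_version(built_with) == parse_version(installed)


# Values taken from the warning in the thread.
built_with = "0.14.16"
installed = "0.14.14"

if versions_match(built_with, installed):
    message = "Prefect versions match"
else:
    message = f"Flow built with Prefect {built_with}, but {installed} is installed"
```

Pinning the same Prefect version in the registration environment and the execution environment removes the mismatch at its source.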