Gage Toschlog
04/20/2021, 9:14 PM
@task
def run_DBT_task():
    task = DbtShellTask(
        profile_name='snowflake',
        environment='sandbox',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    task.run(command="cd ~/src/prefect && dbt run")

with Flow(name="Local Test") as flow:
    run_DBT_task()

flowID = flow.register(project_name="51", labels=["localtest"])
client.create_flow_run(flow_id=flowID)
Gage Toschlog
04/20/2021, 9:15 PM
Gage Toschlog
04/20/2021, 9:18 PM
nicholas
I'd recommend renaming the task variable in your run_DBT_task method, since that scoping may interfere with your imported task decorator.
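A minimal sketch of that rename applied to the flow above; the variable name dbt_shell is arbitrary, and the surrounding imports, client, and Flow registration are assumed unchanged:

@task
def run_DBT_task():
    # renamed from `task` so it no longer shadows the imported @task decorator
    dbt_shell = DbtShellTask(
        profile_name='snowflake',
        environment='sandbox',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    dbt_shell.run(command="cd ~/src/prefect && dbt run")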
Gage Toschlog
04/20/2021, 9:33 PM
nicholas
Gage Toschlog
04/20/2021, 9:37 PM
@task(name="Chizzle Task", log_stdout=True)
def chizzle_execute(cmd):
    chizzle_dbt_task = DbtShellTask(
        profile_name='snowflake',
        environment='eng',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    #logger.info('Starting task!')
    #result = task.run(command=cmd)
    chizzle_dbt_task.run(command=cmd)
    #logger.info(result)

def chizzle(pid, integration_type_list, lookback, is_full_refresh, batch_load_type):
    stats = statsd.StatsClient(ENV['STATSD_HOST'], 8125, prefix="snowflake_etl")
    rand_int = random.randint(1, 5)
    print(f"Sleeping {rand_int} seconds...")
    time.sleep(rand_int)
    variables = json.dumps({
        "pid": pid,
        "batch_load_type": batch_load_type,
        "lookback": lookback
    })
    #cd /usr/local/bin/ &&
    run_cmd = f"dbt run --vars '{variables}' --model {integration_type_list}"
    if is_full_refresh:
        stats.incr(f'work.is_full_refresh')
        stats.incr(f'work.{pid}.is_full_refresh')
        ENV['ACTIVE_WAREHOUSE'] = ENV['SF_FULL_REFRESH_WAREHOUSE']
        run_cmd = f'{run_cmd} --full-refresh'
    print(run_cmd)

    client = Client()
    with Flow(name="Chizzle DBT Flow Mark 9", storage=Local(add_default_labels=False)) as chizzle_flow:
        chizzle_execute(run_cmd)
    flow_id = chizzle_flow.register(project_name='51', labels=['eng', 'chizzle'])
    client.create_flow_run(flow_id=flow_id)
nicholas
Could you try adding log_stdout=True to the DbtShellTask constructor?
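A minimal sketch of that suggestion, assuming the same task body as above; the keyword goes on the DbtShellTask constructor rather than on the @task decorator:

chizzle_dbt_task = DbtShellTask(
    profile_name='snowflake',
    environment='eng',
    profiles_dir='.',
    stream_output=True,
    log_stdout=True,  # suggested addition: send the captured stdout to the Prefect logger
    return_all=True
)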
Gage Toschlog
04/20/2021, 11:16 PM
nicholas
You may also need to set log_to_cloud=True, like this:
[logging]
log_to_cloud=True
You can find that config at ~/.prefect/config.toml
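The same setting can usually be supplied as an environment variable instead of editing config.toml, using Prefect's PREFECT__SECTION__KEY convention (an assumption worth checking against the Prefect 0.14 configuration docs for your setup):

export PREFECT__LOGGING__LOG_TO_CLOUD=true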
Gage Toschlog
04/20/2021, 11:39 PM
nicholas
Gage Toschlog
04/20/2021, 11:42 PM
Gage Toschlog
04/21/2021, 12:30 AM
nicholas
With stream_output=True, I believe the log level defaults to INFO; is your logging level set appropriately in the container?
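One way to raise the level inside the container, assuming Prefect's standard config env-var convention applies there, is to set logging.level explicitly before the agent or flow run starts:

export PREFECT__LOGGING__LEVEL=DEBUG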
Gage Toschlog
04/21/2021, 5:12 PM
@task(name="Chizzle Task")
def chizzle_execute(cmd):
    chizzle_dbt_task = DbtShellTask(
        profile_name='snowflake',
        environment='eng',
        profiles_dir='.',
        stream_output=True,
        log_stdout=True,
        return_all=True
    )
    logger.info('Starting task!')
    result = chizzle_dbt_task.run(command=cmd)
    #chizzle_dbt_task.run(command=cmd)
    logger.info(result)
Gage Toschlog
04/21/2021, 5:19 PM
Sean Talia
04/21/2021, 8:40 PM
Gage Toschlog
04/21/2021, 8:44 PM
Sean Talia
04/21/2021, 8:48 PM
Gage Toschlog
04/21/2021, 8:51 PM
This flow was built using Prefect '0.14.16', but you currently have Prefect '0.14.14' installed. We recommend loading flows with the same Prefect version they were built with, failure to do so may result in errors.
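A sketch of one way to clear that warning, assuming the agent's environment is the side still on 0.14.14: pin Prefect there to match the version the flow was built with (or downgrade the build environment to match the agent):

pip install prefect==0.14.16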
Sean Talia
04/21/2021, 8:52 PM
Gage Toschlog
04/21/2021, 8:52 PM
Gage Toschlog
04/21/2021, 8:52 PM
Sean Talia
04/21/2021, 8:53 PM
Gage Toschlog
04/21/2021, 8:57 PM