Hello. I’m attempting to use the DBTShellTask to ...
# ask-community
g
Hello. I’m attempting to use DbtShellTask to execute dbt commands and stream the DbtShellTask output to the Prefect flow logs. This works locally when I start a local agent, register the flow, and run it, but the same code does not stream the output when executing inside a Docker container. Any suggestions?
@task
def run_DBT_task():

    task = DbtShellTask(
        profile_name='snowflake',
        environment='sandbox',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )
    
    task.run(command="cd ~/src/prefect && dbt run")

with Flow(name="Local Test") as flow:
    run_DBT_task()

flowID = flow.register(project_name="51", labels=["localtest"])

client.create_flow_run(flow_id = flowID)
Flow Logs (Docker):
Flow Logs (Local):
n
Hi @Gage Toschlog - have you tried turning on std_out logging? In addition, you may want to rename that task variable in your run_DBT_task method, since that scoping may interfere with your imported task decorator.
g
@nicholas - I will try that and report back. Thanks!
n
Let me know what you find 👍 🙂
g
Unfortunately, I’m seeing the same behavior. Here is the current code that is running in the Docker container and a screenshot of the logs:
@task(name = "Chizzle Task",log_stdout = True)
def chizzle_execute(cmd):

    chizzle_dbt_task = DbtShellTask(
        profile_name='snowflake',
        environment='eng',
        profiles_dir='.',
        stream_output=True,
        return_all=True
    )

    #logger.info('Starting task!')

    #result = task.run(command=cmd)

    chizzle_dbt_task.run(command=cmd)

    #logger.info(result)

def chizzle(pid, integration_type_list, lookback, is_full_refresh, batch_load_type):
    stats = statsd.StatsClient(ENV['STATSD_HOST'], 8125, prefix="snowflake_etl")
    rand_int = random.randint(1,5)

    print(f"Sleeping {rand_int} seconds...")

    time.sleep(rand_int)

    variables = json.dumps({
        "pid":pid, 
        "batch_load_type":batch_load_type,
        "lookback":lookback
    })

    #cd /usr/local/bin/ && 

    run_cmd = f"dbt run --vars '{variables}' --model {integration_type_list}"

    if is_full_refresh:
        stats.incr(f'work.is_full_refresh')
        stats.incr(f'work.{pid}.is_full_refresh')
        ENV['ACTIVE_WAREHOUSE'] = ENV['SF_FULL_REFRESH_WAREHOUSE']
        run_cmd = f'{run_cmd} --full-refresh'

    print(run_cmd)
 
    client = Client()

    with Flow(name="Chizzle DBT Flow Mark 9", storage=Local(add_default_labels=False)) as chizzle_flow:
        chizzle_execute(run_cmd)
    
    flow_id = chizzle_flow.register(project_name='51', labels=['eng','chizzle'])
    client.create_flow_run(flow_id = flow_id)
n
Hm, can you pass log_stdout=True to the DbtShellTask constructor?
g
Unfortunately, no change.
n
Hm, can you confirm that the config in your container has log_to_cloud=True like this:
[logging]
log_to_cloud=True
You can find that config at ~/.prefect/config.toml
g
We are hosting our own instance of Prefect. Does that config still apply?
n
Yup - the setting was introduced before Server existed so it's carried over!
g
Ok, I’ll try it!
No luck 😞 The flows are registered and executed within a Python thread. Could that cause any issues?
n
Hi @Gage Toschlog - just tried this myself and I'm not able to reproduce. When passing stream_output=True, I believe the log level defaults to INFO; is your logging level set appropriately in the container?
g
Yep - it looks like it is set to DEBUG by default and I haven’t changed it. What’s bizarre is that if I instantiate a logger and log the result of the DbtShellTask, it does send the dbt output up. However, we need it to stream as it’s running.
@task(name = "Chizzle Task")
def chizzle_execute(cmd):

    chizzle_dbt_task = DbtShellTask(
        profile_name='snowflake',
        environment='eng',
        profiles_dir='.',
        stream_output=True,
        log_stdout=True,
        return_all=True
    )

    logger.info('Starting task!')

    result = chizzle_dbt_task.run(command=cmd)

    #chizzle_dbt_task.run(command=cmd)

    logger.info(result)
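For anyone following along, the difference between logging the returned result and streaming while the command runs can be sketched with the standard library alone. This is not how DbtShellTask is implemented; it is a generic illustration, and the name run_and_stream is hypothetical. It logs each line as soon as the child process writes it, instead of waiting for the process to finish.

```python
import logging
import subprocess
import sys
from typing import List, Sequence


def run_and_stream(cmd: Sequence[str], logger: logging.Logger) -> List[str]:
    """Run cmd and log every output line as soon as the child emits it.

    Hypothetical helper for illustration only; not part of Prefect.
    """
    lines: List[str] = []
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the same stream
        text=True,
        bufsize=1,  # line-buffered on the reading side
    )
    assert proc.stdout is not None
    for raw in proc.stdout:
        line = raw.rstrip("\n")
        logger.info(line)  # emitted while the process is still running
        lines.append(line)
    returncode = proc.wait()
    if returncode != 0:
        raise subprocess.CalledProcessError(
            returncode, list(cmd), output="\n".join(lines)
        )
    return lines


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Stand-in for a long-running command such as a dbt invocation.
    demo = [sys.executable, "-u", "-c", "print('first'); print('second')"]
    run_and_stream(demo, logging.getLogger("stream-demo"))
```

If the child process buffers its own output (common when stdout is not a terminal, as inside a container), lines can still arrive in one burst at the end, which is one thing worth ruling out when behavior differs between a local shell and Docker.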
s
@Gage Toschlog what version of prefect is your flow running with?
g
I’m running Prefect 0.14.16
s
hmmm okay, I had just submitted a PR that got merged a couple of weeks ago to address this very issue so i thought perhaps you might not have been on the latest
g
Hmm. I just added --show-flow-logs to the agent start command and I am seeing a warning:
This flow was built using Prefect '0.14.16', but you currently have Prefect '0.14.14' installed. We recommend loading flows with the same Prefect version they were built with, failure to do so may result in errors.
s
oh is your agent running with the older version?
g
Which is odd because the flow is created and executed in the same environment
Possibly…
s
you may want to kill the agent and bring it back up with 0.14.16; i actually haven't tried what you're doing because I had been told that the version the agent is running with always needs to be ahead of what the flow was registered with, so i don't know if that actually will fix it
g
Ok. I will try that!
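A quick way to confirm a mismatch like the one in the warning above is to compare the version the flow was built with against the version installed where the agent runs. The sketch below is a minimal, hypothetical helper (parse_version and describe_mismatch are not Prefect functions) and assumes plain X.Y.Z version strings; in a real environment the installed value would come from prefect.__version__.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a plain 'X.Y.Z' string into a tuple of ints.

    Comparing tuples avoids the string-comparison trap where
    '0.14.9' sorts after '0.14.16'.
    """
    return tuple(int(part) for part in version.split("."))


def describe_mismatch(built_with: str, installed: str) -> str:
    """Describe how the installed version relates to the flow's build version."""
    built, have = parse_version(built_with), parse_version(installed)
    if built == have:
        return "match"
    if have < built:
        return "installed is older than the flow's build version"
    return "installed is newer than the flow's build version"


if __name__ == "__main__":
    # Values taken from the warning in this thread.
    print(describe_mismatch("0.14.16", "0.14.14"))
```

Running the same check in both the registration environment and the agent's environment should show which one is still on the older release.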