# ask-community
b
Hey everyone, I’m trying to use a ShellTask, and it is working, but I’m getting duplicated logs with slightly different formatting and identical content. I can’t find a way to disable or hide the duplicates. Here’s an example of the duplication:
```
[2021-06-22 13:25:46+0100] INFO - prefect.ShellTask | 13:25:46 | Concurrency: 12 threads (target='dev')
INFO:prefect.ShellTask:13:25:46 | Concurrency: 12 threads (target='dev')
```
k
Hey @Bruno Murino, are you calling `flow.run` inside your flow by any chance?
Does this only happen for the ShellTask, and is there anything that shows the task actually ran twice?
b
Yes, I am, and only for the ShellTask.
The task definitely ran only once.
Well, actually, what I have is something like this:
```python
def create_flow():
    ...  # flow definition elided
    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(args)
```
k
And then you register with the CLI? I think the `flow.run()` call might be getting picked up during registration. Could you try registering without the `flow.run()` and running that first?
I tested it and only got one log line. I am positive your `flow.run` is getting picked up along with the flow registration, especially because the two log lines have different formats.
```python
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from prefect.tasks.shell import ShellTask

shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)

@task
def get_command(param):
    return f"echo '{param}'"

with Flow(name="Test") as flow:
    param = Parameter('param', 1)
    cmd = get_command(param)
    test = shell_task(command=cmd)

flow.register("omlds")
```
b
oh I’m not registering, just running locally
k
Oh I see. Would you be able to give me a minimal example?
b
```python
import pathlib


import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor

from prefect.tasks.shell import ShellTask
from schedule import get_schedule

shell_task = ShellTask(helper_script="cd dbt/", stream_output=True)

@task
def get_config(env, scope):

    global_config = {
        'env': env,
        'scope': scope,
    }

    config = {
        **global_config,
    }

    return config


def configure_flow(flow):

    flow.schedule = get_schedule()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

def create_flow():

    with Flow("kensington_pipeline") as flow:
        env = Parameter('env')
        scope = Parameter('scope')
        config = get_config(env, scope)

        shell_task(command="dbt run", env={'ENV': env})

    configure_flow(flow)

    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(env="prod", scope="ss", run_on_schedule=False)
```
k
I didn’t see any duplicate logs when I ran this. I trimmed it down to a minimal example; could you try this and see if you get duplicate logs?
```python
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.shell import ShellTask

shell_task = ShellTask(stream_output=True)

def create_flow():
    with Flow("kensington_pipeline") as flow:
        shell_task(command="echo 1")
    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(run_on_schedule=False)
```
b
Your example doesn’t yield duplicated logs; however, I was able to replicate my issue with your example by adding a parameter and passing its value to the `env` argument of the ShellTask:
```python
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.shell import ShellTask

shell_task = ShellTask(stream_output=True)

def create_flow():
    with Flow("kensington_pipeline") as flow:
        env = Parameter('env')
        shell_task(command="echo 1", env={'ENV': env})
    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(env='dev', run_on_schedule=False)
```
k
Thanks! Will look into it
b
Sorry, that wasn’t the example! Sending again
k
Ok I was wondering what I did wrong cuz it seemed to be working 😅
b
I’m close to identifying it, but currently it seems to be related BOTH to my custom state handler and to passing the `env` parameter to the `env` argument of the ShellTask.
It’s very odd: every time I think I’ve singled out the culprit, it changes…
```python
import pathlib


import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor

from prefect.tasks.shell import ShellTask
from schedule import get_schedule

shell_task = ShellTask(helper_script="cd dbt/", stream_output=True)

def get_aws_session(env, local_aws_profile):
    import boto3 as _boto3
    import logging as _logging
    if env == "prod" or local_aws_profile == "":
        _logging.info("Getting AWS Session for PROD")
        aws_session = _boto3.Session(region_name="eu-west-1")
    else:
        aws_session = _boto3.Session(
            profile_name=local_aws_profile, region_name="eu-west-1"
        )

    return aws_session

@task
def get_config(env, scope):

    global_config = {
        'env': env,
        'scope': scope,
        'aws_session': get_aws_session(env=env, local_aws_profile='prod'),
    }

    config = {
        **global_config,
    }

    return config


def configure_flow(flow):
    # flow.state_handlers = [my_state_handler]
    flow.schedule = get_schedule()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

def create_flow():

    with Flow("kensington_pipeline") as flow:
        env = Parameter('env')
        scope = Parameter('scope')
        config = get_config(env, scope)

        shell_task(command="ls", env={'ENV': config['env']})

    configure_flow(flow)

    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(env="prod", scope="ss", run_on_schedule=False)
```
k
I see the duplicates now!
It’s the `logging` call in `get_aws_session` adding another log handler here. You can comment it out like this:
```python
def get_aws_session(env, local_aws_profile):
    import boto3 as _boto3
    # import logging as _logging
    if env == "prod" or local_aws_profile == "":
        # _logging.info("Getting AWS Session for PROD")
        aws_session = _boto3.Session(region_name="eu-west-1")
    else:
        aws_session = _boto3.Session(
            profile_name=local_aws_profile, region_name="eu-west-1"
        )
    return aws_session
```
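A stdlib-only sketch of why a second line appears, under one assumption: that Prefect’s logger has its own handler and also propagates records up to the root logger (the stdlib default). A module-level `logging.info(...)` call, like the one in `get_aws_session`, runs `logging.basicConfig()` when the root logger has no handlers, and that attaches a stderr handler whose default format is `LEVEL:name:message`, the same shape as the second line in the original example. The logger name `demo_prefect.ShellTask` and its formatter are hypothetical stand-ins, not Prefect’s actual configuration.

```python
import io
import logging


def reproduce_duplicate_logs():
    """Return (stand-in task-logger output, root-handler output) for one log call."""
    root = logging.getLogger()
    saved_handlers, saved_level = list(root.handlers), root.level
    for h in saved_handlers:  # start from a root logger with no handlers
        root.removeHandler(h)
    root.setLevel(logging.WARNING)  # the stdlib default root level

    # Hypothetical stand-in for Prefect's task logger: its own handler and format.
    task_logger = logging.getLogger("demo_prefect.ShellTask")
    task_stream = io.StringIO()
    task_handler = logging.StreamHandler(task_stream)
    task_handler.setFormatter(logging.Formatter("%(levelname)s - %(name)s | %(message)s"))
    task_logger.addHandler(task_handler)
    task_logger.setLevel(logging.INFO)

    try:
        # Library-style call on the root logger: root has no handlers, so this
        # implicitly runs logging.basicConfig() and attaches a StreamHandler.
        logging.info("Getting AWS Session for PROD")  # below WARNING, not printed
        root_stream = io.StringIO()
        root.handlers[0].setStream(root_stream)  # capture it instead of stderr

        # One call on the task logger now reaches two handlers via propagation.
        task_logger.info("Concurrency: 12 threads")
        return task_stream.getvalue(), root_stream.getvalue()
    finally:
        # Undo the global changes so the sketch can be run repeatedly.
        task_logger.removeHandler(task_handler)
        for h in list(root.handlers):
            root.removeHandler(h)
        for h in saved_handlers:
            root.addHandler(h)
        root.setLevel(saved_level)


task_line, root_line = reproduce_duplicate_logs()
print(task_line, end="")  # INFO - demo_prefect.ShellTask | Concurrency: 12 threads
print(root_line, end="")  # INFO:demo_prefect.ShellTask:Concurrency: 12 threads
```

Note that the root logger’s own level does not filter propagated records; only the originating logger’s level and each handler’s level are checked, which is why the INFO record still reaches the root handler.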
b
```python
import prefect
from prefect import task, Flow, Parameter, unmapped

from prefect.tasks.shell import ShellTask

shell_task = ShellTask(stream_output=True)

@task
def get_config(env):

    import logging
    # logging.info('uncommenting this line causes log duplication')

    return {'env': env}


# Not called in this minimal example; it relies on the get_schedule and
# LocalDaskExecutor imports from the full snippet above.
def configure_flow(flow):
    flow.schedule = get_schedule()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

def create_flow():

    with Flow("kensington_pipeline") as flow:
        env = Parameter('env')
        config = get_config(env)

        shell_task(command="ls", env={'ENV': config['env']})

    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(env="prod", run_on_schedule=False)
```
Yes, exactly, I just posted a minimal example.
It’s the `logging.info` call from inside my function.
k
You can just use the prefect logger like:
```python
import prefect
from prefect import task

@task
def get_config(env):
    logger = prefect.context.get("logger")
    logger.info('uncommenting this line causes log duplication')
    return {'env': env}
```
b
yes but the original function is part of an internal package, so I can’t use prefect there
k
This only works when used inside tasks.
b
I can change the internal package, though, just without Prefect… what would be the “correct” way of doing that? I mean, what logging can I have in my internal functions that won’t break Prefect’s logging?
k
Ah ok you can do this to add the extra logger. Not 100% sure it will help, but it might remove the duplicates
b
hmm I’m trying but don’t think it’s helping
In my original function I think I’m using the “global” (root) logger instead of a named one; do you think this may be causing problems?
k
Yes, I think the root logger sits above the Prefect one, so Prefect’s records reach it too, and that’s why you see duplicates. I think if you give it a name, like `logger = logging.getLogger('simpleExample')`, it won’t cause the duplicates.
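A minimal sketch of what the named-logger approach could look like in the internal package, following the standard-library convention for libraries: a module-level named logger, only a `NullHandler` attached, and no `basicConfig()` or handler setup, which is left to the application. The names `my_internal_pkg.aws` and `get_aws_session_stub` are hypothetical, and the boto3 session is replaced by a plain dict so the sketch runs without AWS. Calling it does not attach anything to the root logger, so it should not produce the second `INFO:...` line.

```python
import logging

# Hypothetical module inside the internal package. In real code this would be
# logging.getLogger(__name__); no handlers or basicConfig() are set up here.
log = logging.getLogger("my_internal_pkg.aws")
# Keeps records away from logging.lastResort when the app configures nothing.
log.addHandler(logging.NullHandler())


def get_aws_session_stub(env):
    """Stand-in for get_aws_session; returns a dict instead of a boto3 Session."""
    if env == "prod":
        log.info("Getting AWS Session for PROD")  # goes to the named logger only
    return {"env": env, "region_name": "eu-west-1"}


def root_handler_count_after_call(env):
    """Strip root handlers, call the library function, then count root handlers."""
    root = logging.getLogger()
    saved = list(root.handlers)
    for h in saved:
        root.removeHandler(h)
    try:
        get_aws_session_stub(env)
        return len(root.handlers)  # nothing was attached to the root logger
    finally:
        for h in saved:
            root.addHandler(h)


print(root_handler_count_after_call("prod"))  # 0
```

If the application does want these messages, it can opt in by attaching a handler to the `my_internal_pkg` logger, so the package itself never decides where its logs go.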
b
Yes, I think so; I’m doing some research. Tomorrow I’ll update my internal package to use a named logger, and then the link you sent would make sense for “including” those logs if I wanted them, with the default behaviour being to ignore them, I think.
k
This is how Prefect creates the logger, in case you want to see.
b
I didn’t find it immediately, but do you think it is possible that Prefect is creating an unnamed logger?
I’m not too familiar with logging, but if the duplicated logs we see come from the unnamed logger in my internal package, and they are formatted in Prefect’s style (are they? Not sure), then it means Prefect is modifying the unnamed logger, right?
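One way to answer this without reading Prefect’s source is to inspect the handler chain at runtime. A minimal stdlib sketch, with `describe_handlers` as a hypothetical helper name: it walks from a named logger up through its parents to the root, listing each logger’s handlers, and stops where `propagate` is false. If the root entry shows a `StreamHandler` after your internal function has run, and the duplicate line has the `LEVEL:name:message` shape (the default format of `logging.basicConfig()`), that points to a root handler installed by the stdlib rather than to a second Prefect logger.

```python
import logging


def describe_handlers(logger_name):
    """List each logger from logger_name up to the root, with its handlers."""
    lines = []
    logger = logging.getLogger(logger_name)
    while logger is not None:
        names = [type(h).__name__ for h in logger.handlers]
        lines.append(f"{logger.name}: propagate={logger.propagate}, handlers={names}")
        # Records stop travelling upward once a logger has propagate=False.
        logger = logger.parent if logger.propagate else None
    return lines


# Usage, e.g. at the start of a task; the name comes from the log output above.
for line in describe_handlers("prefect.ShellTask"):
    print(line)
```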
k
I know what you’re saying. I think the ShellTask itself might, but there haven’t been any duplicates when run on Prefect Cloud. Do you plan to not use Cloud?
b
Yep, not using cloud