Bruno Murino
06/22/2021, 5:40 PM
[2021-06-22 13:25:46+0100] INFO - prefect.ShellTask | 13:25:46 | Concurrency: 12 threads (target='dev')
INFO:prefect.ShellTask:13:25:46 | Concurrency: 12 threads (target='dev')
Kevin Kho
Do you have `flow.run` inside your flow by any chance?
Kevin Kho
Bruno Murino
06/22/2021, 5:46 PM
Bruno Murino
06/22/2021, 5:46 PM
Bruno Murino
06/22/2021, 5:47 PM
def create_flow():
    ...
    return flow

if __name__ == "__main__":
    flow = create_flow()
    flow.run(args)
Kevin Kho
Kevin Kho
I suspect the `flow.run` is getting registered along with the flow, especially because the loggers have different syntax.
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from prefect.tasks.shell import ShellTask
# Shared ShellTask for the test flow. Configuration: bash shell, empty helper
# script, output streamed, stderr logged, all output lines returned (see the
# Prefect ShellTask docs for the exact meaning of each flag).
shell_task = ShellTask(
    shell="bash",
    helper_script="",
    stream_output=True,
    log_stderr=True,
    return_all=True,
)
@task
def get_command(param):
    """Build a shell command that echoes ``param``.

    The value is stringified and passed through ``shlex.quote`` so that values
    containing quotes or shell metacharacters are echoed literally. The
    original ``echo '{param}'`` form broke, or let the shell interpret the
    value, as soon as ``param`` contained a single quote.

    Args:
        param: Any value; it is converted with ``str()`` as the f-string did.

    Returns:
        A shell command string such as ``echo 1`` or ``echo 'a b'``.
    """
    import shlex

    return f"echo {shlex.quote(str(param))}"
# Test flow: the `param` Parameter (default 1) feeds get_command, whose
# command string is executed by shell_task. The built flow is then registered
# under the "omlds" project name.
with Flow(name="Test") as flow:
    param = Parameter("param", default=1)
    cmd = get_command(param)
    test = shell_task(command=cmd)

flow.register(project_name="omlds")
Bruno Murino
06/22/2021, 6:14 PM
Kevin Kho
Bruno Murino
06/22/2021, 6:27 PM
import pathlib
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.shell import ShellTask
from schedule import get_schedule
# ShellTask used by the pipeline: the helper script `cd dbt/` is run ahead of
# each command, and command output is streamed.
shell_task = ShellTask(
    stream_output=True,
    helper_script="cd dbt/",
)
@task
def get_config(env, scope):
    """Return a new config dict holding the ``env`` and ``scope`` values."""
    return dict(env=env, scope=scope)
def configure_flow(flow):
    """Attach run settings to ``flow`` in place.

    Sets the schedule returned by ``get_schedule()`` and a thread-based
    ``LocalDaskExecutor`` with 8 workers.
    """
    schedule = get_schedule()
    executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
    flow.schedule, flow.executor = schedule, executor
def create_flow():
    """Build and configure the ``kensington_pipeline`` flow.

    The flow takes ``env`` and ``scope`` parameters, adds a ``get_config``
    task, and runs ``dbt run`` through ``shell_task`` with ``ENV`` set from
    the ``env`` parameter.
    """
    pipeline = Flow("kensington_pipeline")
    with pipeline:
        env = Parameter("env")
        scope = Parameter("scope")
        # The config result is not consumed here, but the task is still added
        # to the flow by this call.
        get_config(env, scope)
        shell_task(command="dbt run", env={"ENV": env})
    configure_flow(pipeline)
    return pipeline
if __name__ == "__main__":
    # One-off local run with explicit parameter values; the schedule is
    # bypassed via run_on_schedule=False.
    flow = create_flow()
    flow.run(
        env="prod",
        scope="ss",
        run_on_schedule=False,
    )
Kevin Kho
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.shell import ShellTask
# ShellTask whose command output is streamed.
shell_task = ShellTask(
    stream_output=True,
)
def create_flow():
    """Return a minimal ``kensington_pipeline`` flow with one ``echo 1`` shell step."""
    flow = Flow("kensington_pipeline")
    with flow:
        shell_task(command="echo 1")
    return flow
if __name__ == "__main__":
    # Execute the flow once, immediately, instead of on a schedule.
    flow = create_flow()
    flow.run(
        run_on_schedule=False,
    )
Bruno Murino
06/22/2021, 9:43 PM
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.shell import ShellTask
# ShellTask whose command output is streamed.
shell_task = ShellTask(
    stream_output=True,
)
def create_flow():
    """Return a ``kensington_pipeline`` flow running ``echo 1`` with ``ENV`` from the ``env`` parameter."""
    flow = Flow("kensington_pipeline")
    with flow:
        shell_env = {"ENV": Parameter("env")}
        shell_task(command="echo 1", env=shell_env)
    return flow
if __name__ == "__main__":
    flow = create_flow()
    # Local one-off run with env="dev"; the schedule is not used.
    flow.run(run_on_schedule=False, env="dev")
Kevin Kho
Bruno Murino
06/22/2021, 9:46 PM
Kevin Kho
Bruno Murino
06/22/2021, 9:52 PM
Bruno Murino
06/22/2021, 10:12 PM
import pathlib
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.shell import ShellTask
from schedule import get_schedule
# ShellTask used by the pipeline: the helper script `cd dbt/` is run ahead of
# each command, and command output is streamed.
shell_task = ShellTask(
    stream_output=True,
    helper_script="cd dbt/",
)
def get_aws_session(env, local_aws_profile):
    """Create a boto3 Session in region ``eu-west-1``.

    When ``env == "prod"``, or when no local profile is given (empty string or
    None), the session is created without an explicit profile. Otherwise it
    uses the named ``local_aws_profile``.

    Logging goes through a named logger rather than the module-level
    ``logging.info`` helper. That helper calls ``logging.basicConfig()`` when
    the root logger has no handlers, attaching a root handler that re-prints
    propagated log records, which is a likely source of duplicated log lines.

    Args:
        env: Environment name; ``"prod"`` selects the profile-less session.
        local_aws_profile: AWS profile name to use outside prod.

    Returns:
        A ``boto3.Session``.
    """
    import boto3 as _boto3
    import logging as _logging

    logger = _logging.getLogger(__name__)
    if env == "prod" or not local_aws_profile:
        logger.info("Getting AWS Session for PROD")
        return _boto3.Session(region_name="eu-west-1")
    return _boto3.Session(profile_name=local_aws_profile, region_name="eu-west-1")
@task
def get_config(env, scope):
    """Return the run config: ``env``, ``scope`` and an AWS session.

    The session comes from ``get_aws_session`` with the local profile fixed
    to ``'prod'``.
    """
    session = get_aws_session(env=env, local_aws_profile='prod')
    return {'env': env, 'scope': scope, 'aws_session': session}
def configure_flow(flow):
    """Attach run settings to ``flow`` in place.

    Sets the schedule returned by ``get_schedule()`` and a thread-based
    ``LocalDaskExecutor`` with 8 workers.
    """
    schedule = get_schedule()
    executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
    flow.schedule, flow.executor = schedule, executor
def create_flow():
    """Build and configure the ``kensington_pipeline`` flow.

    ``env`` and ``scope`` parameters feed ``get_config``; the config's
    ``'env'`` entry is passed as ``ENV`` to an ``ls`` shell step.
    """
    pipeline = Flow("kensington_pipeline")
    with pipeline:
        env = Parameter("env")
        scope = Parameter("scope")
        config = get_config(env, scope)
        shell_task(command="ls", env={"ENV": config["env"]})
    configure_flow(pipeline)
    return pipeline
if __name__ == "__main__":
    # One-off local run with explicit parameter values; the schedule is
    # bypassed via run_on_schedule=False.
    flow = create_flow()
    flow.run(
        env="prod",
        scope="ss",
        run_on_schedule=False,
    )
Kevin Kho
Kevin Kho
def get_aws_session(env, local_aws_profile):
    """Create a boto3 Session in region ``eu-west-1``.

    When ``env == "prod"``, or when no local profile is given (empty string or
    None), the session is created without an explicit profile. Otherwise it
    uses the named ``local_aws_profile``.

    Args:
        env: Environment name; ``"prod"`` selects the profile-less session.
        local_aws_profile: AWS profile name to use outside prod.

    Returns:
        A ``boto3.Session``.
    """
    import boto3 as _boto3
    import logging as _logging

    if env == "prod" or not local_aws_profile:
        # Use a named logger, not the module-level ``logging.info`` helper:
        # that helper may call basicConfig() and attach a root handler,
        # duplicating log output.
        _logging.getLogger(__name__).info("Getting AWS Session for PROD")
        aws_session = _boto3.Session(region_name="eu-west-1")
    else:
        aws_session = _boto3.Session(
            profile_name=local_aws_profile, region_name="eu-west-1"
        )
    return aws_session
Bruno Murino
06/22/2021, 10:16 PM
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.shell import ShellTask
# ShellTask whose command output is streamed.
shell_task = ShellTask(
    stream_output=True,
)
@task
def get_config(env):
    """Return the run configuration ``{'env': env}``.

    NOTE: logging from here through the module-level ``logging.info(...)``
    helper caused duplicated log output. If logging is needed, use Prefect's
    run logger (``prefect.context.get("logger")``) or a named
    ``logging.getLogger(...)`` logger instead.
    """
    return {'env': env}
def configure_flow(flow):
    """Attach run settings to ``flow`` in place.

    Sets the schedule returned by ``get_schedule()`` and a thread-based
    ``LocalDaskExecutor`` with 8 workers.

    Both names are imported locally because the imports accompanying this
    script do not provide them; without these imports, calling the function
    raises ``NameError``.
    """
    from prefect.executors import LocalDaskExecutor
    from schedule import get_schedule

    flow.schedule = get_schedule()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
def create_flow():
    """Build the ``kensington_pipeline`` flow: ``env`` parameter, then config, then ``ls`` with ``ENV`` set."""
    pipeline = Flow("kensington_pipeline")
    with pipeline:
        config = get_config(Parameter("env"))
        shell_task(command="ls", env={"ENV": config["env"]})
    return pipeline
if __name__ == "__main__":
    # Local one-off run with env="prod"; the schedule is not used.
    flow = create_flow()
    flow.run(run_on_schedule=False, env="prod")
Bruno Murino
06/22/2021, 10:16 PM
Bruno Murino
06/22/2021, 10:17 PM
Kevin Kho
@task
def get_config(env):
    """Return ``{'env': env}``, logging through Prefect's run-context logger.

    The Prefect context logger is used instead of the module-level
    ``logging.info`` helper, which was observed to duplicate log lines.
    """
    import prefect

    logger = prefect.context.get("logger")
    logger.info('uncommenting this line causes log duplication')
    return {'env': env}
Bruno Murino
06/22/2021, 10:18 PM
Kevin Kho
Bruno Murino
06/22/2021, 10:19 PM
Kevin Kho
Bruno Murino
06/22/2021, 10:23 PM
Bruno Murino
06/22/2021, 10:24 PM
Kevin Kho
If you use `logger = logging.getLogger('simpleExample')`, it won’t cause the duplicates.
Bruno Murino
06/22/2021, 10:29 PM
Kevin Kho
Kevin Kho
06/22/2021, 10:37 PM
Bruno Murino
06/22/2021, 10:39 PM
Kevin Kho
Bruno Murino
06/22/2021, 11:23 PM