
Adam Everington

11/22/2021, 1:17 PM
Hey guys, what do all these "Add"s mean? They clutter up the schematic quite badly. Could we hide them?

Anna Geller

11/22/2021, 1:24 PM
@Adam Everington Add is an operator task that Prefect creates automatically. If you use `+` on a task inside your flow block (for example `Parameter('x', default=1) + 1`), the sum can't be computed while the flow is being built, so Prefect creates an Add task under the hood and it shows up in the schematic. You can avoid that by moving the addition into your own tasks, where it's plain Python that runs at flow run time. See the example here:
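```python
from prefect import task, Flow, Parameter

@task
def return_some_number(x):
    return x

with Flow('Add one') as flow:
    # Each `+` on a task (Parameter is a task) is recorded as its own
    # Add task, so this flow gets two Add nodes
    result = Parameter('x', default=1) + 1
    nr = result + 1
    return_some_number(nr)

flow.visualize()  # needs the graphviz package installed
```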
And compare it to this:
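```python
from prefect import task, Flow, Parameter

# The additions now happen inside task functions (at run time),
# so no separate Add tasks appear in the graph
@task
def add_one(x):
    return x + 1

@task
def add_one_again(x):
    return x + 1

with Flow('Add one') as flow:
    param = Parameter('x', default=1)
    result = add_one(param)
    add_one_again(result)

flow.visualize()
```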
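To see why the two versions draw differently, here is a toy model of deferred flow building in plain Python. It is a simplified sketch, not Prefect's actual implementation; `Node`, `task`, `parameter` and `GRAPH_NODES` are made up for illustration. Inside a flow block nothing is computed yet: calling a task only records a node, and because tasks overload `+`, an expression like `Parameter('x') + 1` must be recorded as a node of its own.

```python
# Toy model of deferred flow building; NOT Prefect's real implementation.
GRAPH_NODES = []  # node names, in the order they were added to the "graph"


class Node:
    """Placeholder for a value that will only exist when the flow runs."""

    def __init__(self, name):
        self.name = name
        GRAPH_NODES.append(name)

    def __add__(self, other):
        # The sum can't be computed at build time, so `+` itself
        # becomes a new node (the counterpart of Prefect's Add task).
        return Node("Add")


def task(fn):
    """Toy decorator: each call records one node named after the function.

    The function body is never executed here; in a real flow it would
    run later, at flow run time.
    """
    def wrapper(*args, **kwargs):
        return Node(fn.__name__)
    return wrapper


def parameter(name):
    return Node(name)


@task
def return_some_number(x):
    return x


@task
def add_one(x):
    return x + 1  # plain integer addition, done at run time


@task
def add_one_again(x):
    return x + 1


# Version 1: `+` used directly in the flow body
GRAPH_NODES.clear()
result = parameter("x") + 1
nr = result + 1
return_some_number(nr)
v1_nodes = list(GRAPH_NODES)

# Version 2: the additions live inside tasks
GRAPH_NODES.clear()
param = parameter("x")
result = add_one(param)
add_one_again(result)
v2_nodes = list(GRAPH_NODES)

print(v1_nodes)  # ['x', 'Add', 'Add', 'return_some_number']
print(v2_nodes)  # ['x', 'add_one', 'add_one_again']
```

The toy ignores edges (which node feeds which); it only shows that the first build records two extra Add nodes and the second records none.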

Adam Everington

11/22/2021, 2:28 PM
OK, that makes sense, but I'm not manipulating the results of tasks within the flow, just passing them around. Take my code here:
```python
from prefect import Flow, Parameter, schedules
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import UniversalRun
from prefect.schedules.clocks import CronClock
from prefect.storage import Git

# get_secret_value, get_cnxn_string, get_download_url, download_postcode_directory,
# unzip_files, get_postcodes, get_parishes, merge_datasets, load_to_sql_server and
# clean_landing_area are defined elsewhere in this file (not shown)

with Flow(
    "Create-Postcode-Incode-Lookup",
    executor=LocalDaskExecutor(),
    storage=Git(
        flow_path="./ONS/Create-ONS-Incode-Lookup.py",
        repo="prefect",
        branch_name="main",
        git_clone_url_secret_name="DEVOPS_URL",
    ),
    run_config=UniversalRun(labels=["Cloud01"]),
) as flow:

    # Params
    working_directory = Parameter("Working Directory", default=r"/media/data0/prefect/ons", required=False)
    server = Parameter("RefData_Server", default="myazureserver.database.windows.net", required=False)
    db = Parameter("RefData_Db", default="RefData", required=False)
    user = Parameter("SQL User Name", default="admin", required=False)

    # Secrets
    pw = get_secret_value(user)
    cnxn_string = get_cnxn_string(server, db, user, pw)

    # Tasks
    url = get_download_url()
    files = download_postcode_directory(url, working_directory)
    # Unpacking a task result like this assumes unzip_files is declared with @task(nout=2)
    postcode_file, parish_file = unzip_files(files, working_directory)
    postcodes = get_postcodes(postcode_file)
    parishes = get_parishes(parish_file)
    to_load = merge_datasets(postcodes, parishes)
    load = load_to_sql_server(to_load, cnxn_string, "ONSInCodeLookup")
    # Clean up only after the load has finished
    clean_landing_area(working_directory, upstream_tasks=[load])

flow.schedule = schedules.Schedule(clocks=[CronClock("0 18 28 * *")])

flow.register(
    project_name="Capture-ETL",
    idempotency_key=flow.serialized_hash(),
)
```
And look at the flow visual I sent: the RefData param just gets passed to one task, and yet the schematic shows two Add operators? It's not a biggie... I'm just curious!

Anna Geller

11/22/2021, 3:42 PM
FYI: resolved via DM. For posterity: the issue was a missing @task decorator on the function get_cnxn_string.
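A sketch of why a missing @task can produce Add nodes, again as a toy model in plain Python rather than Prefect's real code. `Node`, `task`, `GRAPH_NODES` and the body of `get_cnxn_string` are hypothetical, since the real function body wasn't posted. Without the decorator, `get_cnxn_string` runs immediately while the flow is being built, so it receives Parameter placeholders instead of strings. If it builds the connection string with `+` (an assumption), then `"literal" + placeholder` is dispatched to the placeholder's reflected `__radd__`, and every concatenation is recorded as a separate Add node.

```python
# Toy model, NOT Prefect's real code: why an undecorated helper that
# concatenates strings with Parameters adds "Add" nodes to the graph.
GRAPH_NODES = []


class Node:
    """Placeholder for a Parameter/task result during flow building."""

    def __init__(self, name):
        self.name = name
        GRAPH_NODES.append(name)

    def __add__(self, other):   # node + "text"
        return Node("Add")

    def __radd__(self, other):  # "text" + node
        # str can't concatenate a Node, so Python falls back to the
        # right operand's reflected method, __radd__.
        return Node("Add")


def task(fn):
    """Toy decorator: calling the wrapped function records ONE node and
    defers the body to run time (the body is never executed here)."""
    def wrapper(*args, **kwargs):
        return Node(fn.__name__)
    return wrapper


def get_cnxn_string(server, db):
    # Hypothetical body; the real function wasn't shown in the thread.
    return "Server=" + server + ";Database=" + db


server = Node("RefData_Server")
db = Node("RefData_Db")

# Without @task: the body runs now, at flow-build time, and every `+`
# involving a Node records another Add node.
start = len(GRAPH_NODES)
get_cnxn_string(server, db)
undecorated_nodes = GRAPH_NODES[start:]

# With @task: a single node; the concatenation would happen later,
# at run time, on real strings.
start = len(GRAPH_NODES)
task(get_cnxn_string)(server, db)
decorated_nodes = GRAPH_NODES[start:]

print(undecorated_nodes)  # ['Add', 'Add', 'Add']
print(decorated_nodes)    # ['get_cnxn_string']
```

The exact number of Add nodes depends on how many `+` operations the real helper performs, which this sketch can't know.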

Adam Everington

11/22/2021, 6:58 PM
Yep... total noob move. @Anna Geller, amazing as always!