https://prefect.io logo
r

Richard Hughes

02/16/2022, 2:55 PM
Hi - I am trying to find out if the upstream_tasks can take wildcards - I am kind of struggling to find the documentation on this kwarg.
k

Kevin Kho

02/16/2022, 2:55 PM
upstream_tasks
takes Task objects like the actual variable. Not the name.
r

Richard Hughes

02/16/2022, 3:03 PM
Copy code
for key in ODSCloneGrantScripts:
        ODS_Grant_Executing = SqlExecution(Database='ODS_DBT', SqlScript=ODSCloneGrantScripts[key], task_args={"name":"ODS_CLONE_GRANT: "+str(key)}, upstream_tasks=None)

    flow_run = create_flow_run(flow_name="System.SQLDynamicExecution",
        project_name="PROD",
        parameters={"SQLFileName": "F:\\Shares\\PostCloneGrants.sql"
         })
    flow_run.set_upstream()
how do I run the task in my dictionary and then set a dependency on those tasks to run the flow_run task last after all off them have been completed
k

Kevin Kho

02/16/2022, 3:07 PM
Ah I know what you are saying. One sec.
Here is an example, you append those tasks to a list and then manipulate the list
Copy code
@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
👀 1
r

Richard Hughes

02/17/2022, 3:25 PM
@Kevin Kho Super Helpful! Thanks for providing this information.
👍 1
3 Views