Hi - I am trying to find out if the upstream_tasks can take wildcards - I am kind of struggling to find the documentation on this kwarg.
k
Kevin Kho
02/16/2022, 2:55 PM
upstream_tasks
takes Task objects like the actual variable. Not the name.
r
Richard Hughes
02/16/2022, 3:03 PM
Copy code
for key in ODSCloneGrantScripts:
ODS_Grant_Executing = SqlExecution(Database='ODS_DBT', SqlScript=ODSCloneGrantScripts[key], task_args={"name":"ODS_CLONE_GRANT: "+str(key)}, upstream_tasks=None)
flow_run = create_flow_run(flow_name="System.SQLDynamicExecution",
project_name="PROD",
parameters={"SQLFileName": "F:\\Shares\\PostCloneGrants.sql"
})
flow_run.set_upstream()
Richard Hughes
02/16/2022, 3:05 PM
how do I run the task in my dictionary and then set a dependency on those tasks to run the flow_run task last after all off them have been completed
k
Kevin Kho
02/16/2022, 3:07 PM
Ah I know what you are saying. One sec.
Kevin Kho
02/16/2022, 3:08 PM
Here is an example, you append those tasks to a list and then manipulate the list
Copy code
@task
def add_one(x):
res = x + 1
print(res)
return res
@task
def add_two(x):
res = x + 2
print(res)
return res
with Flow("forloop") as flow:
inputs = [1, 2, 3, 4, 5]
tasks = []
for _in in inputs:
a = add_one(_in)
b = add_two(a)
tasks.append(a)
tasks.append(b)
# set dependencies
for i in range(1, len(tasks)):
tasks[i].set_upstream(tasks[i - 1])
flow.run()
👀 1
r
Richard Hughes
02/17/2022, 3:25 PM
@Kevin Kho Super Helpful! Thanks for providing this information.
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.