Jasono
11/24/2020, 6:00 PM
josh
11/24/2020, 6:52 PM
Jasono
11/24/2020, 6:55 PM
josh
11/24/2020, 6:57 PM
Jasono
11/24/2020, 6:58 PM
josh
11/24/2020, 7:02 PM
with Flow():
    my_task(context.get("reports"))
That will not work because the context is evaluated when the flow object is initialized, before registration. The context provided by the UI is never read there, because that code runs before the flow run starts. Instead, access the context inside a task:
@task
def my_task():
    # Looked up at run time, when the run's context is available
    return prefect.context.get("reports")
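To make the timing difference concrete, here is a minimal sketch in plain Python with no Prefect dependency. The `context` dict, `build_flow`, and `run_flow` are hypothetical stand-ins, not Prefect APIs; they only mimic the idea that code in the flow block runs at build time while task bodies run at run time.

```python
# Hypothetical stand-in for prefect.context: empty at build time,
# populated only while a run is in progress.
context = {}


def build_flow():
    # Runs once, at definition/registration time.
    eager_value = context.get("reports")  # evaluated now, context is empty

    def eager_task():
        # Returns the value frozen at build time.
        return eager_value

    def lazy_task():
        # Looks the value up when the task actually runs.
        return context.get("reports")

    return {"eager": eager_task, "lazy": lazy_task}


def run_flow(flow, run_context):
    # Mimics a flow run: context supplied at run time becomes visible here.
    context.update(run_context)
    try:
        return {name: fn() for name, fn in flow.items()}
    finally:
        context.clear()


flow = build_flow()
result = run_flow(flow, {"reports": "monthly"})
print(result)  # {'eager': None, 'lazy': 'monthly'}
```

Only the lookup made inside the task body sees the run-time value; the one made while building the flow keeps whatever was there at build time.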
Jasono
11/24/2020, 7:06 PM
with Flow("bdx_process_9xx") as flow:
    acctDate = str(prefect.context.acctDate)
    prodDate = str(prefect.context.prodDate)
    runDesc9xx = prefect.context.runDesc9xx
    print(f'ProdDate={prodDate} ..{runDesc9xx}')
    for queryKey, (queryCmd, dependQry) in cmdPairs(acctDate, prodDate, runDesc9xx).items():
        task = BDX_Task(name=queryKey)(queryKey=queryKey,
                                       queryCmd=queryCmd
                                       )
        flow.add_task(task)
        if len(dependQry) > 0:
            if type(dependQry) is list:
                for q in dependQry:
                    flow.get_tasks(name=queryKey)[0].set_upstream(flow.get_tasks(name=q)[0])
            else:
                flow.get_tasks(name=queryKey)[0].set_upstream(flow.get_tasks(name=dependQry)[0])
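The last two branches above differ only in whether `dependQry` is a single name or a list. A minimal sketch of normalizing that field first, in plain Python: `normalize_deps`, `dependency_edges`, and the sample `cmd_pairs` dict are hypothetical and only stand in for whatever `cmdPairs(...)` returns.

```python
def normalize_deps(depend_qry):
    """Return dependencies as a list, whether given as '', one name, or a list."""
    if not depend_qry:
        return []
    if isinstance(depend_qry, str):
        return [depend_qry]
    return list(depend_qry)


def dependency_edges(cmd_pairs):
    """Yield (upstream, downstream) name pairs for every declared dependency."""
    for query_key, (_query_cmd, depend_qry) in cmd_pairs.items():
        for upstream in normalize_deps(depend_qry):
            yield upstream, query_key


# Hypothetical sample data in the same shape the loop above iterates over.
cmd_pairs = {
    "load": ("SELECT 1", ""),
    "clean": ("SELECT 2", "load"),
    "report": ("SELECT 3", ["load", "clean"]),
}

edges = list(dependency_edges(cmd_pairs))
print(edges)  # [('load', 'clean'), ('load', 'report'), ('clean', 'report')]
```

Each pair would then correspond to a single `set_upstream` call like the ones in the loop above, so the list and non-list cases share one code path.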
josh
11/24/2020, 7:15 PM
Jasono
11/24/2020, 7:16 PM