Bruno Nunes
02/24/2022, 1:38 PMAnna Geller
02/24/2022, 1:47 PMBruno Nunes
02/24/2022, 3:01 PM
# Global parameters
# Flow parameters and their default values. Each one is handed to the
# "Initialize" task as a keyword input further down in this script.
BASE_DT = Parameter(name='BASE_DT', default='12312019')
ENTITY_ID = Parameter(name='ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter(name='CYCLE_ID', default='10000')
FA_ID = Parameter(name='FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    name='FA_PATH',
    default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1',
)
# The default below sits underneath the FA_PATH default directory.
RUN_INSTANCE = Parameter(
    name='RUN_INSTANCE',
    default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019',
)
# Create the "Initialize" flow.
flow_init = Flow("Initialize")

# The flow's single task, created with stdout logging switched on.
initialize = RunSpreTask(name='Initialize', log_stdout=True)

# Register the task with no upstream dependencies and bind its keyword
# inputs: the six global Parameters plus four constant string options.
flow_init.set_dependencies(
    task=initialize,
    upstream_tasks=[],
    keyword_tasks={
        'BASE_DT': BASE_DT,
        'CYCLE_ID': CYCLE_ID,
        'ENTITY_ID': ENTITY_ID,
        'FA_ID': FA_ID,
        'FA_PATH': FA_PATH,
        'RUN_INSTANCE': RUN_INSTANCE,
        'NODE_CODE': 'core_node_init.sas',
        'RUN_OPTION': 'core_cfg.run_option',
        'SYSTEM_OPTION': 'sys_cfg.run_option',
        'FLOW_OPTION': 'core_res.flow_option',
    },
)
# Create the "Data Enrichment" flow and its single task.
flow_data_enrichment = Flow("Data Enrichment")
filter_by_entity = RunSpreTask(name='Filter by Entity', log_stdout=True)

# `upstream_tasks` must contain Task objects that belong to this flow. A Flow
# object (such as `flow_init`) is not a task: listing it here neither runs the
# "Initialize" flow nor makes this flow wait for it, so it is not passed.
# To order the two flows, either orchestrate them from a parent flow (in
# Prefect 1.x: `create_flow_run` / `wait_for_flow_run`) or place both tasks in
# a single flow and make `initialize` the upstream task.
#
# NOTE(review): none of the global Parameters (BASE_DT, ENTITY_ID, ...) are
# attached to this flow. If RunSpreTask reads them from
# `prefect.context.parameters` at run time, they will presumably be missing
# for runs of this flow -- confirm, and attach them (e.g. via
# `flow_data_enrichment.add_task(...)` or `keyword_tasks`) if they are needed.
flow_data_enrichment.set_dependencies(
    task=filter_by_entity,
    upstream_tasks=[],
    keyword_tasks=dict(
        GROUP_FLG='N',
        NODE_CODE='core_node_filter_entity.sas',
        ENTITY_IN='core_lnd.entity',
        ENTITY_OUT='core_res.entity',
    ),
)
I'm getting an error because the prefect.context.parameter can't be resolved in my RunSpreTask class.
Anna Geller
02/24/2022, 3:04 PM