Bruno Nunes
02/24/2022, 1:38 PM
Anna Geller
Bruno Nunes
02/24/2022, 3:01 PM
# Global parameters
BASE_DT = Parameter('BASE_DT', default='12312019')
ENTITY_ID = Parameter('ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter('CYCLE_ID', default='10000')
FA_ID = Parameter('FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    'FA_PATH', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1')
RUN_INSTANCE = Parameter(
    'RUN_INSTANCE', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019')
# Instantiate the flow
flow_init = Flow("Initialize")
# # Initialize and run the tasks
initialize = RunSpreTask(name='Initialize', log_stdout=True)
flow_init.set_dependencies(task=initialize,
                           upstream_tasks=[],
                           keyword_tasks=dict(BASE_DT=BASE_DT,
                                              CYCLE_ID=CYCLE_ID,
                                              ENTITY_ID=ENTITY_ID,
                                              FA_ID=FA_ID,
                                              FA_PATH=FA_PATH,
                                              RUN_INSTANCE=RUN_INSTANCE,
                                              NODE_CODE='core_node_init.sas',
                                              RUN_OPTION='core_cfg.run_option',
                                              SYSTEM_OPTION='sys_cfg.run_option',
                                              FLOW_OPTION='core_res.flow_option',
                                              )
                           )
flow_data_enrichment = Flow("Data Enrichment")
filter_by_entity = RunSpreTask(name='Filter by Entity', log_stdout=True)
flow_data_enrichment.set_dependencies(task=filter_by_entity,
                                      upstream_tasks=[flow_init],
                                      keyword_tasks=dict(
                                          GROUP_FLG='N',
                                          NODE_CODE='core_node_filter_entity.sas',
                                          ENTITY_IN='core_lnd.entity',
                                          ENTITY_OUT='core_res.entity',
                                      )
                                      )
I'm getting an error because prefect.context.parameters can't be resolved in my RunSpreTask class.
Anna Geller