Bruno Nunes
02/17/2022, 10:50 AM
Anna Geller
02/17/2022, 11:17 AM
@task
def get_file_path(fa_id: str) -> str:
    """Return the core directory path for the given ``fa_id``.

    The result is a hard-coded Windows-style base directory followed by
    ``fa_id`` and a trailing backslash, built by plain string concatenation.

    Args:
        fa_id: Identifier appended as the last path component. Must be a
            string, since it is concatenated with string literals.

    Returns:
        The directory path, ending in a backslash.
    """
    return "C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\" + fa_id + "\\"
@task
def get_run_instance(fa_path: str, cycle_id: str, base_dt: str) -> str:
    """Build the run-instance path for a cycle and base date.

    The result is ``fa_path`` followed by ``prefect\\run_instance\\`` and a
    final component of the form ``prefect-spre-<cycle_id>-<base_dt>``.

    Args:
        fa_path: Directory prefix. It is used as-is, so it should already end
            with a path separator.
        cycle_id: Cycle identifier, as a string.
        base_dt: Base date, as a string.

    Returns:
        The concatenated run-instance path.
    """
    return fa_path + "prefect\\run_instance\\" + "prefect-spre-" + cycle_id + "-" + base_dt
# Flow "prefect-spre-dataPrep": declares four parameters, derives two paths
# from them, bundles everything via processParameters, and hands the result to
# initialize.
with Flow("prefect-spre-dataPrep") as flow:
    # Global parameters
    BASE_DT = Parameter("BASE_DT", default="12312019")
    ENTITY_ID = Parameter("ENTITY_ID", default="SASBank_1")
    CYCLE_ID = Parameter("CYCLE_ID", default="10000")
    FA_ID = Parameter("FA_ID", default="2022.1.1")

    # Paths derived from the parameters above.
    FA_PATH = get_file_path(FA_ID)
    RUN_INSTANCE = get_run_instance(FA_PATH, CYCLE_ID, BASE_DT)

    # Initialize and run the task
    # NOTE(review): processParameters and initialize are defined elsewhere;
    # they are assumed to be tasks and are therefore called inside the flow
    # context -- confirm against their definitions.
    args = processParameters(
        BASE_DT=BASE_DT,
        CYCLE_ID=CYCLE_ID,
        ENTITY_ID=ENTITY_ID,
        FA_ID=FA_ID,
        FA_PATH=FA_PATH,
        RUN_INSTANCE=RUN_INSTANCE,
        NODE_CODE="core_node_init",
        RUN_OPTION="core_cfg.run_option",
        SYSTEM_OPTION="sys_cfg.run_option",
        FLOW_OPTION="core_res.flow_option",
    )
    initialize(args)
Bruno Nunes
02/17/2022, 12:34 PM
Anna Geller
02/17/2022, 4:08 PM
import prefect
@task
def do_sth_with_parameters() -> None:
    """Read the ``BASE_DT`` parameter value from ``prefect.context``.

    Illustrative snippet: the value is looked up in
    ``prefect.context.parameters`` and bound to a local name; nothing is
    returned.
    """
    base_dt = prefect.context.parameters["BASE_DT"]
Bruno Nunes
02/17/2022, 4:50 PM