
Bruno Nunes

02/24/2022, 1:38 PM
Hello, I'm trying to find examples of subflows using imperative statements. Does anyone have an example they could share? Thanks, Bruno

Anna Geller

02/24/2022, 1:47 PM
What do you mean by imperative statements?
If you mean the imperative API for task definition, you can check out this thread that discusses this topic more. If you have some custom classes, you can always call them in your tasks. And if you are looking for more subflow/flow-of-flows examples in general, check out this list.

Bruno Nunes

02/24/2022, 3:01 PM
I'm trying to do something like: ...
# Global parameters
BASE_DT = Parameter('BASE_DT', default='12312019')
ENTITY_ID = Parameter('ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter('CYCLE_ID', default='10000')
FA_ID = Parameter('FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    'FA_PATH', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1')
RUN_INSTANCE = Parameter(
    'RUN_INSTANCE', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019')

# Instantiate the flow
flow_init = Flow("Initialize")
# # Initialize and run the tasks
initialize = RunSpreTask(name='Initialize', log_stdout=True)
flow_init.set_dependencies(task=initialize,
                           upstream_tasks=[],
                           keyword_tasks=dict(BASE_DT=BASE_DT,
                                              CYCLE_ID=CYCLE_ID,
                                              ENTITY_ID=ENTITY_ID,
                                              FA_ID=FA_ID,
                                              FA_PATH=FA_PATH,
                                              RUN_INSTANCE=RUN_INSTANCE,
                                              NODE_CODE='core_node_init.sas',
                                              RUN_OPTION='core_cfg.run_option',
                                              SYSTEM_OPTION='sys_cfg.run_option',
                                              FLOW_OPTION='core_res.flow_option',
                                              )
                           )

flow_data_enrichment = Flow("Data Enrichment")
filter_by_entity = RunSpreTask(name='Filter by Entity', log_stdout=True)
flow_data_enrichment.set_dependencies(task=filter_by_entity,
                                      upstream_tasks=[flow_init],
                                      keyword_tasks=dict(
                                          GROUP_FLG='N',
                                          NODE_CODE='core_node_filter_entity.sas',
                                          ENTITY_IN='core_lnd.entity',
                                          ENTITY_OUT='core_res.entity',
                                      )
                                      )
I'm getting an error because the prefect.context.parameter can't be resolved in my RunSpreTask class.

Anna Geller

02/24/2022, 3:04 PM
It's probably easier to instantiate the task globally and then call it within your Flow block.
Also, the parameters shouldn't be defined globally, but rather within the Flow block. Check out this blog post, which provides many examples: https://www.prefect.io/blog/how-to-make-your-data-pipelines-more-dynamic-using-parameters-in-prefect/