Bruno Nunes

02/17/2022, 10:50 AM
Hi guys, I'm trying to create a task template in Prefect that takes a list of parameters. To keep the task generic, it has only one argument, which is a Python list of dicts. The idea is to get my parameters, process them into a Python list, and pass that list to my tasks. The problem I'm seeing is that every manipulation of a Parameter creates a new task. Thanks!
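A minimal sketch of the conversion described above, in plain Python (3.7+, where **kwargs keeps the order the keywords were passed). It assumes the NAME/VALUE layout used by RunSpreTask later in this thread; to_param_list is a hypothetical helper name. The replies below suggest doing this kind of processing inside a task body rather than in the flow block itself.

```python
from typing import Any, Dict, List


def to_param_list(**params: Any) -> List[Dict[str, Any]]:
    """Turn keyword parameters into a generic list of {"NAME": ..., "VALUE": ...} dicts."""
    # **params preserves the order in which the keywords were passed (Python 3.7+)
    return [{"NAME": key, "VALUE": value} for key, value in params.items()]


if __name__ == "__main__":
    print(to_param_list(BASE_DT="12312019", CYCLE_ID="10000"))
```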

Anna Geller

02/17/2022, 11:17 AM
Could you please move the code block and the logs into the thread to keep the main channel a bit cleaner?
@Bruno Nunes I understand you want to avoid the intermediary tasks such as Add, List and Dict, correct? Those are created automatically when you concatenate strings or otherwise manipulate Parameters inside your flow block. You can avoid them by moving this logic into separate tasks, e.g.:
from prefect import Flow, Parameter, task

@task
def get_file_path(fa_id):
    return "C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\" + fa_id + "\\"

@task
def get_run_instance(fa_path, cycle_id, base_dt):
    return fa_path + "prefect\\run_instance\\" + "prefect-spre-" + cycle_id + "-" + base_dt


with Flow("prefect-spre-dataPrep") as flow:
    # Global parameters
    BASE_DT = Parameter("BASE_DT", default="12312019")
    ENTITY_ID = Parameter("ENTITY_ID", default="SASBank_1")
    CYCLE_ID = Parameter("CYCLE_ID", default="10000")
    FA_ID = Parameter("FA_ID", default="2022.1.1")
    FA_PATH = get_file_path(FA_ID)
    RUN_INSTANCE = get_run_instance(FA_PATH, CYCLE_ID, BASE_DT)
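    # Initialize and run the task (processParameters and initialize come from
    # the original question's code, which is not shown here)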
    args = processParameters(
        BASE_DT=BASE_DT,
        CYCLE_ID=CYCLE_ID,
        ENTITY_ID=ENTITY_ID,
        FA_ID=FA_ID,
        FA_PATH=FA_PATH,
        RUN_INSTANCE=RUN_INSTANCE,
        NODE_CODE="core_node_init",
        RUN_OPTION="core_cfg.run_option",
        SYSTEM_OPTION="sys_cfg.run_option",
        FLOW_OPTION="core_res.flow_option",
    )
    initialize(args)
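Why does concatenation in the flow block show up as Add tasks, while the version above does not? A toy model in plain Python may help. All names here are hypothetical and this is NOT Prefect's implementation, although Prefect 1.x tasks overload operators such as + in a comparable deferred way: an object that overloads + records each operation as a new graph node, while a decorated function records a single node for the whole call and does its concatenation later, on plain strings.

```python
from typing import Any, Callable, List

GRAPH: List["Node"] = []  # nodes recorded while "building the flow" (toy only)


class Node:
    """Toy stand-in for a deferred task result; NOT Prefect's Task class."""

    def __init__(self, name: str, inputs: tuple = ()) -> None:
        self.name = name
        self.inputs = inputs
        GRAPH.append(self)

    def __add__(self, other: Any) -> "Node":
        # node + "x": the addition is recorded as a new "Add" node, not computed
        return Node("Add", (self, other))

    def __radd__(self, other: Any) -> "Node":
        # "x" + node: str cannot add a Node, so Python falls back to this method
        return Node("Add", (other, self))


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Toy decorator: a call at build time records ONE node named after fn."""
    def build(*args: Any) -> Node:
        return Node(fn.__name__, args)
    build.run = fn  # the real function, to be run later on plain values
    return build


@toy_task
def get_file_path(fa_id: str) -> str:
    # At run time fa_id is a plain str, so this concatenation adds no nodes
    return "C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\" + fa_id + "\\"


if __name__ == "__main__":
    fa_id = Node("FA_ID")
    _ = "C:\\core\\" + fa_id + "\\"  # concatenation in the flow body
    print([n.name for n in GRAPH])  # ['FA_ID', 'Add', 'Add']

    GRAPH.clear()
    fa_id = Node("FA_ID")
    get_file_path(fa_id)  # concatenation moved into a task
    print([n.name for n in GRAPH])  # ['FA_ID', 'get_file_path']
```

The design point is the same as in the reply: whatever runs inside a task body works on concrete values, so it never becomes part of the graph.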

Bruno Nunes

02/17/2022, 12:34 PM
Logs:
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'FA_ID': Starting task run...
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'FA_ID': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'CYCLE_ID': Starting task run...
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'CYCLE_ID': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'BASE_DT': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'BASE_DT': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'ENTITY_ID': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'ENTITY_ID': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Initialize': Starting task run...
@Anna Geller Thanks a lot for your reply. So I guess Parameters are themselves tasks, which means that if I manipulate a Parameter object, the result is treated as a new task. What I'm trying to do is get the values from the parameters and use them as inputs for the next tasks, without creating new tasks or extra links to the downstream tasks. Is this possible? New code:

from prefect import Flow, task, Task, Parameter
import saspy
import os
import time


class RunSpreTask(Task):
    def run(self, **params):
        print(params)
        # Build the generic list of {"NAME": ..., "VALUE": ...} dicts from the keyword arguments
        list_params = []
        for key, value in params.items():
            dict_param = dict()
            dict_param["NAME"] = key
            dict_param["VALUE"] = value
            list_params.append(dict_param)
        # Create a SAS session and run the core_run_sas.sas wrapper passing the parameters
        sas = saspy.SASsession()
        sas.symput('FA_PATH', params['FA_PATH'])
        sas.symput('INPUT_PARAMETERS', list_params)
        # Raw string: avoids invalid-escape warnings for the Windows backslashes
        r = sas.submit(r'''
            %put INPUT_PARAMETERS: &INPUT_PARAMETERS;
            %include "&FA_PATH\prefect\source\core_run_sas.sas";
        ''')
        # Show the log and save it in a file
        print(r['LOG'])
        log_name = params['NODE_CODE'] + "_" + \
            time.strftime("%Y%m%d_%H%M%S") + '.log'
        with open(os.path.join(params['RUN_INSTANCE'], 'logs', log_name), "w") as log_file:
            log_file.write(r['LOG'])
        # Close the SAS session
        sas.endsas()


# Instantiate the flow and the task
initialize = RunSpreTask(name='Initialize', log_stdout=True)
initialize2 = RunSpreTask(name='Initialize2', log_stdout=True)
flow = Flow("prefect-spre-dataPrep")

# Global parameters
BASE_DT = Parameter('BASE_DT', default='12312019')
ENTITY_ID = Parameter('ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter('CYCLE_ID', default='10000')
FA_ID = Parameter('FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    'FA_PATH', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\')
RUN_INSTANCE = Parameter(
    'RUN_INSTANCE', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019')

# Initialize and run the task
flow.set_dependencies(task=initialize,
                      upstream_tasks=[],
                      keyword_tasks=dict(BASE_DT=BASE_DT,
                                         CYCLE_ID=CYCLE_ID,
                                         ENTITY_ID=ENTITY_ID,
                                         FA_ID=FA_ID,
                                         FA_PATH=FA_PATH,
                                         RUN_INSTANCE=RUN_INSTANCE,
                                         NODE_CODE='core_node_init',
                                         RUN_OPTION='core_cfg.run_option',
                                         SYSTEM_OPTION='sys_cfg.run_option',
                                         FLOW_OPTION='core_res.flow_option')
                      )
flow.set_dependencies(task=initialize2,
                      upstream_tasks=[initialize],
                      keyword_tasks=dict(
                          FA_PATH=FA_PATH,
                          RUN_INSTANCE=RUN_INSTANCE,
                          NODE_CODE='core_node_init2',
                          FLOW_OPTION='core_res.flow_option')
                      )
flow.visualize()
flow.run()
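The log-file naming inside RunSpreTask.run can be pulled out into a small pure function, which makes it testable without Prefect or SAS. A sketch, assuming Windows-style paths as in the defaults above (hence ntpath, which always joins with backslashes even on Linux or macOS); make_log_path is a hypothetical name, and the timestamp is passed in so the result is deterministic.

```python
import ntpath
import time


def make_log_path(run_instance: str, node_code: str, when: time.struct_time) -> str:
    """Build <RUN_INSTANCE>\\logs\\<NODE_CODE>_<YYYYmmdd_HHMMSS>.log."""
    # Same naming scheme as the task above: node code plus a timestamp
    log_name = node_code + "_" + time.strftime("%Y%m%d_%H%M%S", when) + ".log"
    # ntpath.join uses Windows separators regardless of the host OS
    return ntpath.join(run_instance, "logs", log_name)


if __name__ == "__main__":
    print(make_log_path("C:\\runs\\prefect-spre-10000-12312019",
                        "core_node_init", time.localtime()))
```

Inside the task, the call would be make_log_path(params['RUN_INSTANCE'], params['NODE_CODE'], time.localtime()).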

Anna Geller

02/17/2022, 4:08 PM
You can also retrieve parameter values from the context; perhaps that fits your use case better. For example:
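import prefect
from prefect import task

@task
def do_sth_with_parameters():
    # Read the value from the flow-run context instead of receiving it as an
    # argument, so this task gets no edge from the BASE_DT Parameter
    base_dt = prefect.context.parameters["BASE_DT"]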
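The context approach avoids extra edges because the runner, not the caller, hands the values to the task. A toy model of that idea using Python's contextvars (an analogy only, NOT how prefect.context is implemented; run_flow and PARAMETERS are hypothetical names):

```python
import contextvars

# Toy run context: the "runner" stores parameter values here before running tasks
PARAMETERS = contextvars.ContextVar("parameters")


def run_flow(tasks, **parameters):
    """Toy runner: make parameter values visible to every task via the context."""
    token = PARAMETERS.set(dict(parameters))
    try:
        # Tasks take no arguments, so nothing links them to the parameters
        return [t() for t in tasks]
    finally:
        PARAMETERS.reset(token)  # parameters are only visible during the run


def do_sth_with_parameters():
    # Read the value from the context instead of receiving it as an argument
    base_dt = PARAMETERS.get()["BASE_DT"]
    return "processing " + base_dt


if __name__ == "__main__":
    print(run_flow([do_sth_with_parameters], BASE_DT="12312019"))
```

The trade-off: because the task no longer receives BASE_DT as an argument, the flow graph no longer shows that dependency either.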

Bruno Nunes

02/17/2022, 4:50 PM
This works fine, thanks! 👍