Hi guys, I'm trying to create a task template in ...
# prefect-server
b
Hi guys, I'm trying to create a task template in Prefect that takes a list of parameters. To keep the task generic, it has only one argument, which is a Python list of dicts. The idea is to read my parameters, process them into a Python list, and pass that list to my tasks. The problem I'm seeing is that every manipulation of a Parameter creates a new task. Thanks
a
Could you please move the code block and the logs into the thread to keep the main channel a bit cleaner?
@Bruno Nunes I understand you want to avoid the intermediary tasks such as Add, List, Dict, correct? Those are created automatically when you perform string concatenation and manipulation within your flow. You can avoid that by moving this logic into separate tasks, e.g.:
from prefect import Flow, Parameter, task

@task
def get_file_path(fa_id):
    return "C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\" + fa_id + "\\"

@task
def get_run_instance(fa_path, cycle_id, base_dt):
    return fa_path + "prefect\\run_instance\\" + "prefect-spre-" + cycle_id + "-" + base_dt


with Flow("prefect-spre-dataPrep") as flow:
    # Global parameters
    BASE_DT = Parameter("BASE_DT", default="12312019")
    ENTITY_ID = Parameter("ENTITY_ID", default="SASBank_1")
    CYCLE_ID = Parameter("CYCLE_ID", default="10000")
    FA_ID = Parameter("FA_ID", default="2022.1.1")
    FA_PATH = get_file_path(FA_ID)
    RUN_INSTANCE = get_run_instance(FA_PATH, CYCLE_ID, BASE_DT)
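    # Initialize and run the task (processParameters and initialize are assumed
    # to be defined as in the original flow, which is not shown here)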
    args = processParameters(
        BASE_DT=BASE_DT,
        CYCLE_ID=CYCLE_ID,
        ENTITY_ID=ENTITY_ID,
        FA_ID=FA_ID,
        FA_PATH=FA_PATH,
        RUN_INSTANCE=RUN_INSTANCE,
        NODE_CODE="core_node_init",
        RUN_OPTION="core_cfg.run_option",
        SYSTEM_OPTION="sys_cfg.run_option",
        FLOW_OPTION="core_res.flow_option",
    )
    initialize(args)
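To make it clearer why those Add tasks show up, here is a toy sketch in plain Python. It is not Prefect's implementation: the `Node` class, its `registry`, and the shortened path are all made up for illustration. It only shows the general idea that an overloaded `+` used while building the flow records a new step instead of concatenating strings.

```python
class Node:
    """Toy stand-in for a task: records an operation instead of running it."""

    registry = []  # every node created while "building the flow"

    def __init__(self, name):
        self.name = name
        Node.registry.append(self)

    def __add__(self, other):
        # Overloaded "+" does no string work here; it only registers a new node.
        return Node("Add")

    def __radd__(self, other):
        # Handles "literal" + node, which Python routes to the right-hand operand.
        return Node("Add")


fa_id = Node("FA_ID")  # plays the role of a Parameter
fa_path = "core\\" + fa_id + "\\"  # two "+" operations, so two Add nodes
node_names = [node.name for node in Node.registry]
print(node_names)  # ['FA_ID', 'Add', 'Add']
```

Doing the same concatenation inside one function body, as in the tasks above, keeps it to a single step because the `+` then runs on ordinary strings at run time.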
b
Logs:
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'FA_ID': Starting task run...
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'FA_ID': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'CYCLE_ID': Starting task run...
[2022-02-17 10:13:44+0100] INFO - prefect.TaskRunner | Task 'CYCLE_ID': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'BASE_DT': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'BASE_DT': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'ENTITY_ID': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'ENTITY_ID': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:45+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Add': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Dict': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Dict': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Success'
[2022-02-17 10:13:46+0100] INFO - prefect.TaskRunner | Task 'Initialize': Starting task run...
@Anna Geller - Thanks a lot for your reply. So I guess parameters are themselves tasks, which means that whenever I manipulate a parameter object, the result is treated as a new task. What I'm trying to do is read the values from the parameters and use them as inputs for the next tasks, without creating a new task or a link to the downstream tasks. Is this possible?
New code:
from prefect import Flow, task, Task, Parameter
import saspy
import os
import time


class RunSpreTask(Task):
    def run(self, **params):
        print(params)
        # Convert the keyword arguments into a list of NAME/VALUE dicts
        list_params = []
        for key, value in params.items():
            dict_param = dict()
            dict_param["NAME"] = key
            dict_param["VALUE"] = value
            list_params.append(dict_param)
        # Create a SAS session and run the core_run_sas.sas wrapper passing the parameters
        sas = saspy.SASsession()
        sas.symput('FA_PATH', params['FA_PATH'])
        sas.symput('INPUT_PARAMETERS', list_params)
        # Raw string so the backslashes in the SAS path are kept literally
        r = sas.submit(r'''
            %put INPUT_PARAMETERS: &INPUT_PARAMETERS;
            %include "&FA_PATH\prefect\source\core_run_sas.sas";
        ''')
        # Show the log and save it in a file
        print(r['LOG'])
        log_name = params['NODE_CODE'] + "_" + \
            time.strftime("%Y%m%d_%H%M%S") + '.log'
        log_path = os.path.join(params['RUN_INSTANCE'] + '\\logs', log_name)
        with open(log_path, "w") as log_file:
            log_file.write(r['LOG'])
        # Close the SAS session
        sas.endsas()


# Instantiate the flow and the task
initialize = RunSpreTask(name='Initialize', log_stdout=True)
initialize2 = RunSpreTask(name='Initialize2', log_stdout=True)
flow = Flow("prefect-spre-dataPrep")

# Global parameters
BASE_DT = Parameter('BASE_DT', default='12312019')
ENTITY_ID = Parameter('ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter('CYCLE_ID', default='10000')
FA_ID = Parameter('FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    'FA_PATH', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\')
RUN_INSTANCE = Parameter(
    'RUN_INSTANCE', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019')

# Initialize and run the task
flow.set_dependencies(task=initialize,
                      upstream_tasks=[],
                      keyword_tasks=dict(BASE_DT=BASE_DT,
                                         CYCLE_ID=CYCLE_ID,
                                         ENTITY_ID=ENTITY_ID,
                                         FA_ID=FA_ID,
                                         FA_PATH=FA_PATH,
                                         RUN_INSTANCE=RUN_INSTANCE,
                                         NODE_CODE='core_node_init',
                                         RUN_OPTION='core_cfg.run_option',
                                         SYSTEM_OPTION='sys_cfg.run_option',
                                         FLOW_OPTION='core_res.flow_option')
                      )
flow.set_dependencies(task=initialize2,
                      upstream_tasks=[initialize],
                      keyword_tasks=dict(
                          FA_PATH=FA_PATH,
                          RUN_INSTANCE=RUN_INSTANCE,
                          NODE_CODE='core_node_init2',
                          FLOW_OPTION='core_res.flow_option')
                      )
flow.visualize()
flow.run()
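For reference, the NAME/VALUE conversion done at the top of `run` can be tried on its own without Prefect or SAS. This is just a minimal sketch: the helper name `to_name_value_list` is made up for illustration, and the sample values are the parameter defaults from the flow.

```python
def to_name_value_list(**params):
    """Turn keyword arguments into a list of {"NAME": ..., "VALUE": ...} dicts."""
    return [{"NAME": key, "VALUE": value} for key, value in params.items()]


# Sample call using two of the parameter defaults from the flow
example = to_name_value_list(BASE_DT="12312019", CYCLE_ID="10000")
print(example)
# [{'NAME': 'BASE_DT', 'VALUE': '12312019'}, {'NAME': 'CYCLE_ID', 'VALUE': '10000'}]
```

Because this runs inside the task on plain Python values, it should not add any List or Dict tasks to the flow.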
a
You can also retrieve parameter values from the context; perhaps that fits your use case better? E.g.:
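import prefect
from prefect import task

@task
def do_sth_with_parameters():
    # Read the parameter value from the run context instead of passing the Parameter in
    base_dt = prefect.context.parameters["BASE_DT"]
    return base_dt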
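As a rough sketch of how this could cover the path building too: the helper below takes any dict-like object, so inside a task you could pass it `prefect.context.parameters`, while here a plain dict stands in for it. The names `build_run_instance` and `fake_parameters` are made up for illustration; the path pieces are the ones from the flow above.

```python
def build_run_instance(parameters):
    """Build the run-instance path from a mapping of parameter names to values."""
    fa_path = (
        "C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\"
        + parameters["FA_ID"]
        + "\\"
    )
    return (
        fa_path
        + "prefect\\run_instance\\"
        + "prefect-spre-"
        + parameters["CYCLE_ID"]
        + "-"
        + parameters["BASE_DT"]
    )


# Plain dict standing in for prefect.context.parameters (assumption for this sketch)
fake_parameters = {"BASE_DT": "12312019", "CYCLE_ID": "10000", "FA_ID": "2022.1.1"}
run_instance = build_run_instance(fake_parameters)
print(run_instance)
```

All the concatenation happens on ordinary strings inside one function, so no Parameter objects are combined while the flow is being built.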
b
This works fine thanks! 👍