#prefect-community

Daniel Lomartra

06/05/2022, 12:01 AM
Hello, if I have a parameter that is a list of dictionaries, is it possible to map a task over both the keys and the values of the parameter? Something like this:
with Flow(
    name=FLOW_CONSTANTS["flow_name"],
    storage=STORAGE,
    run_config = DockerRun(
        image=DOCKER_IMAGE,
        labels=FLOW_LABELS
    )
) as flow:
    dbt_job = Parameter(
        name='dbt_job',
        default=[
            {"name": "Job Step Name 1", "command": r"Arbitrary dbt CLI command #1"},
            {"name": "Job Step Name 2", "command": r"Arbitrary dbt CLI command #2"},
        ]
    )
    dbt.map(
        command = dbt_job['command'],
        task_args= {"name": dbt_job['name']}
    )

Anna Geller

06/05/2022, 11:14 AM
Yes, you can map over multiple arguments, but the input must be a list, since dictionaries are not ordered. At first glance your code looks fine. Can you run it and check?
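For anyone reading along, here is a minimal plain-Python sketch of the reshaping this implies: turning the list of dictionaries into parallel lists that can be mapped over together. The helper `extract_field` is hypothetical and not part of Prefect. In a Prefect 1.x flow, a function like this would presumably be wrapped as a task so that it runs on the Parameter's run-time value.

```python
from typing import Dict, List


def extract_field(jobs: List[Dict[str, str]], key: str) -> List[str]:
    """Pull one field out of every job dict, preserving list order.

    Hypothetical helper for illustration; not a Prefect function.
    """
    return [job[key] for job in jobs]


# Same shape as the Parameter default in the question.
dbt_job = [
    {"name": "Job Step Name 1", "command": "Arbitrary dbt CLI command #1"},
    {"name": "Job Step Name 2", "command": "Arbitrary dbt CLI command #2"},
]

names = extract_field(dbt_job, "name")
commands = extract_field(dbt_job, "command")

# Mapping over several list arguments pairs items by position,
# which is what zip() does here.
paired = list(zip(names, commands))
```

Because both lists come from the same ordered input, the i-th name always lines up with the i-th command.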

Daniel Lomartra

06/05/2022, 1:46 PM
Thanks, Anna. I get an error about the slug generation. Here's the stack trace:
Traceback (most recent call last):
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\cli\build_register.py", line 133, in load_flows_from_script
    namespace = runpy.run_path(abs_path, run_name="<flow>")
    File "C:\Users\MarlinAdmin\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 268, in run_path
    return _run_module_code(code, init_globals, run_name,
    File "C:\Users\MarlinAdmin\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
    File "C:\Users\MarlinAdmin\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\flow\orchestration\test.py", line 108, in <module>
    dbt.map(
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\core\task.py", line 813, in map
    return new.bind(
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\core\task.py", line 754, in bind
    self.set_dependencies(
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\core\task.py", line 854, in set_dependencies
    flow.set_dependencies(
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\core\flow.py", line 985, in set_dependencies
    self.add_task(task)
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\core\flow.py", line 535, in add_task
    self.slugs[task] = task.slug or self._generate_task_slug(task)
    File "C:\Users\DLomartra\source\repos\MEP2\prefect-cloud\01_main\lib\site-packages\prefect\core\flow.py", line 503, in _generate_task_slug        
    prefix = "-".join(parts)
  TypeError: sequence item 0: expected str instance, GetItem found

Kevin Kho

06/05/2022, 4:53 PM
I don’t think you can use Parameters as task_args. task_args are applied at build time, while Parameter values are only populated at run time (if I’m right).
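To make the build-time versus run-time point concrete, here is a small plain-Python sketch that reproduces the same kind of `TypeError` as the last line of the traceback. `DeferredValue` and `generate_slug` are hypothetical stand-ins, not Prefect's actual classes or code. The only thing taken from the traceback is the `"-".join(parts)` call.

```python
class DeferredValue:
    """Hypothetical stand-in for a placeholder that only resolves at run time."""

    def __init__(self, key: str) -> None:
        self.key = key


def generate_slug(name_parts):
    # Same operation as the failing line in the traceback: "-".join(parts).
    # str.join only accepts strings, so every part must already be a str.
    return "-".join(name_parts)


# A literal name known at build time joins without trouble.
slug_ok = generate_slug(["dbt", "1"])

# A run-time placeholder used as a name is still an object at build time.
try:
    generate_slug([DeferredValue("name")])
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

In this sketch the name has to be a real string at the moment the slug is built, which is why a value that only exists at run time cannot be used there.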

Anna Geller

06/05/2022, 9:12 PM
Kevin is right. I confused that with dbt_kwargs, which can be passed as data dependencies. To solve your problem of running your dbt steps in parallel while still setting a custom name for each mapped dbt task run, you would probably need a flow of flows. Check out these resources for some examples about dbt, and these for flow of flows.