Hi all! I am having trouble understanding how to a...
# ask-community
j
Hi all! I am having trouble understanding how to approach a specific prefect mapping. Let's say I have a task that returns a list of dicts. Each member of the dictionary is a parameter to a prefect task. I have provided an example flow below to illustrate what I am trying to accomplish
Copy code
from typing import Dict, List
from prefect import task, Flow

@task
def generate_user_status_list() -> List[Dict]:
    return [
        {
        "user": "User 1", 
        "status": "active"
        },
        {
        "user": "User 2",
        "status": "inactive"
        }
    ]

@task(log_stdout=True)
def print_user_status(user: str, status: str):
    print(f"Status of {user} is {status}")


with Flow('check user status') as flow:
    user_status_list = generate_user_status_list
    print_user_status.map(user_status_list) # <<<< Fails here as it doesn't recognize the parameters in the dictionaries

flow.run()
It's the second to last line that I am trying to figure out. I want each member of the generated list in the first task to be the parameters to the second. It's like I am trying to do the parameterized flow run in the StartFlowRun section, except I want to run a task instead of a flow with the parameters: https://docs.prefect.io/core/idioms/flow-to-flow.html#running-a-parametrized-flow I can get around this by creating separate tasks that create each parameter as a list of str, but was hoping for a simple all in one task that generates the params for some of my more complex downstream tasks.
k
Hey @Justin Chavez, you can do something like this right?
Copy code
from typing import Dict, List
from prefect import task, Flow
@task
def generate_user_status_list() -> List[Dict]:
    return [
        {
        "user": "User 1", 
        "status": "active"
        },
        {
        "user": "User 2",
        "status": "inactive"
        }
    ]
@task(log_stdout=True)
def print_user_status(user_dict):
    user = user_dict['user']
    status = user_dict['status']
    print(f"Status of {user} is {status}")


with Flow('check user status') as flow:
    user_status_list = generate_user_status_list()
    print_user_status.map(user_status_list) # 
flow.run()
and then call
StartFlowRun
inside a
task
like
StartFlowRun(xxx, xxx, params={'user': user, "status": status}).run()
Like this:
Copy code
@task(log_stdout=True)
def print_user_status(user_dict):
    user = user_dict['user']
    status = user_dict['status']
    StartFlowRun(xxx, xxx, params={'user': user, "status": status}).run()
j
I could! Unfortunately, in my context
print_user_status
should maintain the separate input parameters since it's a task I am sharing with another flow. There isn't a way to do an unfurling of the dictionary as a map input?
If not I can reproduce the tasks for a different input
k
Ah I see. Unfortunately no there isn’t a way without some intermediate task after the map operation
j
No worries! Thanks for the help I just wanted to confirm that limitation.