Justin Chavez
06/02/2021, 9:46 PM
from typing import Dict, List
from prefect import task, Flow

@task
def generate_user_status_list() -> List[Dict]:
    return [
        {
            "user": "User 1",
            "status": "active"
        },
        {
            "user": "User 2",
            "status": "inactive"
        }
    ]

@task(log_stdout=True)
def print_user_status(user: str, status: str):
    print(f"Status of {user} is {status}")

with Flow('check user status') as flow:
    user_status_list = generate_user_status_list
    print_user_status.map(user_status_list) # <<<< Fails here as it doesn't recognize the parameters in the dictionaries

flow.run()
It's the second to last line that I am trying to figure out. I want each member of the generated list in the first task to be the parameters to the second. It's like I am trying to do the parameterized flow run in the StartFlowRun section, except I want to run a task instead of a flow with the parameters: https://docs.prefect.io/core/idioms/flow-to-flow.html#running-a-parametrized-flow
I can get around this by creating separate tasks that create each parameter as a list of str, but was hoping for a simple all in one task that generates the params for some of my more complex downstream tasks.
Kevin Kho
from typing import Dict, List
from prefect import task, Flow

@task
def generate_user_status_list() -> List[Dict]:
    return [
        {
            "user": "User 1",
            "status": "active"
        },
        {
            "user": "User 2",
            "status": "inactive"
        }
    ]

@task(log_stdout=True)
def print_user_status(user_dict):
    user = user_dict['user']
    status = user_dict['status']
    print(f"Status of {user} is {status}")

with Flow('check user status') as flow:
    user_status_list = generate_user_status_list()
    print_user_status.map(user_status_list)

flow.run()
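If the original two-parameter signature of print_user_status(user, status) has to stay, one alternative to the dict-accepting version above is a thin wrapper that unpacks each dictionary with `**`. This is only a plain-Python sketch of the idea: the wrapper name print_user_status_from_dict is hypothetical, the list comprehension stands in for `.map`, and the return value is added purely so the result can be inspected. In a Prefect flow of that era, both functions would presumably be decorated with @task and the wrapper would call print_user_status.run(**user_dict); treat that wiring as an assumption to verify against your Prefect version.

```python
from typing import Dict, List


def print_user_status(user: str, status: str) -> str:
    """Stand-in for the shared task body; keeps the separate parameters."""
    message = f"Status of {user} is {status}"
    print(message)
    return message


def print_user_status_from_dict(user_dict: Dict[str, str]) -> str:
    """Hypothetical wrapper: unpack one dictionary into keyword arguments."""
    # The dictionary keys must match the parameter names exactly,
    # otherwise Python raises a TypeError.
    return print_user_status(**user_dict)


user_status_list: List[Dict[str, str]] = [
    {"user": "User 1", "status": "active"},
    {"user": "User 2", "status": "inactive"},
]

# Stand-in for mapping the wrapper over the upstream list.
messages = [print_user_status_from_dict(item) for item in user_status_list]
```

The shared function stays untouched, so another flow can keep calling it with separate arguments; only the wrapper knows about the dictionary shape.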
Kevin Kho
StartFlowRun inside a task, like StartFlowRun(xxx, xxx, parameters={'user': user, "status": status}).run()
Kevin Kho
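from prefect import task
from prefect.tasks.prefect import StartFlowRun

@task(log_stdout=True)
def print_user_status(user_dict):
    user = user_dict['user']
    status = user_dict['status']
    # StartFlowRun takes the flow's parameters via the `parameters` keyword
    StartFlowRun(xxx, xxx, parameters={'user': user, "status": status}).run()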
Justin Chavez
06/02/2021, 10:01 PM
print_user_status should maintain the separate input parameters since it's a task I am sharing with another flow. Is there no way to unpack the dictionary as a map input?
Justin Chavez
06/02/2021, 10:02 PM
Kevin Kho
Justin Chavez
06/02/2021, 10:06 PM
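For reference, the workaround mentioned at the start of the thread (one list per parameter) can be sketched in plain Python as below. The helper name split_user_status is hypothetical and the snippet leaves Prefect out on purpose. In a flow, the two lists would presumably be passed as print_user_status.map(user=users, status=statuses); how the two lists are pulled out of a single task's result depends on the Prefect version and is not shown here.

```python
from typing import Dict, List, Tuple


def split_user_status(items: List[Dict[str, str]]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: turn a list of dicts into two parallel lists."""
    users = [item["user"] for item in items]
    statuses = [item["status"] for item in items]
    return users, statuses


user_status_list = [
    {"user": "User 1", "status": "active"},
    {"user": "User 2", "status": "inactive"},
]

# Position i in each list belongs to the same original dictionary.
users, statuses = split_user_status(user_status_list)
```

This keeps print_user_status unchanged, at the cost of one extra step between the generating task and the mapped task.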