Dave

02/22/2021, 6:51 PM
Hello there! I have a flow that schedules other flows based on a third-party API. I thought StartFlowRun had the functionality I needed, but I came up a little short. My problem is that I want to execute the StartFlowRun task over a list of (flow_name, parameters) pairs, so I am not quite sure whether I can accomplish what I want with task.map(...). I've attached a little snippet in the thread to give an idea of what my problem is ⬇️
import json

import prefect
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun
...

@task
def setup_flows(response):
    flows_to_run = []
    for avail in json.loads(response):
        if avail['Name'] == 'id1':
            params = {'Date': avail['Date']}
            flows_to_run.append({'flow_name': 'flow_1', 'parameters': params})
        if avail['Name'] == 'id2':
            params = {'Date': avail['Date']}
            flows_to_run.append({'flow_name': 'flow_2', 'parameters': params})
    return flows_to_run
	
start_flow = StartFlowRun(project_name="project")

with Flow("flow_scheduler", schedule, state_handlers=[state_handler]) as flow:
    response = fetch()
    mapped = setup_flows(response)
    flows = start_flow.map(mapped)

Kyle Moon-Wright

02/22/2021, 7:10 PM
Hey @Dave, are you encountering a specific error? If not, take a look at this thread, which describes two ways to map with StartFlowRun, depending on your specific needs.

Dave

02/22/2021, 7:30 PM
Hi Kyle, in my case I am receiving an error because I have to specify the parameters in start_flow.map(), e.g.:
...

@task
def setup_flows(content):
    flow_params = []
    for avail in json.loads(content):
        flow_params.append({'Date': avail['Date']})
    return flow_params

start_flow = StartFlowRun(project_name="project")
...

with Flow("flow_scheduler", schedule, state_handlers=[state_handler]) as flow:
    ...
    params = setup_flows(response)
    start_flow.map(flow_name=unmapped('flow_1'), parameters=params)
This works because I am specifying both "flow_name" and "parameters", but I would like it to iterate over (flow_name, parameters) pairs. If I don't specify which argument the input is, it always assumes it is the flow_name parameter of StartFlowRun.
Update: I made my own version of the StartFlowRun to get around it! 🦦
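
Dave's custom task is not shown in the thread. As a hedged alternative that keeps the stock StartFlowRun, the list of pairs can be split into two parallel lists and both mapped by keyword. The sketch below assumes Prefect 1.x zips several mapped arguments element by element; `split_flow_specs` is a hypothetical helper name, and the dates are made-up sample values.

```python
from typing import Any, Dict, List, Tuple

FlowSpec = Dict[str, Any]


def split_flow_specs(
    specs: List[FlowSpec],
) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Split [{'flow_name': ..., 'parameters': ...}, ...] into two parallel lists.

    Hypothetical helper: index i of both returned lists describes the same flow run.
    """
    flow_names = [spec["flow_name"] for spec in specs]
    parameters = [spec["parameters"] for spec in specs]
    return flow_names, parameters


# Sample input in the shape returned by setup_flows (the dates are invented).
specs = [
    {"flow_name": "flow_1", "parameters": {"Date": "2021-02-22"}},
    {"flow_name": "flow_2", "parameters": {"Date": "2021-02-23"}},
]
flow_names, parameters = split_flow_specs(specs)

# Inside the Flow block (not executed here), the two lists would come from a
# task wrapping split_flow_specs and then be mapped together, roughly:
#   start_flow.map(flow_name=names, parameters=params)
# This relies on the assumption that mapped keyword arguments are zipped.
```

Passing both arguments by keyword avoids the positional binding to flow_name described above, and no `unmapped` wrapper is needed because each run gets its own flow name.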

Kyle Moon-Wright

02/25/2021, 12:44 AM
Hey Dave, sorry for missing this - it is very easy to get distracted with everything going on at Prefect… I'm glad you figured this out with a custom StartFlowRun task!

Dave

02/25/2021, 7:56 AM
No worries, it's also hard to get around and know everything 😄