Dave
02/22/2021, 6:51 PM
import prefect
...
@task
def setup_flows(response):
    flows_to_run = []
    for avail in json.loads(response):
        if avail['Name'] == 'id1':
            params = {'Date': avail['Date']}
            flows_to_run.append({'flow_name': 'flow_1', 'parameters': params})
        if avail['Name'] == 'id2':
            params = {'Date': avail['Date']}
            flows_to_run.append({'flow_name': 'flow_2', 'parameters': params})
    return flows_to_run

start_flow = StartFlowRun(project_name="project")

with Flow("flow_scheduler", schedule, state_handlers=[state_handler]) as flow:
    response = fetch()
    mapped = setup_flows(response)
    flows = start_flow.map(mapped)
Kyle Moon-Wright
02/22/2021, 7:10 PM
Dave
02/22/2021, 7:30 PM
...
def setup_flows(content):
    flow_params = []
    for avail in json.loads(content):
        flow_params.append({'Date': avail['Date']})
    return flow_params

start_flow = StartFlowRun(project_name="project")
...
with Flow("flow_scheduler", schedule, state_handlers=[state_handler]) as flow:
    ...
    params = setup_flows(response)
    start_flow.map(flow_name=unmapped('flow_1'), parameters=params)
This works because I specify both "flow_name" and "parameters" explicitly, but I would like it to iterate over (flow_name, parameters) pairs instead.
This means that if I don't specify which argument the input belongs to, it is always treated as the flow_name argument of StartFlowRun.
Kyle Moon-Wright
02/25/2021, 12:44 AM
Dave
02/25/2021, 7:56 AM
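One possible way to iterate over flow_name/parameters pairs, sketched under assumptions: instead of a single list of dicts, build two parallel lists and pass each one to its own keyword argument, so that element i of one list is paired with element i of the other. The helper name build_flow_runs, the name_to_flow lookup, and the sample payload are hypothetical. The Prefect wiring is only shown in comments, and it assumes that StartFlowRun accepts flow_name and parameters at call time, as the snippet above suggests.

```python
import json


def build_flow_runs(response):
    """Split the API response into two parallel lists: flow names and parameters."""
    # Hypothetical lookup from the 'Name' field to the flow that should be started.
    name_to_flow = {"id1": "flow_1", "id2": "flow_2"}
    flow_names, parameters = [], []
    for avail in json.loads(response):
        flow_name = name_to_flow.get(avail["Name"])
        if flow_name is None:
            continue  # no flow registered for this name, skip it
        flow_names.append(flow_name)
        parameters.append({"Date": avail["Date"]})
    return flow_names, parameters


# Made-up sample payload, shaped like the response in the snippets above.
sample = json.dumps([
    {"Name": "id1", "Date": "2021-02-22"},
    {"Name": "id2", "Date": "2021-02-23"},
    {"Name": "other", "Date": "2021-02-24"},
])

flow_names, parameters = build_flow_runs(sample)
print(flow_names)
print(parameters)

# Inside the Flow block (not executed here), each list would have to be the
# result of a task, for instance two small tasks returning one list each.
# Both would then be mapped by keyword, with no unmapped() wrapper:
#     start_flow.map(flow_name=flow_names, parameters=parameters)
```

Mapping by keyword avoids the behavior described above, where a single positional input is taken to be flow_name.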