DJ Erraballi
11/09/2020, 7:28 PMKyle Moon-Wright
11/09/2020, 7:51 PMStartFlowRun
task using the parameters
kwarg - which accepts a dict of all of your parameters in that flow (that is the {name_of_your_task: the_value_of_the_Parameter}
). In this case, you'd likely start a FlowRun for each of the returned mapped values.DJ Erraballi
11/09/2020, 7:54 PMDJ Erraballi
11/09/2020, 7:55 PMJoe Schmid
11/09/2020, 8:02 PMDJ Erraballi
11/10/2020, 6:25 PMDJ Erraballi
11/10/2020, 6:25 PMDJ Erraballi
11/10/2020, 6:25 PMDJ Erraballi
11/10/2020, 6:26 PM@task
def taskA() -> List[int]:
return results
with Flow() as flow:
param = Parameter('blah')
a_results = taskA()
flow_run_task = FlowRunTask()
flow_run_task.map(flow_name=unmapped('DownstreamFlow'), parameters=[{'blah': param, 'resultId': result} for result in a_results])
DJ Erraballi
11/10/2020, 6:38 PMJoe Schmid
11/10/2020, 8:03 PMimport prefect
from prefect import task, Flow, Parameter, unmapped
from typing import List
@task
def taskA() -> List[int]:
return [1, 2, 3]
@task
def prep_params(p, a_results):
return [{'blah': p, 'resultId': result} for result in a_results]
@task
def fake_flow_run_task(flow_name, params):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Running flow named {} with params: {}".format(flow_name, params))
with Flow("Test") as flow:
p = Parameter('blah')
a_results = taskA()
prepped_params = prep_params(p, a_results)
results = fake_flow_run_task.map(flow_name=unmapped('DownstreamFlow'), params=prepped_params)
flow.run(parameters={'blah': 'test'})