also is there a common pattern for chaining tasks ...
# ask-community
d
also is there a common pattern for chaining tasks post mapping? I.e. if i map a list of ints to a task that multiplies them by 2, and then want each of those to pass its value down to a parametrized StartFlowRun task (basically a task that will do some computation over a map of values, and then trigger n downstream flows, one for each value)
k
Not sure if there's a common pattern that others are using, but you can certainly pass values to the StartFlowRun task using the parameters kwarg, which accepts a dict of all of your parameters in that flow (that is, {name_of_your_task: the_value_of_the_Parameter}). In this case, you'd likely start a FlowRun for each of the returned mapped values.
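A minimal sketch of the "one parameters dict per mapped value" idea, in plain Python with no Prefect involved. The parameter name "value" and the helper names are hypothetical, chosen only for illustration; a real downstream flow would define its own Parameter names.

```python
from typing import Any, Dict, List


def double(x: int) -> int:
    # Stands in for the mapped task that multiplies each int by 2.
    return x * 2


def to_parameters(value: int) -> Dict[str, Any]:
    # "value" is a hypothetical Parameter name in the downstream flow.
    return {"value": value}


inputs: List[int] = [1, 2, 3]
doubled = [double(x) for x in inputs]

# One dict per mapped result; each dict would be handed to one flow run.
parameter_dicts = [to_parameters(v) for v in doubled]
print(parameter_dicts)
```

Each element of parameter_dicts corresponds to one downstream flow run, so the list is the thing that would be mapped over.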
d
yes, using the parameters arg seems straightforward with a single pass through, but when defining a flow it's not like i can loop over a list of results and create a task for each. i would have to use the map functionality, but i'm unsure how to make the map functionality only change the parameters kwarg
ah i guess unmapped inputs plus mapped inputs would work, but i'd still have to transform the results from the triggered task into a parameters dict somehow
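To make "unmapped inputs plus mapped inputs" concrete, here is a rough plain-Python model of the behaviour: unmapped values are passed unchanged to every call, while mapped values are consumed one element per call. This is only an illustration of the semantics, not Prefect's implementation; Unmapped, simulate_map, and describe_run are hypothetical names.

```python
from typing import Any, Callable, List


class Unmapped:
    """Marks a value that is passed unchanged to every call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def simulate_map(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    # Split keyword arguments into constants and per-call sequences.
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}

    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped input, all of equal length")
    n = lengths.pop()

    # Call fn once per index, combining constants with the i-th mapped items.
    return [fn(**constant, **{k: v[i] for k, v in mapped.items()}) for i in range(n)]


def describe_run(flow_name: str, parameters: dict) -> tuple:
    # Stands in for a task that would start one downstream flow run.
    return (flow_name, parameters)


calls = simulate_map(
    describe_run,
    flow_name=Unmapped("DownstreamFlow"),
    parameters=[{"resultId": 1}, {"resultId": 2}],
)
print(calls)
```

The remaining gap is the one raised here: something still has to turn the upstream results into that list of parameter dicts before the map happens.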
j
@DJ Erraballi You can do looping if you want: https://docs.prefect.io/core/advanced_tutorials/task-looping.html
d
hmm, not sure looping does what i want in this scenario. i need to create multiple instances of a task in the flow. i can easily do that with the map function, but i am mapping over the results of an upstream task, and i need to actually transform those results because the input to the mapped task is not equal to the result of the upstream task (and never could be: StartFlowRun takes a combination of unmapped and mapped params, and the mapped params themselves require transformation into the parameters dict)
i just need to know how I can apply a transformation over the mapped results
lemme see if i can create a concrete example of what i am trying to do
@task
def taskA() -> List[int]: 
  return results

with Flow() as flow:
  param = Parameter('blah')
  a_results = taskA()
  flow_run_task = FlowRunTask()
  flow_run_task.map(flow_name=unmapped('DownstreamFlow'), parameters=[{'blah': param, 'resultId': result} for result in a_results])
so the expectation is that N flow run tasks are triggered after taskA runs (no wait)
j
@DJ Erraballi just make another task to prep your params. (So move the list comprehension inside a task.) Full example that runs:
import prefect
from prefect import task, Flow, Parameter, unmapped
from typing import List


@task
def taskA() -> List[int]:
    return [1, 2, 3]


@task
def prep_params(p, a_results):
    return [{'blah': p, 'resultId': result} for result in a_results]


@task
def fake_flow_run_task(flow_name, params):
    logger = prefect.context.get("logger")
    logger.info("Running flow named {} with params: {}".format(flow_name, params))


with Flow("Test") as flow:
    p = Parameter('blah')
    a_results = taskA()
    prepped_params = prep_params(p, a_results)
    results = fake_flow_run_task.map(flow_name=unmapped('DownstreamFlow'), params=prepped_params)

flow.run(parameters={'blah': 'test'})