https://prefect.io logo
#prefect-community
Title
# prefect-community
d

Daniel Gazit

02/04/2020, 2:52 PM
Hi :)) Question: how does one scale tasks vertically? I have a pipeline that maps some initial values to multiple tasks. I want the entire pipeline to be performed on each initial value before moving to the next value (or that each worker will process the entire pipeline before moving to the next one)
j

Joe Schmid

02/04/2020, 4:23 PM
Hi @Daniel Gazit, my two cents as a fellow Prefect user is that your best bet for this might be to do it via multiple Flow runs. The initial task in your flow could grab the first item in the list to process and run the rest of the flow on that item. Your last task in the flow could trigger another Flow run, passing the remaining N - 1 items as a parameter to that next flow and so on. The Nth flow run wouldn't have any more items to process so it would avoid triggering another Flow run and your processing would be complete.
Some clarification on the above: if you were running with Prefect Cloud, what I described above is possible and you'd use FlowRunTask (https://docs.prefect.io/api/latest/tasks/cloud.html#flowruntask) to trigger the next Flow run. With Core, that's not available, so change what I wrote above to: run the Flows synchronously in a loop something like this:
Copy code
items = [1, 2, 3, 4]
previous_results = None
for i in items:
    previous_results = flow.run(parameters={
        "item_to_process": i,
        "previous_results": previous_results})
Hope this helps. Let me know if that makes sense.
d

Daniel Gazit

02/06/2020, 6:16 AM
Yeah, this was really helpful, thanks
👍 1