https://prefect.io logo
t

Tom Kaszemacher

08/02/2022, 10:02 PM
Hi all! I could use some help here. I’m trying to query a DB to get a collection of inputs to process, and to dispatch each input to a sub-flow:
Copy code
@task()
def get_inputs() -> List:
    # This task retrieves a list of inputs to process from a DB and returns it
    pass


with Flow(name='dispatcher') as dispatcher:

    inputs = get_inputs()

    for i in inputs:
        create_flow_run('worker', parameters={'input': i})
It doesn’t work as I need to provide either nouts or return a Tuple in order to iterate over my inputs, however I don’t know the size of it. What would be a correct approach here? Thanks!
1
m

Mason Menges

08/02/2022, 10:23 PM
Hey Tom for prefect 1.0 you'll want to check out mapping here https://docs-v1.prefect.io/core/concepts/mapping.html. I believe this should allow you to iterate the create_flow_run task based on the number of inputs you get.
upvote 1
t

Tom Kaszemacher

08/02/2022, 10:57 PM
Hey @Mason Menges yes that works thanks!
m

Mason Menges

08/02/2022, 10:58 PM
No Problem 😄
n

Nate

08/03/2022, 1:10 AM
if you're curious, you can do something very close to your original code with
prefect>=2.0.0
! for example:
Copy code
from prefect import task, flow
from typing import Any, List

@flow
def my_subflow(subflow_input: Any):
    print('interesting tasks being called!')

@task()
def get_inputs() -> List:
    print('gathering inputs...')
    return []

@flow
def my_flow(inputs: Any = None):
    
    inputs = inputs if inputs else get_inputs()

    for i in inputs:
        my_subflow(subflow_input=i)
t

Tom Kaszemacher

08/05/2022, 1:26 PM
Thanks @Nate
5 Views