Kyle

07/10/2023, 10:30 PM
What is the recommended method for running subflows sequentially, where each subflow is a bunch of tasks mapped to run concurrently? I don't want the second subflow to run until the first one is complete. I tried the following, but it's complaining about the mapped list results. Example:
I get AttributeError: 'list' object has no attribute 'submit' because of the multiple tasks.

@task
def task1(item: str):
    # make api call; save data
    return featherfilepath

@task 
def task2(item: str):
    # process stats; save data
    return featherfilepath

@flow # flowrun task1
def map_task1():
    # get list of items to process; convert to list
    items = feather_instance.get("items")
    items = items['name'].tolist()
    # map a task to each item in the list; run concurrently
    task1_multiple = task1.map(item=items)

@flow # flowrun task2
def map_task2():
    # get list of items to process; convert to list
    items = feather_instance.get("items")
    items = items['name'].tolist()
    # map a task to each item in the list; run concurrently
    task2_multiple = task2.map(item=items)

@flow
def flowrun():
    # api calls; cache from feather for one hour
    task1 = map_task1().submit()
    # process ta after all klines are saved to feathers
    task2 = map_task2().submit(wait_for=[task1])

if __name__ == "__main__":
    flowrun()