Kyle Austin

02/01/2023, 1:57 PM
I have a question about using subflows and passing large input parameters to them. Our team wants to write a standard subflow that runs after we ingest data (mostly from various APIs): the ingestion produces a list, and the subflow takes that list through a standard process to load it into our data warehouse. But when I try to pass a large amount of data to a subflow, I get 403 Forbidden for url ... errors. Here is an example of what we had in mind:
from prefect import task, flow

@task(name="fake junk task")
def do_nothing(data):
    # Stand-in for the real warehouse-loading work.
    return data


@task(name="junk data task")
def make_junk_data(num_rows):
    # Build num_rows dicts of 500 key/value pairs each to simulate a large payload.
    return [
        {f"key_{i}": f"value_{i + j}" for i in range(500)}
        for j in range(num_rows)
    ]

@flow(name="subflow")
def subflow(data):
    do_nothing(data)

@flow(name="main flow")
def main_flow(num_rows):
    data = make_junk_data(num_rows)
    # The large list is passed directly as a subflow parameter.
    subflow(data=data)

if __name__ == "__main__":
    main_flow(50)
When main_flow's input is 50 there is no problem, but if it is increased to something like 1000, the run crashes with httpx.HTTPStatusError: Client error '403 Forbidden' for url. From the traceback it looks like the subflow is being triggered via API calls to Prefect Cloud in the background, with this huge input sent as a parameter in that call. So I am guessing that passing a huge input as a subflow parameter is just not ideal practice? Are there any recommendations for achieving this kind of pattern? I am thinking of writing the data to file/cloud storage first and passing just the metadata for it instead, but are there any parameters for subflows/tasks that would allow us to pass a huge list to a subflow like this? Thanks for any help!
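In case it helps the discussion, here is a minimal sketch of the workaround I am considering: stage the payload in storage and pass only a reference across the subflow boundary. The local staging directory, JSON serialization, and the stage_data/load_staged_data task names are all placeholders I made up; real code would point at our cloud storage.

import json
from pathlib import Path
from uuid import uuid4

from prefect import task, flow

# Placeholder location; in practice this would be a cloud storage bucket.
STAGING_DIR = Path("/tmp/staging")

@task(name="junk data task")
def make_junk_data(num_rows):
    # Same payload generator as in the example above.
    return [
        {f"key_{i}": f"value_{i + j}" for i in range(500)}
        for j in range(num_rows)
    ]

@task(name="stage data")
def stage_data(data):
    # Persist the large payload and return only a small reference to it.
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    path = STAGING_DIR / f"{uuid4()}.json"
    path.write_text(json.dumps(data))
    return str(path)

@task(name="load staged data")
def load_staged_data(path):
    # Read the payload back inside the subflow.
    return json.loads(Path(path).read_text())

@flow(name="subflow")
def subflow(data_path: str):
    data = load_staged_data(data_path)
    # ... standard processing into the warehouse would go here ...

@flow(name="main flow")
def main_flow(num_rows):
    data = make_junk_data(num_rows)
    path = stage_data(data)
    # Only the short path string crosses the API as the subflow parameter.
    subflow(data_path=path)

if __name__ == "__main__":
    main_flow(1000)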