# ask-community
Adrien B
Hi everyone, I am trying to define a new flow. In this flow I have a task that I map over thousands of values. The problem is that some child tasks fail because there are too many open connections at the same time. When I run my flow on 1000 arguments it works fine, but on 8000, some child tasks get stuck in a Running state. Is there a way to do the mapping consecutively in chunks of 1000 arguments? Basically, if I give a parameter list of 5000 arguments to my flow, I would like the flow to run the task five times consecutively, each time mapped over 1000 arguments. I don't have to use the results of the tasks, if that would be an issue. The flow runs on a LocalDaskExecutor with 15 processes and uses a Prefect Server as the endpoint/host. Thank you so much for your help, guys!
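For context, the batching being asked about here is just consecutive slicing of the argument list. A minimal sketch of the idea in plain Python; the chunked helper is illustrative, not a Prefect API:

```python
# Illustrative helper: split a list of arguments into consecutive
# batches of a fixed size (the last batch may be shorter).
def chunked(items, size=1000):
    for i in range(0, len(items), size):
        yield items[i:i + size]

# e.g. a 5000-element argument list yields five batches of 1000
assert len(list(chunked(list(range(5000))))) == 5
```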
Anna Geller
Hi @Adrien B, can you confirm that I understand the problem: since your Server instance struggles to map over, say, 8000 tasks in a single flow run, you would like to split the work into 8 flow runs, each handling 1000 tasks?
@Adrien B if that’s the case, maybe something like this will work for you:
• your normal flow:
```python
from prefect import Flow, Parameter, task
from prefect.executors import LocalDaskExecutor

# a trivial stand-in for the real mapped task
plus_one = task(lambda x: x + 1)


# the setup from the question would be e.g.
# LocalDaskExecutor(scheduler="processes", num_workers=15)
with Flow("dummy-child-flow", executor=LocalDaskExecutor()) as flow:
    numbers = Parameter("list_of_numbers", default=[1, 2, 3, 4, 5, 6, 7, 8, 9])
    mapped_result = plus_one.map(numbers)
```
• a “parent” flow that handles triggering flow runs with 1000 tasks each:
```python
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


@task
def generate_thousand_numbers(start, stop, step):
    # range() is half-open, so (0, 1000, 1) yields exactly 1000 numbers
    return list(range(start, stop, step))


with Flow("massively_mapped_flow", executor=LocalDaskExecutor()) as flow:
    chunk_1 = generate_thousand_numbers(0, 1000, 1)
    chunk_2 = generate_thousand_numbers(1000, 2000, 1)
    chunk_3 = generate_thousand_numbers(2000, 3000, 1)
    chunk_4 = generate_thousand_numbers(3000, 4000, 1)
    chunk_5 = generate_thousand_numbers(4000, 5000, 1)
    # ... and so on up to chunk_8 for 8000 inputs
    parameters = [
        dict(list_of_numbers=chunk_1),
        dict(list_of_numbers=chunk_2),
        dict(list_of_numbers=chunk_3),
        dict(list_of_numbers=chunk_4),
        dict(list_of_numbers=chunk_5),
        # ... up to chunk_8
    ]
    # one child flow run per chunk of 1000 inputs
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
```
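Two hedged notes on the sketch above. First, create_flow_run looks the child flow up by name, so "dummy-child-flow" must already be registered with the Server project, e.g. flow.register(project_name="community"). Second, create_flow_run.map launches the chunk runs concurrently; if the chunks really must run one after another, as the question asks, one variation (a sketch, not tested against a live Server) is to chain create_flow_run with wait_for_flow_run so each chunk starts only after the previous one finishes:

```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("sequential_parent_flow") as flow:
    previous = None
    for start in range(0, 8000, 1000):
        # each loop iteration creates fresh copies of the two tasks,
        # chained behind the previous chunk's wait so runs execute in order
        run_id = create_flow_run(
            flow_name="dummy-child-flow",
            project_name="community",
            parameters=dict(list_of_numbers=list(range(start, start + 1000))),
            upstream_tasks=[previous] if previous is not None else None,
        )
        previous = wait_for_flow_run(run_id, raise_final_state=True)
```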
Adrien B
Hi @Anna Geller, thank you for your answer. Yes, that would work, but wouldn’t I lose the ability to track the logs of each child task run on the server?
Anna Geller
No, you wouldn’t. This would create 8 separate flow runs, each with 1000 mapped child tasks. You would only need some way of knowing which flow run is responsible for which inputs. If you have some meaningful grouping for those inputs, then tracking would likely be easier.
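If it helps, one way to make that grouping visible is to give each child run a recognizable name. A sketch, assuming the parameters list from the parent flow above and create_flow_run's run_name argument (available in Prefect 1.x):

```python
# label each child run after the chunk it handles, so the UI's run
# list shows which flow run covers which slice of the inputs
run_names = [f"chunk-{i}" for i in range(1, len(parameters) + 1)]

mapped_flows = create_flow_run.map(
    parameters=parameters,
    run_name=run_names,
    flow_name=unmapped("dummy-child-flow"),
    project_name=unmapped("community"),
)
```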
k
A new sub-flow run is created for each chunk, and the logs are available on that sub-flow run’s page.
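A hedged aside: if you would rather have the child logs show up inside the parent flow run as well, the wait_for_flow_run task can stream them. A sketch, reusing mapped_flows and unmapped from the parent flow above:

```python
from prefect import unmapped
from prefect.tasks.prefect import wait_for_flow_run

# inside the parent Flow context, after create_flow_run.map(...):
# wait on each child run and stream its logs into the parent's logs
child_runs = wait_for_flow_run.map(mapped_flows, stream_logs=unmapped(True))
```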
Adrien B
Ok thank you.