Thread
#prefect-community
    Luis Gallegos

    1 year ago
    Hi there, I've been using Prefect for a few months. Can you help me with this question? I have the following flow: 1. 300 CSVs (with different names and sizes) -> gzip -> EXASOL (DW) -> delete (csv, gz). 2. A flow that processes 1 CSV per run. My question is: how does Prefect handle the flows and server resources if I run all the CSVs at the same time? That means running 300 flows at the same time. (Does it do some kind of smart scheduling so the server doesn't collapse?) Environment info:

    Server 24 Cores - 96GB RAM

    Single Node

    Total CSV size (approx. 150 GB)

    Prefect Core 0.14.0

    Docker Prefect core-0.14.0

    Executor LocalDaskExecutor (You can give some advice with DaskExecutor too)

    Note: I avoid passing a list of the CSVs to each task because of disk space. Notice that I delete the files at the end of the flow. Thanks for your help 🙂
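    A minimal sketch of the flow described above, assuming Prefect 0.14-style tasks (the task names and the EXASOL load are hypothetical placeholders):

    import gzip
    import os
    import shutil

    from prefect import Flow, Parameter, task

    @task
    def gzip_file(csv_path):
        # compress the CSV next to the original file
        gz_path = csv_path + ".gz"
        with open(csv_path, "rb") as src, gzip.open(gz_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
        return gz_path

    @task
    def load_to_exasol(gz_path):
        # hypothetical stub: push the gzipped file into the EXASOL DW
        ...

    @task
    def cleanup(csv_path, gz_path):
        # free disk space once the load has finished
        os.remove(csv_path)
        os.remove(gz_path)

    with Flow("csv-to-exasol") as flow:
        csv_path = Parameter("csv_path")
        gz_path = gzip_file(csv_path)
        loaded = load_to_exasol(gz_path)
        cleanup(csv_path, gz_path, upstream_tasks=[loaded])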

    Ruslan

    1 year ago
    How does the second flow know that the first flow has already processed the first file?
    Jim Crist-Harif

    1 year ago
    Hi Luis, Prefect will currently attempt to run all 300 flows at the same time. Usually users running at this scale are using some kind of backing resource manager (e.g. Kubernetes, ECS, etc.) which would ensure resources aren't overcommitted. For example, when running with Kubernetes, Prefect would submit 300 Kubernetes jobs (one for each flow), and Kubernetes would queue and run these jobs as resources become available. If you're running everything on a single large server, you'll currently need to manage resource contention yourself. If you're running with a flow per CSV, you might want to schedule these separately. Alternatively, you could make use of StartFlowRun and use a single "orchestrating" flow to run each flow in turn. If the orchestrating flow is run with a LocalDaskExecutor, only num_workers flows will be executed at a time. For example:
    from prefect import Flow, Parameter
    from prefect.executors import LocalDaskExecutor
    from prefect.tasks.prefect import StartFlowRun

    start_flow_run = StartFlowRun("your-csv-flow", wait=True)

    with Flow("orchestrating-flow") as flow:
        # run_parameters takes a list of dicts of parameters to run your-csv-flow with
        run_parameters = Parameter("run_parameters")
        start_flow_run.map(parameters=run_parameters)

    flow.executor = LocalDaskExecutor(num_workers=8)  # only 8 flows will run concurrently
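    A run of the orchestrating flow could then be kicked off like this (the csv_path parameter name is illustrative and must match a Parameter defined in your-csv-flow):

    csv_files = ["file_001.csv", "file_002.csv"]  # ... up to all 300 files
    flow.run(parameters={
        "run_parameters": [{"csv_path": f} for f in csv_files],
    })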
    See https://docs.prefect.io/core/idioms/flow-to-flow.html#scheduling-a-flow-of-flows for more information on using StartFlowRun.
    Alternatively, you could make a single big flow that handles all 300 CSV files. In that case, the executor (either DaskExecutor or LocalDaskExecutor) would manage your resources, ensuring that tasks only run when resources are free. Either pattern works; it depends on how you intend to use these flows.
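    A rough sketch of that single-flow pattern, with the per-file work collapsed into one hypothetical mapped task:

    import glob

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def list_csvs(pattern):
        # discover the input files at run time
        return glob.glob(pattern)

    @task
    def process_csv(csv_path):
        # hypothetical stub: gzip, load into EXASOL, then delete the files
        ...

    with Flow("all-csvs") as big_flow:
        paths = list_csvs("/data/*.csv")  # hypothetical location
        process_csv.map(paths)

    big_flow.executor = LocalDaskExecutor(num_workers=8)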
    Luis Gallegos

    1 year ago
    Hi @Ruslan, actually it's only one flow with four tasks. I apologize if I didn't explain myself very well. The flow receives a parameter that is the name of the CSV file to load.

    Ruslan

    1 year ago
    So you can launch the same flow many times and the runs will work in parallel.
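    As a sketch, assuming the flow is registered with a Prefect server, one run per file could be created through the Client (the flow id and parameter name are placeholders):

    from prefect import Client

    client = Client()
    for csv_path in ["file_001.csv", "file_002.csv"]:
        client.create_flow_run(
            flow_id="<your-flow-id>",
            parameters={"csv_path": csv_path},
        )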
    Luis Gallegos

    1 year ago
    Hi @Jim Crist-Harif, I think StartFlowRun is the best solution for me right now; I will try it. That said, using a resource manager like Kubernetes sounds really good, and I will discuss it with my team so we can maybe implement it someday. On the other hand, I avoid using one big flow that handles all 300 CSVs, because in the final task I delete each of the .gz files to free disk space. Thanks for your help.