
Luis Gallegos

01/20/2021, 6:22 PM
Hi there, I've been using Prefect for a few months. Can you help me with this question? I have the following flow:
1. 300 CSVs (with different names and sizes) -> gzip -> EXASOL (DW) -> delete (csv, gz)
2. A flow that processes 1 CSV per run
My question is: how does Prefect handle the flows and server resources if I run all the CSVs at the same time? That means running 300 flows at once. (Does it do some kind of smart scheduling so the server doesn't collapse?)
Environment info:
# Server: 24 cores, 96 GB RAM
# Single node
# Total CSV size: ~150 GB
# Prefect Core 0.14.0
# Docker prefect core-0.14.0
# Executor: LocalDaskExecutor (advice for DaskExecutor is welcome too)
Note: I avoid passing a list of the CSVs to each task because of disk space. Notice that I delete the file at the end of the flow. Thanks for your help 🙂

Ruslan

01/20/2021, 6:33 PM
How does the second flow know that the first flow has already processed the first file?

Jim Crist-Harif

01/20/2021, 6:39 PM
Hi Luis, Prefect will currently attempt to run all 300 flows at the same time. Usually users running at this scale are using some kind of backing resource manager (e.g. Kubernetes, ECS, etc.) which would ensure resources aren't overcommitted. For example, when running with Kubernetes, Prefect would submit 300 Kubernetes jobs (one for each flow), and Kubernetes would queue and run these jobs as resources become available. If you're running everything on a single large server, you'll currently need to manage resource contention yourself. If you're running with a flow per CSV, you might want to schedule these separately. Alternatively, you could make use of StartFlowRun and use a single "orchestrating" flow to run each flow in turn. If the orchestrating flow is run with a LocalDaskExecutor, only num_workers flows will be executed at a time. For example:
from prefect import Flow, Parameter
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun

start_flow_run = StartFlowRun("your-csv-flow", wait=True)

with Flow("orchestrating-flow") as flow:
    # run_parameters takes a list of dicts of parameters to run your-csv-flow with
    run_parameters = Parameter("run_parameters")
    start_flow_run.map(parameters=run_parameters)

flow.executor = LocalDaskExecutor(num_workers=8)  # only 8 flows will run concurrently
See https://docs.prefect.io/core/idioms/flow-to-flow.html#scheduling-a-flow-of-flows for more information on using StartFlowRun.
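The run_parameters value above is just a list with one dict of parameters per CSV. A minimal sketch of building that list (the csv_name parameter name is an assumption here; use whatever Parameter name your csv flow actually declares):

```python
# Assumed: "your-csv-flow" takes a single Parameter named "csv_name".
csv_files = ["sales_01.csv", "sales_02.csv", "sales_03.csv"]
run_parameters = [{"csv_name": name} for name in csv_files]

# With Prefect 0.14.x installed and the flow defined as above, you would
# then kick off the orchestrating flow with:
# flow.run(run_parameters=run_parameters)
```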
Alternatively, you could make a single big flow for handling all 300 csv files. In that case, the executor (either DaskExecutor or LocalDaskExecutor) would manage your resources, ensuring that tasks only run when resources are free. Either pattern works; it depends on how you intend to use these flows.
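The resource-capping behavior described here (at most num_workers pieces of work in flight at once) can be illustrated outside Prefect with a plain thread pool. This is only an analogy sketch, not Prefect code; process_csv is a hypothetical stand-in for the gzip -> load -> delete work:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

peak = 0       # highest number of tasks observed running at once
current = 0
lock = threading.Lock()

def process_csv(name):
    """Stand-in for gzip -> EXASOL load -> delete; tracks concurrency."""
    global peak, current
    with lock:
        current += 1
        peak = max(peak, current)
    # ... the real per-file work would happen here ...
    with lock:
        current -= 1
    return name + ".gz"

files = [f"file_{i}.csv" for i in range(20)]

# max_workers plays the same role as LocalDaskExecutor(num_workers=8):
# all 20 files are submitted, but at most 8 are processed concurrently.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(process_csv, files))
```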

Luis Gallegos

01/20/2021, 7:01 PM
Hi @Ruslan Actually it's only one flow with four tasks. I apologize if I didn't explain myself very well. The flow receives a parameter that is the name of the CSV file to load.

Ruslan

01/20/2021, 7:04 PM
So you can launch the same flow many times and the runs will work in parallel.

Luis Gallegos

01/20/2021, 7:09 PM
Hi @Jim Crist-Harif I think the StartFlowRun approach is the best solution for me right now. I will try it. However, using a resource manager like Kubernetes sounds really good; I will discuss it with my team so that maybe someday we can implement it. On the other hand, I avoid using one big flow handling all 300 CSVs, because in the final task I delete each of the .gz files to free disk space. Thanks for your help.