# ask-community
c
```
arg_chunks = [
    args_list[i : i + chunk_size]  # noqa
    for i in range(0, len(args_list), chunk_size)
]
args_lst = [
    (args[0], args[1])
    for chunk in arg_chunks
    for args in chunk
]
# since 100 args / 10 workers, my agent will spawn 10 worker sub-flows
logger.info(f"Spawning {len(arg_chunks)} worker flows...")
await asyncio.gather(
    *[
        run_deployment(
            # returns a FlowRun object
            timeout=0,  # allows non blocking
            name=deploy_id,
            parameters={
                "project": project,
                "arg_1": args[0],
                "arg_2": args[1],
            },
        )
        for args in args_lst
    ]
)


@flow(
    name="dask-flow",
    task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"),
    persist_result=True,
    result_storage=LocalFileSystem.load("local-storage"),
)
async def gpu_computation(project, arg_1, arg_2):
    .....
    .....
    project["col"] = gpu_compute(arg_1, arg_2)
    project.save(...)
    -----
```
so i have this dask flow which creates n workers based on how many chunks there are, and the flow is a long running task that runs the args, does some gpu computation, and then saves a file. the problem i am having is that it is cancelling the tasks since some are long running - i dont want them to cancel, just to wait. or perhaps i should just have jobs start for the first chunk and then have the second chunk wait for completion of the first? i dont know the best way to do this? thoughts?
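One way to do the "run the first chunk, then have the second chunk wait for the first to finish" idea from the message above is to gather each chunk separately instead of gathering everything at once. A minimal sketch, assuming the same `arg_chunks`, `deploy_id`, and `project` names as in the pasted code (the helper name `run_chunks_in_batches` is made up for illustration):
```
import asyncio

from prefect.deployments import run_deployment


async def run_chunks_in_batches(arg_chunks, deploy_id, project):
    # one chunk at a time: gather only the current chunk's worker flow runs
    for chunk in arg_chunks:
        await asyncio.gather(
            *[
                run_deployment(
                    name=deploy_id,
                    # timeout=None (the default) polls until the worker flow
                    # run reaches a terminal state, so this batch finishes
                    # before the next chunk starts
                    timeout=None,
                    parameters={
                        "project": project,
                        "arg_1": args[0],
                        "arg_2": args[1],
                    },
                )
                for args in chunk
            ]
        )
```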
n
hi @Cody Webb
> it is cancelling the tasks as some are long running
what is cancelling the tasks?
c
one sec, running again to get the logs - i ran at work and then power went out so i just got home lol
n
oh jeez! 👍 no rush
c
okay maybe it was because i was on battery, now it seems like one will successfully complete but then most others error out? is there a way to stagger these out or only chunk them, or am i missing something? im thinking that im overloading the api im hitting that does the gpu computation or something?
like in this run i ran 40 chunks in batches of 10 (4 workers), and only one result was saved
basically itll run 10-15 at a time successfully and then the rest seem to crash, so i think somewhere its basically ddosing it, i dont have observability into the fast api layer endpoint im hitting for gpu compute
n
> most others error out
do you have the trace for one that errored out?
> is there way to stagger these out or only chunk them or am i missing something?
im not sure i understand, but you can use concurrency limits to enforce max parallelism
> im thinking that im overloading the api
are you running against a local server or cloud? or ephemerally? its worth mentioning that
`run_deployment` is not a task, so the DaskTaskRunner wont change the execution of `run_deployment` calls at all, unless you're calling it from a task - hard to tell from above where you're awaiting your run_deployment calls
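As a concrete version of the "concurrency limits to enforce max parallelism" point above, here is a minimal sketch that caps how many `run_deployment` calls are in flight at once from the parent flow, using a plain asyncio semaphore. This is separate from Prefect's own work-pool/tag concurrency limits; the names `dispatch_all`, `bounded_run`, and `max_in_flight` are made up for illustration:
```
import asyncio

from prefect.deployments import run_deployment


async def dispatch_all(args_lst, deploy_id, project, max_in_flight=10):
    # at most max_in_flight worker flow runs are awaited at any moment
    sem = asyncio.Semaphore(max_in_flight)

    async def bounded_run(parameters):
        async with sem:
            # holds a slot until this worker flow run reaches a terminal state
            return await run_deployment(name=deploy_id, parameters=parameters)

    await asyncio.gather(
        *[
            bounded_run({"project": project, "arg_1": args[0], "arg_2": args[1]})
            for args in args_lst
        ]
    )
```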
c
im running it inside a flow^
um locally now but will be on cloud eventually
n
gotcha, so you could be running into trouble running lots of stuff against the ephemeral api (i.e. no dedicated API, just the one that we spin up if you're not talking to a server/cloud). also, task_runners dont do anything to subflow execution - what is your expectation with the DaskTaskRunner here?
c
theres really no trace outside of it saying ERROR: FAILED, no reason as to why, and looking in the gpu service isnt saying much of anything either. my intuition is what i was saying - if i do a job of just 10 or so itll run completely fine, but anything more and it dies, so was wondering if theres a better approach here
n
yep i wrote that recipe 🙂
c
ha nice, yea its cool, basically each task doesnt depend on any of the others so was trying to do them concurrently
so from single flow to get args -> many tasks
n
that makes sense! hmm
> looking in the gpu service
as a sanity check, do things work as expected if you remove dask and your GPU service from the situation?
c
yea
im basically hitting a gpu service with fastapi endpoints, and the gpu is obv the bottleneck here and does a couple stages of model inference
eventually there will be multi gpus in prod here^ but just developing and stuff i only have my laptop
n
oh wait, is the `gpu_computation` flow the one being kicked off with `run_deployment`?
c
ya exactly
n
so len_chunks flow runs are all pointing at the same dask scheduler?
c
the run_deployment kicks off the run that sends the args to the gpu to do the inference on
yep
going to try with concurrency limits of around 10 on the work pool will report back
```
prefect-server            | 19:12:47.430 | WARNING | prefect.server.services.cancellationcleanup - CancellationCleanup took 29.649518 seconds to run, which is longer than its loop interval of 20.0 seconds.
```
okay so i did a 45 run with a 15 concurrency limit - would it make sense to do multiple agents with low concurrency work pools?
out of 45 only 2 have successfully completed so far 🫠
n
im not sure how you're using tasks in the `gpu_computation` flow, but i suspect that the weirdness is related to the dask task runner somehow - where / how are you using dask in there?
c
thats all im using it for - to dispatch the tasks. its all basically just api code, hitting different endpoints of an api
n
just as a datapoint, did you try without the dask task runner?
c
no but i will, which one do you rec? just sequential?
n
id just rm the `task_runner` kwarg and let it use the default `ConcurrentTaskRunner`
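For reference, dropping that kwarg from the flow pasted earlier would look roughly like this (a sketch only; the flow body stays whatever it already is):
```
from prefect import flow
from prefect.filesystems import LocalFileSystem


# same flow as above, minus task_runner, so Prefect falls back to its
# default ConcurrentTaskRunner
@flow(
    name="dask-flow",
    persist_result=True,
    result_storage=LocalFileSystem.load("local-storage"),
)
async def gpu_computation(project, arg_1, arg_2):
    ...
```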
c
okay
oh i should say, i switched out a threadpoolexecutor for this flow with prefect and that was working fine
if that helps diagnose
thank you
(i wanted more observability and prefect is super sweet 🙂 )
yea still getting it hmm, lemme try to do sequential tasks instead of async
or something i dont know
i was trying to speed this up a bit by using concurrency, but it seems like theres some bottlenecks on how many tasks/requests the gpu backend/endpoints can handle concurrently - if i do sequential itll work but take a lot longer
plus then if multiple users using at once, will just go even slower 🙂
ope, a concurrency limit of 10 flow runs and the concurrent runner worked
n
nice!
c
okay so, in that case, should i add another agent/pool if i want to go faster?
or like multiple users using etc?
n
> but seems like theres some bottlenecks on how many tasks/requests the gpu backend/endpoints can handle concurrently
i suppose that depends - is your gpu service still the bottleneck? if it is, im not sure extra workers/pools would help
c
kinda like horiz. scaling?
n
i might be misunderstanding
c
i think the concurrent requests are the bottleneck
i might be wrong and it could be the single gpu,
but i think its the amount of requests hitting it at once
c
(Still have like half of the gpu mem)