
Cody Webb

08/22/2023, 6:00 PM
arg_chunks = [
    args_list[i : i + chunk_size]  # noqa
    for i in range(0, len(args_list), chunk_size)
]
args_lst = [
    (args[0], args[1])
    for chunk in arg_chunks
    for args in chunk
]
# since 100 args / 10 workers, my agent will spawn 10 worker sub-flows
logger.info(f"Spawning {len(arg_chunks)} worker flows...")
await asyncio.gather(
    *[
        run_deployment(
            # returns a FlowRun object
            timeout=0,  # allows non blocking
            name=deploy_id,
            parameters={
                "project": project,
                "arg_1": args[0],
                "arg_2": args[1],
            },
        )
        for args in args_lst
    ]
)

@flow(
    name="dask-flow",
    task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"),
    persist_result=True,
    result_storage=LocalFileSystem.load("local-storage"),
)
async def gpu_computation(project, arg_1, arg_2):
    .....
    .....
    project["col"] = gpu_compute(arg_1, arg_2)
    project.save(...)
     -----
so i have this dask flow which creates n workers based on how many chunks there are. the flow is a long-running task that takes the args, does some gpu computation, and then saves a file. the problem i'm having is that it's cancelling the tasks since some are long running - i don't want them to cancel, just to wait. or perhaps i should just start jobs for the first chunk and then have the second chunk wait for the first to complete? i don't know the best way to do this - thoughts?
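For reference, one way to do the "have the second chunk wait for the first to complete" option is to gather one chunk at a time instead of all runs at once. A minimal sketch, assuming the args_list, project, and deploy_id names from the snippet above; it relies on run_deployment's default blocking behavior (no timeout=0), so each batch finishes before the next starts:

import asyncio
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


@flow(name="parent-batcher")
async def parent(args_list: list, project: str, deploy_id: str, chunk_size: int = 10):
    logger = get_run_logger()
    chunks = [args_list[i : i + chunk_size] for i in range(0, len(args_list), chunk_size)]
    for n, chunk in enumerate(chunks, start=1):
        logger.info(f"Running chunk {n}/{len(chunks)} ({len(chunk)} flow runs)")
        # no timeout=0 here: each run_deployment call waits for its child
        # flow run to finish, so the whole batch completes before the loop continues
        await asyncio.gather(
            *[
                run_deployment(
                    name=deploy_id,
                    parameters={"project": project, "arg_1": a[0], "arg_2": a[1]},
                )
                for a in chunk
            ]
        )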

Nate

08/22/2023, 6:11 PM
hi @Cody Webb
it is cancelling the tasks as some are long running
what is cancelling the tasks?

Cody Webb

08/22/2023, 6:16 PM
one sec, running it again to get the logs - i ran it at work and then the power went out, so i just got home lol

Nate

08/22/2023, 6:16 PM
oh jeez! 👍 no rush

Cody Webb

08/22/2023, 6:33 PM
okay, maybe it was because i was on battery. now it seems like one will successfully complete but then most others error out? is there a way to stagger these out, or only chunk them, or am i missing something? i'm thinking i'm overloading the api i'm hitting that does the gpu computation or something?
like in this run i ran 40 chunks in batches of 10 (4 workers), and only one result was saved
basically it'll run 10-15 at a time successfully and then the rest seem to crash, so i think it's basically ddosing it somewhere. i don't have observability into the fastapi layer / endpoint i'm hitting for gpu compute
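One way to stagger the submissions without fully serializing them is a plain client-side cap on in-flight run_deployment calls, separate from Prefect's own concurrency limits. A rough sketch, assuming the deploy_id, project, and args_lst names from the snippet at the top of the thread; note it relies on run_deployment blocking until the child run finishes (the default), rather than timeout=0:

import asyncio
from prefect.deployments import run_deployment

MAX_IN_FLIGHT = 10  # at most this many child flow runs at once


async def run_one(sem, deploy_id, project, args):
    async with sem:
        # blocks until the child flow run finishes, so the semaphore slot
        # is held for the full duration of the run
        return await run_deployment(
            name=deploy_id,
            parameters={"project": project, "arg_1": args[0], "arg_2": args[1]},
        )


async def run_all(deploy_id, project, args_lst):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    return await asyncio.gather(
        *[run_one(sem, deploy_id, project, args) for args in args_lst]
    )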

Nate

08/22/2023, 6:42 PM
most others error out
do you have the trace for one that errored out?
is there way to stagger these out or only chunk them or am i missing something?
im not sure i understand, but you can use concurrency limits to enforce max parallelism
im thinking that im overloading the api
are you running against a local server or cloud? or ephemerally?
it's worth mentioning that run_deployment is not a task, so the DaskTaskRunner won't change the execution of run_deployment calls at all, unless you're calling it from a task - hard to tell from above where you're awaiting your run_deployment calls
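The concurrency limits Nate mentions can be applied at the task level via tags: if the call to the GPU service is wrapped in a task tagged (say) "gpu", a concurrency limit on that tag caps how many of those tasks run at once across all flow runs. A hedged sketch - the tag name, task body, and the limit of 10 are assumptions, and the limit itself is created separately, e.g. with prefect concurrency-limit create gpu 10 from the CLI:

from prefect import flow, task


@task(tags=["gpu"])  # the "gpu" tag is what the concurrency limit keys on
async def call_gpu_service(arg_1, arg_2):
    ...  # hit the FastAPI endpoint that does the model inference


@flow
async def gpu_computation(project, arg_1, arg_2):
    result = await call_gpu_service(arg_1, arg_2)
    ...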

Cody Webb

08/22/2023, 6:42 PM
im running it inside a flow^
um, locally now but will be on cloud eventually

Nate

08/22/2023, 6:44 PM
gotcha - so you could be running into trouble running lots of stuff against the ephemeral api (i.e. no dedicated API, just the one we spin up if you're not talking to a server/cloud). also, task_runners don't do anything to subflow execution - what is your expectation with the DaskTaskRunner here?
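A quick way to check whether runs are talking to a dedicated server/Cloud or to the ephemeral API is to look at the PREFECT_API_URL setting; if it is unset, the ephemeral API is used. A small sketch (the printed wording is just illustrative):

from prefect.settings import PREFECT_API_URL

api_url = PREFECT_API_URL.value()
print(api_url or "PREFECT_API_URL not set -> using the ephemeral API")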

Cody Webb

08/22/2023, 6:44 PM
there's really no trace outside of it saying ERROR: FAILED, no reason as to why, and looking in the gpu service isn't saying much of anything either. my intuition is what i was saying - if i do a job of just 10 or so it'll run completely fine, but anything more and it dies, so i was wondering if there's a better approach here

Nate

08/22/2023, 6:45 PM
yep i wrote that recipe 🙂

Cody Webb

08/22/2023, 6:46 PM
ha nice, yea it's cool. basically each task doesn't depend on any of the others, so i was trying to do them concurrently
so from a single flow to get args -> many tasks

Nate

08/22/2023, 6:47 PM
that makes sense! hmm
looking in the gpu service
as a sanity check, do things work as expected if you remove dask and your GPU service from the situation?
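As a concrete version of that sanity check, one could run the same shape of flow with the task_runner kwarg dropped and the GPU call swapped for a sleep, so any remaining failures point at orchestration rather than the GPU service. A sketch only - the name, the stub body, and the sleep duration are placeholders:

import asyncio
from prefect import flow


@flow(name="dask-flow-sanity-check")
async def gpu_computation_stub(project, arg_1, arg_2):
    # stand-in for the GPU call: just wait a bit and return something small
    await asyncio.sleep(5)
    return {"project": project, "args": (arg_1, arg_2)}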

Cody Webb

08/22/2023, 6:47 PM
yea
im basically hitting a gpu service with fastapi endpoints, and the gpu is obviously the bottleneck here - it does a couple stages of model inference
eventually there will be multiple gpus in prod here^ but i'm just developing and only have my laptop

Nate

08/22/2023, 6:53 PM
oh wait, is the gpu_computation flow the one being kicked off with run_deployment?

Cody Webb

08/22/2023, 6:54 PM
ya exactly

Nate

08/22/2023, 6:54 PM
so len_chunks flow runs are all pointing at the same dask scheduler ?

Cody Webb

08/22/2023, 6:54 PM
the run_deployment kicks off the run, which sends the args to the gpu to do the inference on
yep
going to try with a concurrency limit of around 10 on the work pool here, will report back
prefect-server            | 19:12:47.430 | WARNING | prefect.server.services.cancellationcleanup - CancellationCleanup took 29.649518 seconds to run, which is longer than its loop interval of 20.0 seconds.
okay so i did a 45-run batch with a 15 concurrency limit - would it make sense to do multiple agents with low-concurrency work pools?
out of 45 only 2 have successfully completed so far 🫠

Nate

08/22/2023, 7:24 PM
im not sure how you're using tasks in the gpu_computation flow, but i suspect that the weirdness is related to the dask task runner somehow - where / how are you using dask in there?
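This is the detail being poked at: with address=..., every child flow run connects to the same external Dask scheduler and contends for the same workers, while with no address, prefect-dask spins up a temporary local cluster per flow run instead. A sketch of the two configurations (the cluster sizing numbers are made up):

from prefect import flow
from prefect_dask import DaskTaskRunner

# every child flow run connects to the same scheduler / worker pool
shared_runner = DaskTaskRunner(address="tcp://dask-scheduler:8786")

# each child flow run gets its own temporary local cluster instead
per_run_runner = DaskTaskRunner(cluster_kwargs={"n_workers": 2, "threads_per_worker": 1})


@flow(task_runner=per_run_runner)
async def gpu_computation(project, arg_1, arg_2):
    ...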

Cody Webb

08/22/2023, 7:25 PM
that's all i'm using it for - to dispatch the tasks. it's all basically just api code, hitting different endpoints of an api

Nate

08/22/2023, 7:25 PM
just as a datapoint, did you try without the dask task runner?

Cody Webb

08/22/2023, 7:26 PM
no but i will - which one do you rec? just sequential?

Nate

08/22/2023, 7:26 PM
id just rm the task_runner kwarg and let it use the default ConcurrentTaskRunner
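For reference, "rm the task_runner kwarg" would look roughly like this for the flow at the top of the thread - with no task_runner given, Prefect falls back to the default ConcurrentTaskRunner (a sketch; the storage block name is copied from the original decorator):

from prefect import flow
from prefect.filesystems import LocalFileSystem


@flow(
    name="dask-flow",
    persist_result=True,
    result_storage=LocalFileSystem.load("local-storage"),
)
async def gpu_computation(project, arg_1, arg_2):
    ...  # same body as before, now executed with the default ConcurrentTaskRunner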

Cody Webb

08/22/2023, 7:26 PM
okay
oh i should say - i switched out a threadpoolexecutor for this prefect flow, and that was working fine
if that helps diagnose
thank you
(i wanted more observability and prefect is super sweet 🙂 )
yea still getting it, hmm - lemme try to do sequential tasks instead of async
or something, i don't know
i was trying to speed this up a bit by using concurrency, but it seems like there are some bottlenecks on how many tasks/requests the gpu backend/endpoints can handle concurrently. if i do it sequentially it'll work but take a lot longer
plus then if multiple users are using it at once, it will just go even slower 🙂
ope, a concurrency limit of 10 flow runs and the concurrent runner worked

Nate

08/22/2023, 8:28 PM
nice!

Cody Webb

08/22/2023, 8:29 PM
okay so, in that case, should i add another agent/pool if i want to go faster?
or for multiple users using it at once, etc?

Nate

08/22/2023, 8:31 PM
but seems like theres some bottlenecks on how many tasks/requests the gpu backend/endpoints can handle concurrently
i suppose that depends - is your gpu service still the bottleneck? if it were, i'm not sure extra workers/pools would help

Cody Webb

08/22/2023, 8:31 PM
kinda like horiz. scaling?

Nate

08/22/2023, 8:31 PM
i might be misunderstanding

Cody Webb

08/22/2023, 8:31 PM
i think the concurrent requests are the bottleneck
i might be wrong and it could be the single gpu, but i think it's the amount of requests hitting it at once

Cody Webb

08/22/2023, 8:53 PM
(Still have like half of the gpu mem)