Dzmitry Aliashkevich

02/08/2023, 3:24 PM
Hi everyone, a question about distributing flow runs across different agents in Prefect 1 (Prefect 2 is in our future plans 😞); see thread
:upvote: 1
✅ 1
So we have a flow and create multiple flow runs of it. We would like to organise them so that they utilise several Prefect agents. What happens in reality is that one of the agents grabs all the flow runs and exhausts its resources, while the others sit idle. Is there a better way to force runs to be distributed across agents than creating duplicate flows with different label sets? Thanks

Haotian Li

02/08/2023, 4:07 PM
I also have the same problem, but more for heterogeneous hardware

George Coyne

02/08/2023, 7:22 PM
For Prefect 1, use labels!

Kyle McChesney

02/08/2023, 7:39 PM
I think, if I am understanding correctly, it's less an issue of “this flow goes to this agent” and more an issue of “I have 100 flow runs and I want to spread them equally across multiple agents”. Having to map this out manually with labels might be tricky: you'd need to implement some kind of check when a new flow run is added, where you inspect your pool of agents, see which one is underutilized, and set the label on the incoming flow run to match that agent.
➕ 1
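
The "pick the underutilized agent" check described above could be sketched roughly as follows. This is only an illustration: `pick_least_loaded` is a hypothetical helper, and the per-agent counts of active runs are assumed to come from somewhere else (how to obtain them is not covered in this thread).

```python
def pick_least_loaded(active_runs_by_label):
    """Return the agent label with the fewest active flow runs.

    `active_runs_by_label` maps an agent-specific label to the number of
    flow runs currently assigned to it (assumed to be tracked elsewhere).
    Ties go to the label that appears first in the mapping.
    """
    if not active_runs_by_label:
        raise ValueError("no agent labels to choose from")
    return min(active_runs_by_label, key=active_runs_by_label.get)


if __name__ == "__main__":
    # Hypothetical load snapshot for three agents.
    load = {"agent1": 3, "agent2": 1, "agent3": 2}
    print(pick_least_loaded(load))
```

The returned label would then be added to the labels of the incoming flow run.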

George Coyne

02/08/2023, 7:41 PM
Unfortunately, there is no concept of distribution in Prefect 1.
😿 1

Dzmitry Aliashkevich

02/09/2023, 7:38 AM
OK, for those facing the same issue, it seems I've found a less painful approach: we can pass non-default labels to create_flow_run, which makes it possible to implement a rough form of round-robin. I ended up assigning an additional unique label to each agent and, when creating a flow run, randomly picking one of those labels and adding it to the default label list, something like:
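import random

from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task()
def process_flow_batch():
    # Default labels ("..." stands for the rest of the list).
    default_labels = ["ec2", ... ]

    # One unique label per agent ("..." stands for the rest of the list).
    agent_specific_labels = ["agent1", ... ]

    # Pass the default labels plus one randomly chosen agent label, as a new list.
    flow_update_batch = create_flow_run.run(
        flow_name="some_flow_name",
        project_name="some_project_name",
        labels=default_labels + [random.choice(agent_specific_labels)],
    )

    # Block until the created flow run finishes.
    wait_for_flow_run.run(
        flow_update_batch
    )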