# ask-community
l
Hi everyone 🙂 we're playing with Adjustments in Schedules (https://docs.prefect.io/api/latest/schedules/adjustments.html#functions). Our goal is simple: add jitter to flow execution (we would like to spread the flow start times somewhat randomly so that the work is distributed more evenly). The documentation does not explicitly state that you cannot create your own functions, yet they are enumerated here: https://github.com/PrefectHQ/prefect/blob/8807f335fa1385275d9dd28faa857e5fd22536b7/src/prefect/serialization/schedule.py#L28. Is there a way to easily "plug in" a custom function? Honestly, we're more than happy to contribute this function to the codebase if the community finds it useful, so that's also an option for us. Any help or feedback is appreciated 🙂 thank you 🙂
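(A sketch, not the library's API: the built-in adjustments such as `prefect.schedules.adjustments.add` are just callables that take a `datetime` and return a `datetime`, so a custom jitter adjustment could look roughly like the code below. The function name, the 300-second bound, and the schedule are made up, and, as the serializer linked above suggests, an unregistered function would likely not survive (de)serialization against Server/Cloud, which is exactly the limitation being asked about.)

```python
import random
from datetime import datetime, timedelta
from typing import Callable

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock


def jitter(max_seconds: int = 300) -> Callable[[datetime], datetime]:
    """Hypothetical adjustment: shift each scheduled date by a random offset."""

    def _jitter(dt: datetime) -> datetime:
        return dt + timedelta(seconds=random.uniform(0, max_seconds))

    return _jitter


# Attached the same way the built-in adjustments are attached.
schedule = Schedule(
    clocks=[IntervalClock(timedelta(hours=1))],
    adjustments=[jitter(300)],
)
```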
a
Hi @Lukáš Polák, we are definitely open to contributions and if you're interested, perhaps this documentation may be helpful. You could probably start by opening a GitHub issue with your example implementation so that we can discuss the best ways to solve the problem. I could ask the team what would be the best place for such functions, but this module seems like the right one. I'm not 100% sure that the schedule is the right place to decide where the execution should take place. There are many ways to distribute work across compute instances without tying the execution decisions to the schedule:
• KubernetesAgent and autoscaling,
• serverless compute platforms, e.g. if you use AWS: leveraging Fargate either on ECS (`ECSAgent`) or on EKS (`KubernetesAgent`),
• offloading the execution to a remote Dask cluster, allowing the `DaskExecutor` to make decisions about how to compute the tasks and distribute their execution across workers (see the sketch after this list).
So if the problem is distributing work, there are many ways to tackle it without adjusting the schedule. Can you give us more information about how you run your flows - which agent, executor, and storage do you use?
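(For the Dask option above, attaching the executor is a one-liner; a minimal sketch, assuming Prefect 1.x and a placeholder scheduler address:)

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def do_work():
    ...


with Flow("example") as flow:
    do_work()

# Placeholder address; the remote Dask scheduler then decides how task
# runs are distributed across its workers.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
```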
k
Hey @Lukáš Polák, is the jitter related to ECS by chance?
You can do it in the Flow state handler too, unless it’s the agent struggling to pick up all these jobs.
l
Hi, so our setup is basic - we run the tasks in a remote Dask cluster via a local agent. The need is rather simple: we schedule multiple Flows via a single API call, and all these Flows hit the same API. Ideally, we would like to spread their execution randomly in order to lower the pressure on that API. When thinking about where to put it, we considered:
• adjusting the local agent - postponing the start of the Flow by a random number of seconds. It could work, but it would be a bit confusing for anybody looking at the Prefect dashboard (scheduled jobs would be stuck in the "pending" state for no apparent reason);
• adjusting the Dask scheduler - it's a bit tricky to postpone only the start of the Flow; we would probably end up postponing every task by a random number of seconds, which is something we don't want.
That's why those Adjustments seemed like a great tool for the job. @Anna Geller you correctly pointed out that spot. We in fact looked at that part of the codebase and considered adding the code there. I'll see if I find some time this week and test it locally (will let you know). @Kevin Kho could you please elaborate on the idea of updating the state handler? If I understand it correctly, you suggest adding some kind of sleep(rand()) to the handler and running it before every flow is triggered. It would cause the Flow to be stuck in the "pending" state for a random number of seconds, right?
k
That is right. The flow will already be submitted; the state handler just executes that logic at the start of the flow run. `sleep(rand())` would then spread out those API calls.
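(A minimal sketch of such a flow-level state handler in Prefect 1.x; the 60-second upper bound and the `call_shared_api` task are placeholders:)

```python
import random
import time

from prefect import Flow, task


def jitter_on_start(flow, old_state, new_state):
    # When the flow run transitions into Running, sleep for a random
    # delay before the tasks start, spreading out calls to the shared API.
    if new_state.is_running():
        time.sleep(random.uniform(0, 60))  # placeholder upper bound
    return new_state


@task
def call_shared_api():
    ...  # the work that hits the shared API


with Flow("jittered-flow", state_handlers=[jitter_on_start]) as flow:
    call_shared_api()
```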
a
@Lukáš Polák when you say that you schedule multiple Flows via a single API call, which API are you referring to? Do you know that you can schedule flows directly using Prefect Cloud or Prefect Server? (just checking to ensure we have the same understanding)
l
@Anna Geller we have our own API that talks to Prefect Server using your library. Basically, we schedule multiple different data integrations at once.
a
What if you attached a different schedule to each of those instead of triggering them all via the API at once? Perhaps this would solve your issue? E.g., for jobs that have no dependencies on each other: job A runs at 1 AM, job B runs at 2 AM, etc.
l
we actually do attach different schedules to them. That was one of the ideas: add a jitter to the `start_date` of the IntervalClock (roughly sketched below). Currently, we're testing the approach of having the delay in state handlers; I'll let you know if that's a viable option. One downside I see already is that the agent "works" with the flow for longer. We use a local agent, so we create a separate process for each flow, and having each of them sleep for several minutes takes a toll on the agent's memory footprint 😕
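(For reference, the `start_date` idea mentioned above could look roughly like this; the helper name and the five-minute bound are made up:)

```python
import random
from datetime import timedelta

import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock


def jittered_schedule(interval=timedelta(hours=1), max_jitter_seconds=300):
    # Hypothetical helper: offset each flow's clock by a random number of
    # seconds so flows sharing the same interval don't all start at once.
    offset = timedelta(seconds=random.uniform(0, max_jitter_seconds))
    return Schedule(
        clocks=[IntervalClock(interval=interval, start_date=pendulum.now("UTC") + offset)]
    )
```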
a
@Lukáš Polák did you check the current memory footprint? Overall, the agent is a lightweight process, so your solution may work just fine. You described that the actual execution takes place on a remote Dask cluster, not on the agent. The agent really only polls the API for work and triggers flows, while Dask executes the task runs and communicates states with the API.
l
I'm just writing a separate thread in that regard and will be diving into that issue on Monday. We noticed that our agent's memory consumption spikes from 150 MB to 1.5 GB when under the load of several Flows (even without this delay mechanism).
👍 1