# ask-community
z
Hey all - I'd like to "jitter" my first set of 8 tasks (i.e. kick each task off 2 minutes apart from each other) - is this possible? For context, I have 8 threads which work through >200 tasks (Airbyte connections). The initial load of 8 threads all performing a schema refresh in Airbyte at the same time sometimes causes the Airbyte load balancer to crash, so I'm looking for something to ease the initial load. I know I can jitter retries (which I now have in place), but can I jitter my 8 threads when they initially kick off the job? E.g. the job starts at 12pm, but only task 1 starts at 12pm, task 2 starts at 12:02, etc.
j
Hey, there is no jitter mechanism built into tasks directly. But because it's just Python, you have a lot of control over when you call the tasks (e.g. waiting a certain amount of time before you call or submit the next one). I'd also recommend taking a look at global concurrency limits: https://docs.prefect.io/latest/guides/global-concurrency-limits/?h=concurrency#slot-decay
šŸ‘ 1
z
Got it - thanks for the heads up! I'm okay with running 8 or even 12 tasks at once; it's just the initial load that's overloading Airbyte (the initial load triggers a schema refresh with our data source, which keeps timing out). I'll continue to look for an alternative solution. Thanks for looking into this for me!
g
Hey @Zachary Loertscher, here's what I did to 'jitter' a flow that gets scheduled with concurrent start times.
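from datetime import datetime, timedelta
from random import randrange
from typing import List

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment

# TGFPGame and this_weeks_games() come from my own project code (not shown here)


@flow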
def schedule_game_updates():
    """ Every week go get the games, and add the jobs to the scheduler for each game """
    logger = get_run_logger()
    games: List[TGFPGame] = this_weeks_games()
    jitter: int
    for game in games:
        jitter = randrange(1, 100)
        start_date: datetime = game.pacific_start_time - timedelta(seconds=jitter)
        log_msg: str = f"Adding game run flow: {game.tgfp_nfl_game_id} for time {start_date}"
        logger.info(log_msg)
        run_deployment(
            name="run-update-game/update-game-scores",
            scheduled_time=start_date,
            timeout=0,
            parameters={"tgfp_nfl_game_id": game.tgfp_nfl_game_id},
            flow_run_name=f"update-scores-game-{game.tgfp_nfl_game_id}"
        )
🙌 1
NOTES:
• I jitter 1-100 seconds
• I start the tasks using a negative jitter so that a task never starts late (early is OK for me)
• My primary reason is that this flow calls an API that rate limits and if I hit too many requests within one second I get 429'd
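For anyone adapting this, here's a small standalone sketch of the two scheduling calculations discussed in this thread: the random "early" jitter from the notes above, and the fixed 2-minute stagger from the original question. `early_jitter` and `staggered_starts` are hypothetical helper names, not part of Prefect. One detail worth knowing: `randrange(1, 100)` excludes the upper bound, so the offset is actually 1 to 99 seconds.

```python
from datetime import datetime, timedelta
from random import Random


def early_jitter(start, max_seconds=100, rng=None):
    """Shift `start` earlier by a random 1..(max_seconds - 1) seconds, never later."""
    rng = rng or Random()
    # randrange excludes the stop value, mirroring randrange(1, 100) in the flow above
    return start - timedelta(seconds=rng.randrange(1, max_seconds))


def staggered_starts(base, count, gap=timedelta(minutes=2)):
    """Deterministic start times: item i starts at base + i * gap."""
    return [base + i * gap for i in range(count)]
```

Either result could then be passed as `scheduled_time` to `run_deployment`, the same way `start_date` is used in the flow above.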