
Darren Liu

01/25/2023, 3:05 AM
Hi prefectionists, first of all, great work on a promising product! I am looking for a solution recommendation for this use case. Suppose there are events that start at unknown times. Once it is detected that an event has started, a flow is looped with some persistent state until it's detected that the event has ended. There can be multiple events happening at once, so there can be multiple loops, but only as many as there are events. In other words, there must be exactly one loop per event, since there are no safeguards against multiple loops working on the same event with the same state and inserting identical data. Normally, I would have a repeating event-monitoring job that detects the start of events and publishes a job with the initial state into a work queue, plus job consumers that carry out the job and, once it completes, re-publish it into the queue with the latest state so the next iteration can begin. The same job can detect the end of the event and complete without publishing back into the queue. What would be the ideal setup using Prefect? Thanks in advance for reading the long rant!
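For comparison, the queue-based pattern described above can be sketched in plain Python, with no Prefect involved. This is a minimal, hypothetical sketch: `Job`, `run_queue`, and the `fetch(event_id, state)` callable (returning the new state and whether the event has ended) are illustrative names, and an in-process `queue.Queue` stands in for a real work queue.

```python
import queue
from dataclasses import dataclass, field


@dataclass
class Job:
    event_id: str
    state: dict = field(default_factory=dict)


def run_queue(event_ids, fetch, max_steps=100):
    """Publish one job per event, then let consumers re-publish until each event ends.

    `fetch(event_id, state)` is a hypothetical callable returning (new_state, finished).
    Returns the final state of every event that finished within `max_steps`.
    """
    jobs = queue.Queue()
    seen = set()
    for event_id in event_ids:  # monitoring job: publish each event exactly once
        if event_id not in seen:
            seen.add(event_id)
            jobs.put(Job(event_id))
    final = {}
    steps = 0
    while not jobs.empty() and steps < max_steps:
        job = jobs.get()  # a consumer picks up the next iteration
        steps += 1
        new_state, finished = fetch(job.event_id, job.state)
        if finished:
            final[job.event_id] = new_state  # event over: do not re-publish
        else:
            jobs.put(Job(job.event_id, new_state))  # re-publish with the latest state
    return final


def fake_fetch(event_id, state):
    ticks = state.get("ticks", 0) + 1
    return {"ticks": ticks}, ticks >= 3


print(run_queue(["a", "b", "a"], fake_fetch))  # {'a': {'ticks': 3}, 'b': {'ticks': 3}}
```

The `seen` set is what enforces "one loop per event" here; a duplicate detection of event `a` does not create a second job.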

Peyton Runyan

01/25/2023, 3:30 PM
Do you have a somewhat representative toy example of what the flow might be doing and why it's looping?

Darren Liu

01/26/2023, 1:52 AM
Yes, it's a sporting event; the latest data is available via an API call. The loop is there to "keep checking" the latest data. State is needed to check whether the latest data is new; if it is, insert the new data and update the state. Either way, keep checking until the event is over.
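A single "keep checking" iteration, as described above, could look like the following minimal sketch. `check_latest` and the `insert` callable are hypothetical names standing in for the API response handling and the database write.

```python
def check_latest(state: dict, latest: dict, insert) -> dict:
    """Insert `latest` only if it differs from what was last seen.

    `insert` is a hypothetical callable that writes one record to your store.
    Returns the (possibly updated) state for the next iteration.
    """
    if latest != state.get("last_seen"):
        insert(latest)
        state = {**state, "last_seen": latest}
    return state


rows = []
state = {}
for snapshot in [{"score": "0-0"}, {"score": "0-0"}, {"score": "1-0"}]:
    state = check_latest(state, snapshot, rows.append)
print(rows)  # [{'score': '0-0'}, {'score': '1-0'}]
```

The repeated `0-0` snapshot is skipped, which is exactly the duplicate insert the persistent state is meant to prevent.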

Peyton Runyan

01/27/2023, 1:09 AM
That’s very helpful! There are a few ways to roll something like this in Prefect. I’ll throw something together tomorrow and drop a couple of links too.
If you're not familiar with Prefect, this might dump a whole bunch of concepts on you at once. If you use Prefect Cloud, you can use automations to keep essentially your exact logic. You can have a worker of some sort, either a long-running flow or a process independent of Prefect, that queries for new games. When it finds a game, it can kick off a flow run using the Prefect client:
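```python
from prefect import get_client

async def start_game_run(game_deployment_id, game_id):
    # game_deployment_id: the ID of the deployment that runs the per-game flow
    async with get_client() as client:
        await client.create_flow_run_from_deployment(
            game_deployment_id,
            parameters={"game_id": game_id, "state": None},
        )
```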
You would then have your flow, which takes `game_id` and `state` as parameters. At the end of each run, it would hit the automations API to schedule a new run of the deployment with the updated state:
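```python
from typing import Optional
from uuid import UUID
from prefect import flow

# Placeholders for your own code: get_info, is_finished, processing_logic,
# cleanup, and schedule_next_run (which would wrap the automations API call)
@flow
def my_logic(game_id: UUID, state: Optional[dict] = None):
    state = state or {}  # the first run is launched with state=None
    info = get_info(game_id)
    if is_finished(info):
        return cleanup(game_id)  # game over: don't schedule another run
    if info != state.get("info"):  # only new data gets processed
        processing_logic(info)
        state["info"] = info
    schedule_next_run(game_id=game_id, state=state)
```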
We currently don't have client methods for creating automations, so you would have to wrap the API call yourself, but it's documented here, and you can get additional help here in the Slack: https://docs.prefect.io/ui/automations/#create-an-automation
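Whatever the wrapped API call ends up looking like, the decision it needs can be kept separate from the side effect. Below is a minimal sketch with a hypothetical `next_run_parameters` helper: it returns `None` when the game is over (so no new run is scheduled) and otherwise the parameters for the next run. Because flow parameters travel through the API, keeping `state` a plain JSON-serializable dict is probably the safest assumption.

```python
from typing import Optional


def next_run_parameters(game_id: str, state: Optional[dict], info: dict,
                        finished: bool) -> Optional[dict]:
    """Return parameters for the next scheduled run, or None to end the chain."""
    state = dict(state or {})  # copy; the first run is launched with state=None
    if finished:
        return None
    if info != state.get("info"):
        state["info"] = info  # remember the latest data so the next run can compare
    return {"game_id": game_id, "state": state}


params = next_run_parameters("g1", None, {"score": "0-0"}, finished=False)
print(params)  # {'game_id': 'g1', 'state': {'info': {'score': '0-0'}}}
print(next_run_parameters("g1", params["state"], {"score": "2-1"}, finished=True))  # None
```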
Another option is to keep the master process/looping flow, but put the logic for monitoring each game inside a single flow. The master process still queries for games:
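```python
from prefect import get_client

async def launch_game_loops(long_running_game_deployment_id, poll_interval: int):
    games = get_games()  # placeholder: however you discover games that have started
    async with get_client() as client:
        for game in games:
            await client.create_flow_run_from_deployment(
                long_running_game_deployment_id,
                parameters={"game_id": game.id, "poll_interval": poll_interval},
            )
```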
Then, instead of having a flow that kicks off another flow on each loop, you just have that one flow loop for the duration of the game:
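```python
import asyncio
from uuid import UUID
from prefect import flow

# Placeholders for your own code: get_game_info, is_finished, perform_logic, cleanup
@flow
async def game_loop(game_id: UUID, poll_interval: int):
    while True:
        game_info = get_game_info(game_id)
        if is_finished(game_info):
            break
        perform_logic(game_info)
        await asyncio.sleep(poll_interval)  # must be awaited, otherwise there is no pause

    cleanup(game_id)
```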
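Since the master process keeps querying for games, something has to make sure each game gets exactly one long-running run, which was the original requirement. A minimal sketch, assuming a hypothetical `games_to_launch` helper and a `launched` set that persists across iterations of the master loop (in practice probably a database table rather than an in-memory set):

```python
def games_to_launch(current_game_ids, launched: set) -> list:
    """Return only the games that have no monitoring run yet, and record them."""
    new_ids = []
    for game_id in current_game_ids:
        if game_id not in launched:
            launched.add(game_id)
            new_ids.append(game_id)
    return new_ids


launched = set()
print(games_to_launch(["g1", "g2"], launched))  # ['g1', 'g2']
print(games_to_launch(["g2", "g3"], launched))  # ['g3']
```

Depending on your Prefect version, `create_flow_run_from_deployment` may also accept an `idempotency_key` argument (for example, the game ID) that deduplicates runs on the server side; check the client reference for your version before relying on it.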

Darren Liu

01/31/2023, 7:32 AM
Excellent options, exactly what I was looking for. I'm leaning towards option B; it seems to offer finer poll-interval granularity. I'll attempt the implementation in the coming week, thanks @Peyton Runyan