https://prefect.io logo
Title
k

Kevin Jacobs

05/13/2022, 11:22 AM
Would it be possible to run a flow forever in Prefect Orion (and restart the flow when the flow fails)? This would be very helpful for streaming flows, such as a TwitterListener.
🙌 1
a

Anna Geller

05/13/2022, 11:37 AM
Yes, it would be, but it would make it hard to troubleshoot. It may be easier to run each stream as a separate flow run. I totally understand your problem and shared my approach here The basic idea is to run a while loop in a persistent service - exactly as you mentioned a never ending process, but instead of having a never ending flow run, you have a never ending process that triggers multiple flow runs:
@flow(name="Streaming Pipeline with Tasks")
def main():
    orders = get_real_time_data()
    load_to_dwh(orders)


if __name__ == "__main__":
    while True:
        main()
        time.sleep(5)
The benefit of this approach is that you get way more visibility and observability, which can be especially valuable when things go wrong.
🙌 1
k

Kevin Jacobs

05/13/2022, 12:10 PM
Thank you for your explanation, that makes sense 🙂; every streaming job could be modelled as infinite batched jobs
a

Anna Geller

05/13/2022, 12:14 PM
sort of yes - you can see it more as micro-batches really? if you want to, this
time.sleep()
could be milliseconds if needed. The main idea is to get more visibility into which streams succeeded and which didn't