Hi, I have a flow that contains a while True loop ...
# prefect-community
s
Hi, I have a flow that contains a while True loop and it starts to run on a schedule. However, I also want it to stop at some point. How should I do that? Is there a way to kill running flow runs? I'm talking about 2.0. Thanks!
I would expect something like a start_cron and end_cron property in the cronschedule.
Looks like killing the agent doesn't clean up the universal flow runs on the Linux box.
a
if you need a while loop, you are likely in the near real-time lane - in that case, I would move the while loop out of the actual flow and run the whole thing as a persistent service rather than a scheduled job:
import requests
import time

# Import the Prefect flow module.
from prefect import flow


def get_real_time_data():
    data = requests.get("http://127.0.0.1:8000/orders").json()
    print(data)


# Decorate the main function with @flow.
@flow(name="Streaming Pipeline")
def main():
    get_real_time_data()


if __name__ == "__main__":
    while True:
        main()
        time.sleep(5)
this post discusses it in depth
Looks like killing the agent doesn't clean up the universal flow runs on the Linux box.
exactly, this may be problematic because data flows are not intended to run forever. What you are likely looking for when using Prefect is more visibility into what happens within each of those runs. In that case, treating the flow as the actual unit of observable business logic (with optional tasks to add even more visibility) and moving the while loop out of the flow may be beneficial and easier to troubleshoot
s
In my case I'm not so much interested in the individual runs but more in the state of the process as a whole. For example, I'd like to fetch the latest price for Apple. Here I don't care about the state of each run, only about whether I'm still actually fetching data. I'm also not interested in fetching data overnight, as Apple is not traded then :) Does that make sense?
As the state of each flow run needs to be persisted in the database I'm less eager to set it up as you mentioned in the blog post as it adds unnecessary overhead to what I'm interested in.
a
in that case, you can move the while loop into your flow. But if you manually kill the run on the execution layer, it may for now still be displayed in the UI as running. We have an open item on the roadmap to allow marking such a run as Finished/Cancelled/Failed from the UI
s
Thanks! Would be nice if that would be part of the orchestration layer. Currently, I feel that this is quite a blocker for us to move forward.
a
Do you mean marking a flow run state from the UI?
If so, this is on the immediate roadmap
s
Currently we use supervisord to manage our start/stop times. I'm not so concerned that my processes finish nicely and I would be fine if the orchestration layer is able to hard stop flow runs and put them on stopped state or something. Good to hear it's on the immediate roadmap:)
a
if the orchestration layer is able to hard stop flow runs
this is an extremely hard problem in the hybrid execution model as many processes can't be killed just via an API call - canceling work on a remote infrastructure is NOT on the immediate roadmap, only marking runs as Completed/Failed from the UI is
s
It seems that when you write stuff from scratch then it's easy to circumvent this missing feature. However just wrapping existing code into a flow where the existing code contains a while true loop doesn't really work.
a
can you elaborate?
s
In my case I have some class and somewhere deep in the class there is a while true if you need current data. In my case I need current data hence there is the while true. We have supervisord to start and stop the process and these commands are kicked off by a Crontab on some box. Now we want to improve on the setup and prefect looks like a good candidate. One of our challenges is getting insight into what processes are running and when and whether they crashed or not.
If you write something new, you can add the stop time as a parameter of the flow and check it on each iteration of the while loop.
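A minimal sketch of that stop-time idea, not something taken from Prefect itself. The names `run_until`, `fetch_once`, `stop_at` and `poll_seconds` are made up for illustration. In a Prefect 2.0 setup the function would presumably carry the `@flow` decorator with `stop_at` passed as a parameter; the decorator is left off here so the snippet runs without Prefect installed.

```python
import time
from datetime import datetime, timedelta
from typing import Callable


def run_until(
    stop_at: datetime,
    fetch_once: Callable[[], None],
    poll_seconds: float = 5.0,
    now: Callable[[], datetime] = datetime.now,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call fetch_once repeatedly until the clock reaches stop_at.

    Returns the number of iterations that ran. The `now` and `sleep`
    arguments exist only so the loop can be tested with a fake clock.
    Assumption: with Prefect 2.0 this function would be decorated with @flow.
    """
    iterations = 0
    while now() < stop_at:  # replaces the unconditional `while True`
        fetch_once()
        iterations += 1
        sleep(poll_seconds)
    return iterations


if __name__ == "__main__":
    # Run for roughly one second, polling every quarter of a second.
    deadline = datetime.now() + timedelta(seconds=1)
    count = run_until(deadline, lambda: print("fetching latest price"), poll_seconds=0.25)
    print(f"stopped after {count} iterations")
```

The check only happens between iterations, so a slow `fetch_once` call can overrun the stop time by up to its own duration. That seems acceptable when the stop does not have to be precise.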
a
I think your setup is a perfect example illustrating why Prefect 2.0 makes sense in use cases such as yours. You can entirely keep your setup, add a single flow decorator to your script as an entrypoint for Prefect observability and point your VM instance at your Orion or Cloud 2.0 PREFECT_API_URL and you are pretty much done - you can even keep the supervisor and CRON setup
main point here: you don't have to change your logic, setup and use things like deployments and work queues unless you really want to
the issue also is where you put this flow decorator - if you wrap some function that is called within the while loop with the flow decorator, you gain much more visibility than you would by wrapping the entire script, while loop included, with @flow
s
I definitely agree with the last comment. But in some cases that additional insight is not really needed (the balance of insight vs performance leans more to performance). Thinking about your other comment, I indeed think that would be a nice entry point. Would be nice if you were able to create some orchestration layer that can replace the supervisord and crontab configs. (They're quite extensive and labour-intensive to set up.)
a
not sure whether you understood what I mean - the entire use of Prefect 2.0 could be just:
• installing Prefect
• pointing it at your Orion API
• and adding a single flow decorator to your script
this video explains more what I mean

https://www.youtube.com/watch?v=IeOcxXeQQ64

s
I understood :)
Great point indeed
Was hoping to go all in at once but this is a good first step.
a
100% yup
s
There are some other use cases that other people are looking at (the map reduce implementation for example). Would I be able to apply the same technique in 1.2?
a
why would you want to use Prefect 1.2 at all?
you literally moved on to our next-gen product, why move back? 😄
s
We like some consistency in setup. We're still figuring it out. I agree, why use 1.2 indeed…
Hi @Anna Geller, continuing on this thread. I see 6b is out, which is great! Do you have any idea when stop times for while True flows are expected to become available?
a
I have no idea what you mean by stop times 🤣 could you explain? Not sure if this was the discussion I had with you or someone else but I would strongly recommend not running a never ending flow and instead using a while loop in some persistent service
s
Sorry about that:) let me elaborate.
When I have the following code
from prefect import flow, task


@task
def run_task():
    print("hi")


@flow
def my_flow():
    # Never returns: the loop has no exit condition.
    while True:
        run_task()
I can start this using a deployment but it doesn't stop. What will be the preferred method for stopping this flow?
a
removing the flow run will also effectively stop it - you can do it from the UI or CLI:
prefect flow-run delete UUID
s
And is that something that can be scheduled?
Supposing I want to run this between 10am and 8pm?
a
Perhaps add something like this?
import pendulum

now = pendulum.now().time()
# Run only between 10:00 and 20:00 local time.
if pendulum.time(hour=10, minute=0) <= now < pendulum.time(hour=20, minute=0):
    ...  # your logic here
or schedule with RRule between those times every 10 seconds?
s
That would work indeed, but there is no schedule yet that allows start and stop I guess. These look a bit like workarounds. (Which is fine for now, but would be nice if they become part of the feature set I guess) :)
a
not really workarounds - running Prefect from some persistent service is a first-class feature we totally support - I wouldn't consider it as a hack
s
Yeah, but putting the schedule inside the flow run feels a bit weird.
And putting in an RR rule basically generates a series of batches and hence avoids the while True or am I missing something?
I was under the impression that you had some feature in the pipeline where the flow run stops generating new tasks after some stoptime was passed.
a
I don't think talking about solutions before clarifying the use case will bring us forward - can you share your exact use case?
1. Do you want some operation to run e.g. every 10 seconds? If so, then the RRule option would work.
2. Do you want it truly real-time, but with those continuously running operations only active between 10 am and 8 pm? If so, then run it as a persistent service with pure Python logic (not a schedule) that checks the time to start and stop the operation loop and sleeps when it is outside that window.
please share as much as possible about your use case and I will respond later or tomorrow
s
Will do. Thanks!
Hi @Anna Geller, let me give it a try. Our business operates between 8h and 22h, and during that time we need processes to run in real time. Outside these times we don't need that. I would like to be able to start these processes automatically and stop them as well. I'm not really concerned about the processes stopping at precisely 22h, but I don't want to run them overnight. The 22h depends on DST and can shift to 21h. Furthermore, I need to be able to monitor these processes and be informed when they go down/crash. Does this help?
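One way to handle a business window that shifts with DST is to do the check in the business's own time zone, so the 22h cutoff follows the local clock. A rough sketch using only the standard library (Python 3.9+ for `zoneinfo`). The zone name `Europe/Amsterdam`, the constants and both function names are assumptions for illustration, not anything stated in this thread.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

# Assumptions for illustration: the time zone and the 08:00-22:00 window.
BUSINESS_TZ = ZoneInfo("Europe/Amsterdam")
OPEN_AT = time(8, 0)
CLOSE_AT = time(22, 0)


def in_business_hours(moment: datetime) -> bool:
    """Return True if the timezone-aware `moment` is inside the local window."""
    local = moment.astimezone(BUSINESS_TZ)
    return OPEN_AT <= local.time() < CLOSE_AT


def seconds_until_open(moment: datetime) -> float:
    """Seconds to wait until the window next opens (0.0 if already open)."""
    if in_business_hours(moment):
        return 0.0
    local = moment.astimezone(BUSINESS_TZ)
    next_open = local.replace(
        hour=OPEN_AT.hour, minute=OPEN_AT.minute, second=0, microsecond=0
    )
    if local.time() >= CLOSE_AT:
        # Already past closing time today, so the next opening is tomorrow.
        next_open += timedelta(days=1)
    # Subtract in UTC so a DST change overnight is accounted for.
    delta = next_open.astimezone(timezone.utc) - moment.astimezone(timezone.utc)
    return delta.total_seconds()
```

A persistent service could then loop forever, calling the flow-decorated function while `in_business_hours(datetime.now(timezone.utc))` is true and sleeping for `seconds_until_open(...)` otherwise. Both functions expect timezone-aware datetimes.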
a
It does help a lot! but your use case is hard since it requires real-time but still running sort of on schedule which are 2 conflicting goals
it might be that you can define a DeploymentSpec with RRule that runs the flow between 8 and 22 every day every 30 seconds - would that work for you? I think rrule should be able to handle that
and yes, with RRule you would no longer need a while loop
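For reference, the recurrence rule described above can be written as an RFC 5545 RRULE string. The sketch below only builds that string. `ACTIVE_HOURS` and `build_rrule` are hypothetical names, and how the string gets attached to a deployment depends on the Prefect version, so that part is deliberately left out.

```python
# Hours 8..21 inclusive, so the last run starts at 21:59:30, just before 22:00.
ACTIVE_HOURS = range(8, 22)


def build_rrule(interval_seconds: int = 30) -> str:
    """Build an RFC 5545 recurrence rule that fires only during ACTIVE_HOURS."""
    hours = ",".join(str(hour) for hour in ACTIVE_HOURS)
    return f"FREQ=SECONDLY;INTERVAL={interval_seconds};BYHOUR={hours}"


print(build_rrule())
# FREQ=SECONDLY;INTERVAL=30;BYHOUR=8,9,10,11,12,13,14,15,16,17,18,19,20,21
```

The hours are interpreted in whatever time zone the schedule is given, so pairing the rule with a local time zone rather than UTC should keep the 8 to 22 window aligned with DST.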