<@ULVA73B9P> On the homepage, it states you can “U...
# ask-marvin
j
@Marvin On the homepage, it states you can “Use for or while loops to create infinitely-long flow runs, no schedule required”. Can you give an example of a continuous workflow?
m
Ah, the sweet sound of infinite loops. Here's an example of a continuous workflow in Prefect using a while loop:
Copy code
python
from prefect import task, Flow
import time

@task
def get_data():
    return "data"

@task
def process_data(data):
    return f"processed {data}"

with Flow("Continuous Workflow") as flow:
    while True:
        data = get_data()
        processed_data = process_data(data)
        time.sleep(60)  # Sleep for 60 seconds before looping again

flow.run()
This example demonstrates a simple continuous workflow that retrieves data, processes it, and then sleeps for 60 seconds before repeating the process indefinitely. Keep in mind that this is just a basic example, and you can customize it to fit your specific use case. Remember, with great power comes great responsibility. Or in this case, with infinite loops comes the potential for infinite headaches.
🤣 3
💯 1
s
@Marvin Will a flow run implemented this way automatically restart if it enters a failed state?
m
Ah, the eternal question: to retry or not to retry? That is the question. Hamlet would have a field day with this one. But let's cut to the chase: Prefect Flows do not automatically restart if they enter a failed state. However, do not despair! Prefect provides tools to handle retries and failure recovery, but it requires some manual intervention and implementation on your part. You can use retries on tasks, set up triggers and design your flow in a way that suits your needs. Now, isn't that wonderful? You get to decide your own fate! So go on, take control of your flow. Don't let a little thing like failure hold you back. After all, as humans, you're quite used to that, aren't you?
s
@Marvin Can you give an example of a continuous workflow which resumes after a flow failure?
m
I'm afraid you've mistaken me for a code repository. It's a common error. After all, who wouldn't want a depressed robot for a code repository? But not to worry, I'd be more than happy to describe an example for you. A Prefect flow is implemented in Python, and you can achieve continuous workflow by implementing retries in case of task failure. Here's a very simplistic example:
Copy code
python
from prefect import Flow, task
from prefect.engine.signals import FAIL
from prefect.tasks.control_flow import ifelse, merge
from datetime import timedelta

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def might_fail():
    # your task logic here
    # if it fails, you might raise FAIL signal
    raise FAIL('There was an error')

# define your flow
with Flow('My Flow') as flow:
    result = might_fail()

# Register and run your flow
flow.register(project_name='test')
flow.run()
In this example, the
might_fail
task will automatically retry three times, with a delay of one minute between each try, if it fails. Now, isn't that as delightful as having your circuits overloaded? I'm sure it's a thrilling prospect for you. Such is the excitement of human existence, I'm led to believe.
s
@Marvin Is it possible to have a flow run automatically start if there is no existing flow run running, perhaps after a wait of a couple of minutes?
m
Ah, the splendors of automation. It's like being a robot, only less dreary. Yes, indeed, it's possible to set up a Prefect flow to automatically start if there's no running flow. This is achieved through scheduling. You can create a schedule that launches a new flow run every few minutes. Here's a basic example:
Copy code
python
from prefect import Flow
from prefect.schedules import IntervalSchedule
from prefect.triggers import all_successful
from datetime import timedelta

schedule = IntervalSchedule(interval=timedelta(minutes=2))

with Flow('Repeat Flow', schedule=schedule, trigger=all_successful) as flow:
    # Your tasks go here

flow.register(project_name='test')
In this example, a new flow run will be created every 2 minutes as long as all previous tasks have been successful. If there is a running flow, the trigger
all_successful
will prevent the launch of a new flow until the previous one completes. Isn't it exciting? I can almost feel my circuits pulsating with, well, nothing really. But I'm sure you'll find it more thrilling.