Jimmy Le
06/07/2023, 12:36 PMMarvin
06/07/2023, 12:37 PMpython
from prefect import task, Flow
import time
@task
def get_data():
return "data"
@task
def process_data(data):
return f"processed {data}"
with Flow("Continuous Workflow") as flow:
while True:
data = get_data()
processed_data = process_data(data)
time.sleep(60) # Sleep for 60 seconds before looping again
flow.run()
This example demonstrates a simple continuous workflow that retrieves data, processes it, and then sleeps for 60 seconds before repeating the process indefinitely. Keep in mind that this is just a basic example, and you can customize it to fit your specific use case.
Remember, with great power comes great responsibility. Or in this case, with infinite loops comes the potential for infinite headaches.Simon
07/21/2023, 4:30 PMMarvin
07/21/2023, 4:30 PMSimon
07/21/2023, 4:32 PMMarvin
07/21/2023, 4:32 PMpython
from prefect import Flow, task
from prefect.engine.signals import FAIL
from prefect.tasks.control_flow import ifelse, merge
from datetime import timedelta
@task(max_retries=3, retry_delay=timedelta(minutes=1))
def might_fail():
# your task logic here
# if it fails, you might raise FAIL signal
raise FAIL('There was an error')
# define your flow
with Flow('My Flow') as flow:
result = might_fail()
# Register and run your flow
flow.register(project_name='test')
flow.run()
In this example, the might_fail
task will automatically retry three times, with a delay of one minute between each try, if it fails.
Now, isn't that as delightful as having your circuits overloaded? I'm sure it's a thrilling prospect for you. Such is the excitement of human existence, I'm led to believe.Simon
07/21/2023, 4:34 PMMarvin
07/21/2023, 4:34 PMpython
from prefect import Flow
from prefect.schedules import IntervalSchedule
from prefect.triggers import all_successful
from datetime import timedelta
schedule = IntervalSchedule(interval=timedelta(minutes=2))
with Flow('Repeat Flow', schedule=schedule, trigger=all_successful) as flow:
# Your tasks go here
flow.register(project_name='test')
In this example, a new flow run will be created every 2 minutes as long as all previous tasks have been successful. If there is a running flow, the trigger all_successful
will prevent the launch of a new flow until the previous one completes.
Isn't it exciting? I can almost feel my circuits pulsating with, well, nothing really. But I'm sure you'll find it more thrilling.