Hi, Im wondering if it’s possible to run multiple flows using only prefect core (and not running pre...
t
Hi, Im wondering if it’s possible to run multiple flows using only prefect core (and not running prefect-server) ?
a
It’s possible to run multiple flows, but you can’t schedule or deploy your flow using only Prefect Core. Core is only for local development, so if you want to see the UI or register your flow to run daily on some remote cluster, you need either Prefect Cloud or Server. Note that in Prefect 2.0 this will be different - Orion allows you to run everything locally (incl. building deployments) and see the UI.
k
I think you can’t if you are talking about concurrent flows. Since there is no agent, you need to spin up another Python process. It would be easier to use the backend
t
Thank you @Anna Geller @Kevin Kho ! Yes I was talking about concurrent flows. Guess I will need to use the backend then. I also would like to run my flows continuously by running the flow x mins after the previous run finished (instead of starting flow run at a fixed interval using IntervalSchedule). Can you please point me to the right direction on how to do this ?
k
You the
create_flow_run
task and schedule a new
scheduled_start_time
t
@Kevin Kho im not sure if I’m fully understand. Do I use
create_flow_run
within another flow to create the main flow run, or do I use
create_flow_run
within the flow (at the end) in order to call itself to run again at the next scheduled time (not sure if this is possible)?
Basically what im trying to achieve here is similar to running a while loop that performs all the tasks, with time.sleep() at the end of the loop.
a
You could do that using a flow-level handler as follows:
Copy code
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful():
        time.sleep(60) # as much as you want to have pause in between
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
What Kevin suggested was to do the same as above but instead of doing time.sleep() in between flow runs, simply add a timedelta to the scheduled_start_time as shown here:
Copy code
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
from datetime import timedelta
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful():
        create_flow_run.run(
            flow_name="never-ending-flow",
            project_name="community",
            scheduled_start_time=timedelta(minutes=2),
        )
    return new_state


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
t
This is perfect ! Thank you so much @Anna Geller
👍 1
1111 Views