How do I create and run flows on a schedule in Prefect Orion?
# ask-community
Christoph Deil:
How do I create and run flows on a schedule in Prefect Orion? Let’s say I have this:
```python
schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
```
Do I now use OrionClient and some methods to deploy? We currently use Prefect core in a pod and simply do flow.run() with a schedule attached, and I’m looking for a working example to do the equivalent in Orion (even if I gather behind the scenes it will do something else via a server and DB). Basically I’m looking for this: https://orion-docs.prefect.io/concepts/deployments/#running-deployments-with-the-api 🙂
Zanie:
Hey @Christoph Deil — the easiest way to deploy from a specification like that is from the CLI:
`prefect deployment create file.py`
Then you’d start the server:
`prefect orion start`
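For reference, a complete `file.py` for that command could look roughly like this (a minimal sketch — the `DeploymentSpec` import path is an assumption for the current alpha, so check it against your installed version):
```python
# file.py -- sketch of a deployable flow script for `prefect deployment create`
# NOTE: import paths assume an early Orion alpha; verify against your version.
import datetime

from prefect import flow
from prefect.deployments import DeploymentSpec  # assumed location
from prefect.orion.schemas.schedules import IntervalSchedule


@flow
def greetings_flow():
    print("hola")


# Run every 10 seconds once the deployment is created and the server is running
schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
```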
We actually don’t have a great way to support the `flow.run(…)`-on-a-schedule pattern yet; we’ll have to think about the best way to address that use case.
I’d recommend using cron to call your flow script every 10 seconds instead; see the sketch below.
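Roughly, cron would just invoke a plain flow script (a sketch — the paths are placeholders, and note that standard cron only schedules down to one-minute granularity, so a true 10-second interval needs the loop approach shown later in this thread):
```python
# run_greetings.py -- a plain script for cron to invoke (placeholder paths)
# Example crontab entry, once per minute (standard cron can't go below 1 minute):
#   * * * * * /path/to/python /path/to/run_greetings.py
from prefect import flow


@flow
def greetings_flow():
    print("hola")


if __name__ == "__main__":
    greetings_flow()  # each cron invocation produces one flow run
```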
c
This seems to almost work and do the equivalent:
```python
schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
client = OrionClient()
deployment_id = create_deployment_from_spec(spec=deployment_spec, client=client)
subprocess.run("prefect orion start", shell=True)
```
But I see some errors:
```
22:59:32.526 | Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")>
Traceback (most recent call last):
  File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
22:59:32.535 | Flow run '3f327c9d-dbd5-43af-a61f-1c6e6df30c86' exited with exception: BrokenWorkerProcess()
```
So I guess trying to make it work from one Python process isn’t a good idea? (That’s what we currently have: a Click CLI that does some argument handling, then creates a flow and schedule and runs it based on the user’s arguments.)
Zanie:
Yeah I wouldn’t recommend trying to do it that way
For your use case, something like this should be sufficient?
```python
import time

while True:
    greetings_flow()
    time.sleep(10)
```
You can handle an arbitrary schedule with something like the following, but note that the `Schedule.get_dates` interface is only intended to be consumed by our scheduler:
```python
import sys
from datetime import timedelta

import anyio
import pendulum

from prefect import flow
from prefect.orion.schemas.schedules import IntervalSchedule


@flow
def my_flow():
    print(f"Hello! Running with {sys.executable}")


schedule = IntervalSchedule(interval=timedelta(seconds=10))


async def main():
    while True:
        # Ask the schedule for the next run date after "now", then sleep until it
        now = pendulum.now("utc")
        next_date = (await schedule.get_dates(start=now, n=1))[0]
        wait_time = now.diff(next_date).total_seconds()
        print(f"Waiting {wait_time} seconds until {next_date}")
        await anyio.sleep(wait_time)
        my_flow()


anyio.run(main)
```
Christoph Deil:
@Zanie - thank you! We’ll try it out and discuss in the team today which style we prefer: either sleep in the function, or only deploy from the script and then run `prefect orion start` separately as the main blocking command in the pod.
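Roughly, that second style could look like this as the pod entrypoint (a sketch only — the import paths are assumptions based on this thread’s snippets, and the exec trick simply makes the server the final blocking process):
```python
# pod_entrypoint.py -- sketch: create the deployment, then block on the server.
# Import paths are assumed from this thread's alpha-era snippets; verify them.
import datetime
import os

from prefect.client import OrionClient  # assumed location
from prefect.deployments import DeploymentSpec, create_deployment_from_spec  # assumed
from prefect.orion.schemas.schedules import IntervalSchedule

from my_flows import greetings_flow  # hypothetical module containing the flow

schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
create_deployment_from_spec(spec=spec, client=OrionClient())

# Replace this process with the server so `prefect orion start` becomes the
# pod's main blocking command (avoids the in-process subprocess issue above).
os.execvp("prefect", ["prefect", "orion", "start"])
```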
Zanie:
The sleep solution is better if you want to pull all of the run data into a single Orion instance in the future: you can just set a `PREFECT_ORION_HOST` env var and they’ll all report to one server.
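For example (the URL is a made-up placeholder for wherever your central Orion API lives):
```python
import os

# Hypothetical address -- substitute your server's actual host/port/path.
# Safest is to set this in the pod's environment before Python even starts.
os.environ["PREFECT_ORION_HOST"] = "http://orion.internal:4200/api"
```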
With the second style, you’ll be running multiple servers, and if you mix their data they’ll start running flows that belong to other pods.