
Christoph Deil

01/12/2022, 9:39 PM
How do I create and run flows on a schedule in Prefect Orion? Let’s say I have this:
schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
Do I now use OrionClient and some methods to deploy? We currently use Prefect core in a pod and simply do flow.run() with a schedule attached, and I’m looking for a working example to do the equivalent in Orion (even if I gather behind the scenes it will do something else via a server and DB). Basically I’m looking for this: https://orion-docs.prefect.io/concepts/deployments/#running-deployments-with-the-api 🙂

Michael Adkins

01/12/2022, 9:49 PM
Hey @Christoph Deil — the easiest way to deploy from a specification like that is from the CLI
prefect deployment create file.py
Then you’d start the server
prefect orion start
We actually don’t have a great way to support the
flow.run(…)
on a schedule pattern yet; we’ll have to think about the best way to address that use case.
I’d recommend using cron to call your flow script on a schedule instead (note that standard cron’s finest granularity is one minute, not 10 seconds)

Christoph Deil

01/12/2022, 10:03 PM
This seems to almost work and do the equivalent:
schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
client = OrionClient()
deployment_id = create_deployment_from_spec(spec=deployment_spec, client=client)
subprocess.run("prefect orion start", shell=True)
But I see some errors:
22:59:32.526 | Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")>
Traceback (most recent call last):
  File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
22:59:32.535 | Flow run '3f327c9d-dbd5-43af-a61f-1c6e6df30c86' exited with exception: BrokenWorkerProcess()
So I guess trying to make it work from one Python process isn’t a good idea? (it’s what we currently have, a click CLI which does some argument stuff and then creates a flow and schedule and runs it based on user arguments to the CLI)

Michael Adkins

01/12/2022, 10:38 PM
Yeah I wouldn’t recommend trying to do it that way
For your use case example
while True:
    greetings_flow()
    time.sleep(10)
should be sufficient?
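A plain `time.sleep(10)` after each call means the period is 10 seconds plus however long the flow takes. If that drift matters, here is a minimal stdlib-only sketch of a fixed-rate loop. The helper name `run_every` and its `max_runs` parameter are hypothetical, made up for illustration, and are not part of Prefect:

```python
import time
from typing import Callable, Optional


def run_every(
    func: Callable[[], object],
    interval: float,
    max_runs: Optional[int] = None,
) -> int:
    """Call `func` every `interval` seconds, measured start to start.

    Hypothetical helper, not a Prefect API. Returns the number of calls made.
    With max_runs=None it loops forever, like `while True`.
    """
    runs = 0
    next_start = time.monotonic()  # the first run starts immediately
    while max_runs is None or runs < max_runs:
        # Sleep only for the time remaining until this run's slot;
        # if the previous call overran the interval, start right away.
        delay = next_start - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        func()
        runs += 1
        next_start += interval
    return runs
```

In the pod this would be called as `run_every(greetings_flow, interval=10)`, assuming `greetings_flow` is the flow from the question.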
You can handle an arbitrary schedule with something like the script below, but note that the
Schedule.get_dates
interface is only intended to be consumed by our scheduler
import sys
from datetime import timedelta

import anyio
import pendulum

from prefect import flow
from prefect.orion.schemas.schedules import IntervalSchedule


@flow
def my_flow():
    print(f"Hello! Running with {sys.executable}")


schedule = IntervalSchedule(interval=timedelta(seconds=10))


async def main():
    while True:
        now = pendulum.now("utc")
        next_date = (await schedule.get_dates(start=now, n=1))[0]
        wait_time = now.diff(next_date).total_seconds()
        print(f"Waiting {wait_time} seconds until {next_date}")
        await anyio.sleep(wait_time)
        my_flow()


anyio.run(main)
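Since `Schedule.get_dates` is only meant for the scheduler, the wait time for a simple fixed interval can also be computed without it. This is a rough stdlib-only sketch, not how Orion does it internally: the function name `next_tick` and the idea of an explicit `anchor` datetime are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone


def next_tick(anchor: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the first `anchor + k * interval` (integer k >= 0) strictly after `now`.

    Hypothetical helper, not a Prefect API.
    """
    if now < anchor:
        return anchor
    # timedelta // timedelta yields an int: whole intervals elapsed since the anchor
    k = (now - anchor) // interval + 1
    return anchor + k * interval


def seconds_until_next_tick(anchor: datetime, interval: timedelta) -> float:
    """Seconds to sleep from the current UTC time until the next tick."""
    now = datetime.now(timezone.utc)
    return (next_tick(anchor, interval, now) - now).total_seconds()
```

The loop in `main()` could then sleep for `seconds_until_next_tick(anchor, timedelta(seconds=10))` before each `my_flow()` call, with `anchor` being any timezone-aware UTC datetime you choose as the schedule's starting point.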

Christoph Deil

01/13/2022, 8:09 AM
@Michael Adkins - thank you! We’ll try it out and discuss in the team today which style we prefer (either sleep in function, or only deploy from the script but then run
prefect orion start
separately as the main blocking command in the pod).

Michael Adkins

01/13/2022, 4:03 PM
The sleep solution is better if you want to pull all of the run data into a single Orion instance in the future: you can just set a
PREFECT_ORION_HOST
env var and they’ll report to one server.
With the second approach, you’ll be running multiple servers, and if you mix their data they’ll start running flows that belong in other pods.
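For the first option, a small sketch of building the environment for a flow process so that it reports to a shared server. Only the variable name `PREFECT_ORION_HOST` comes from this thread; the helper name `env_for_shared_orion`, the example URL, and the exact value format the variable expects are assumptions, so check them against the Orion settings docs.

```python
import os
from typing import Dict, Mapping, Optional


def env_for_shared_orion(
    host: str, base: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Return a copy of the environment with PREFECT_ORION_HOST set.

    Hypothetical helper: `host` is whatever address your shared Orion
    server is reachable at (format assumed, not confirmed in the thread).
    """
    env = dict(os.environ if base is None else base)
    env["PREFECT_ORION_HOST"] = host
    return env


# Placeholder address for illustration only
shared_env = env_for_shared_orion("http://orion.example.internal:4200/api")
```

The resulting dict can be passed as `env=` to `subprocess.run` when launching the flow script, or the same variable can be set in the pod spec.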