Christoph Deil
01/12/2022, 9:39 PMschedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
Do I now use OrionClient and some methods to deploy?
We currently use Prefect core in a pod and simply do flow.run() with a schedule attached, and I’m looking for a working example to do the equivalent in Orion (even if I gather behind the scenes it will do something else via a server and DB).
Basically I’m looking for this: https://orion-docs.prefect.io/concepts/deployments/#running-deployments-with-the-api
🙂Michael Adkins
01/12/2022, 9:49 PMprefect deployment create file.py
Then you’d start the server prefect orion start
We actually don’t have a great way to support the flow.run(…)
on a schedule pattern yet, we’ll have to think about the best way to address that use case.Christoph Deil
01/12/2022, 10:03 PMschedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
client = OrionClient()
deployment_id = create_deployment_from_spec(spec=deployment_spec, client=client)
subprocess.run("prefect orion start", shell=True)
But I see some errors:
22:59:32.526 | Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")>
Traceback (most recent call last):
File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/subprocess.py", line 73, in pipe_data_received
reader.feed_data(data)
File "/Users/cdeil/opt/anaconda3/envs/prefect-orion/lib/python3.9/asyncio/streams.py", line 472, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
22:59:32.535 | Flow run '3f327c9d-dbd5-43af-a61f-1c6e6df30c86' exited with exception: BrokenWorkerProcess()
So I guess trying to make it work from one Python process isn’t a good idea?
(it’s what we currently have, a click CLI which does some argument stuff and then creates a flow and schedule and runs it based on user arguments to the CLI)Michael Adkins
01/12/2022, 10:38 PMwhile True:
greetings_flow()
time.sleep(10)
should be sufficient?Schedule.get_dates
interface is only intended to be consumed by our scheduler
import sys
from datetime import timedelta
import anyio
import pendulum
from prefect import flow
from prefect.orion.schemas.schedules import IntervalSchedule
@flow
def my_flow():
print(f"Hello! Running with {sys.executable}")
schedule = IntervalSchedule(interval=timedelta(seconds=10))
async def main():
while True:
now = pendulum.now("utc")
next_date = (await schedule.get_dates(start=now, n=1))[0]
wait_time = now.diff(next_date).total_seconds()
print(f"Waiting {wait_time} seconds until {next_date}")
await anyio.sleep(wait_time)
my_flow()
anyio.run(main)
Christoph Deil
01/13/2022, 8:09 AMprefect orion start
separately as main blocking command in the pod.Michael Adkins
01/13/2022, 4:03 PMPREFECT_ORION_HOST
env var and they’ll report to one server.