Jarvis Stubblefield
10/07/2022, 7:10 PM
Nate
10/07/2022, 10:28 PM
"Is there anything wrong technically or theoretically with that thought?"
From what I can get out of your description, I don't believe there's anything inherently wrong with that idea! Given that this is a relatively complex use-case, I would encourage you to reach out to cs@prefect.io if you need more specific / customized infrastructure advice.
Jarvis Stubblefield
10/07/2022, 11:57 PM
Andrew Brookins
10/09/2022, 12:08 AM
Jarvis Stubblefield
10/10/2022, 7:24 PM
Andrew Brookins
10/10/2022, 7:39 PM
Anthony Head
10/10/2022, 7:47 PM
Andrew Brookins
10/10/2022, 7:50 PM
Jarvis Stubblefield
10/10/2022, 7:50 PM
[...] anyio and attempted to run the OrionAgent within the Django command, but I don’t know what I’m doing and it’s obvious. lol [...] anyio.run().
Kalise Richmond
10/13/2022, 6:57 PM
Jarvis Stubblefield
10/13/2022, 7:04 PM
# -*- coding: utf-8
from __future__ import unicode_literals
"""
Custom Django management command to work with Prefect.
Via various flags, you can run a specific flow on demand,
register flows with the server, or run a local agent.
"""
import anyio
import os
from typing import Any

from django.core.management.base import BaseCommand, CommandError, CommandParser
from prefect.deployments import LocalFileSystem, Deployment
from prefect.agent import OrionAgent

# from deployments.flows.reset_timeseries_end_times import (
#     flow as reset_timeseries_end_times,
# )
# from deployments.flows import default_dataset_refresh

flows = {
    # "reset_timeseries_end_times": reset_timeseries_end_times,
    # default_dataset_refresh.flow_name: default_dataset_refresh.flow
}

# daily = Schedule(clocks=[CronClock("5 6 * * *")])


class Command(BaseCommand):
    help = "Run Prefect flow registration, agent, or a single flow"

    def add_arguments(self, parser: CommandParser) -> None:
        parser.add_argument(
            "--agent", help="Run local Prefect Agent", action="store_true"
        )
        parser.add_argument("--run_flow", help="Run a specified flow by name")

    def handle(self, *args: Any, **options: Any):
        if options["run_flow"]:
            try:
                flow = flows[options["run_flow"]]
            except KeyError:
                raise CommandError(
                    f"Specified flow ({options['run_flow']}) does not exist"
                )
            flow.run()

        if options["agent"]:
            anyio.run(self.agent)
        else:
            self.stdout.write("Not running local Prefect Agent")

    async def agent(self):
        """ Run a local Prefect agent """
        self.stdout.write("Running local Prefect Agent")
        agent = OrionAgent(["tenzinga_django_dev"])
        await agent.start()
(ppower) ┌─[ballisticpain@BallisticDevelopment] - [~/Development/tenzinga/ppower] - [Thu Oct 13, 14:05]
└─[$] <git:(feature/prefect-tasks*)> ./manage.py prefect --agent
Running local Prefect Agent
Andrew Brookins
10/13/2022, 8:58 PM
import asyncio
from typing import List

from django.core.management.base import BaseCommand, CommandError
from prefect.agent import OrionAgent
from prefect.client import get_client
from prefect.settings import PREFECT_AGENT_QUERY_INTERVAL
from prefect.utilities.services import critical_service_loop


async def start_agent(work_queues: List[str]):
    async with get_client() as client:
        async with OrionAgent(work_queues=work_queues) as agent:
            print(
                "Agent started! Looking for work from "
                f"queue(s): {', '.join(work_queues)}..."
            )
            await critical_service_loop(
                agent.get_and_submit_flow_runs,
                PREFECT_AGENT_QUERY_INTERVAL.value(),
                printer=print,
            )


class Command(BaseCommand):
    help = 'Start the Prefect agent'

    def add_arguments(self, parser):
        parser.add_argument('work_queues', nargs='+', type=str)

    def handle(self, *args, **options):
        asyncio.run(start_agent(options["work_queues"]))
        self.stdout.write(self.style.SUCCESS('Successfully started Prefect agent'))
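For readers unfamiliar with the pattern, the command above keeps the agent alive by repeatedly awaiting a "get and submit" coroutine at a fixed interval. The sketch below illustrates only that general polling idea in plain asyncio. `simple_service_loop` and `max_iterations` are hypothetical names invented here; this is not Prefect's `critical_service_loop`, whose internals (such as error handling) are not reproduced.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def simple_service_loop(
    workload: Callable[[], Awaitable[None]],
    interval: float,
    max_iterations: Optional[int] = None,
) -> int:
    """Await `workload` repeatedly, sleeping `interval` seconds between calls.

    Hypothetical helper for illustration only. `max_iterations` exists so the
    loop can be stopped in a demo; with None it runs until cancelled.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        await workload()  # e.g. "look for scheduled work and submit it"
        iterations += 1
        await asyncio.sleep(interval)
    return iterations
```

Something like `asyncio.run(simple_service_loop(check_for_work, interval=5))` would then block the management command until it is interrupted, where `check_for_work` is whatever coroutine does the polling.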
[...] django-prefect extension if we want to make this more usable (or a community member will).
I was able to write a flow that used the Django ORM and run it within a Django view, but this doesn’t use “asynchronous” execution; it runs the flow within the HTTP request/response cycle.
What I wanted was Celery-like “fire and forget” semantics, where I could, in Prefect terminology, schedule a flow run within the view and end the HTTP request without waiting for the flow run to complete.
This should be possible using a Deployment and the new run_deployment() utility. However, I ran into pathing and import issues. IIRC, if I stored the flow code in a Django app, the agent would pick up the flow run correctly but not be able to find the flow on disk (I used local storage to store the flow). And I think if I stored the flow outside of a Django app, I had trouble setting up Django so that the ORM was usable by the flow. How you do this has changed since I last worked with Django. 😅
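As a rough illustration of the "fire and forget" idea described above, here is a minimal sketch of a helper a Django view could call. It assumes Prefect 2's `run_deployment` from `prefect.deployments`, and it assumes that passing `timeout=0` makes the call return once the flow run is created rather than waiting for it to finish; check both against your installed Prefect version. `schedule_flow_run` and its `submit` parameter are hypothetical names added here so the function can be exercised without a Prefect server, and the sketch does not address the pathing and import issues mentioned above.

```python
from typing import Any, Callable, Dict, Optional


def schedule_flow_run(
    deployment_name: str,
    parameters: Optional[Dict[str, Any]] = None,
    submit: Optional[Callable[..., Any]] = None,
) -> Any:
    """Ask Prefect to create a flow run and return without waiting for it.

    `submit` is a hypothetical injection point (not part of Prefect); when it
    is omitted, Prefect's run_deployment utility is used.
    """
    if submit is None:
        # Imported lazily so this module can be imported without Prefect.
        from prefect.deployments import run_deployment

        submit = run_deployment
    # Assumption: timeout=0 means "do not poll for completion".
    return submit(name=deployment_name, parameters=parameters or {}, timeout=0)
```

Inside a view, this might be called as `schedule_flow_run("my-flow/my-deployment", {"dataset_id": 42})` before returning the HTTP response; the deployment name and parameter are placeholders. An agent polling the deployment's work queue would then pick up the run.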
I haven’t had time to dig more into it!
Jarvis Stubblefield
10/13/2022, 11:47 PM
Andrew Brookins
10/13/2022, 11:49 PM
Jarvis Stubblefield
10/13/2022, 11:49 PM
Nate
10/13/2022, 11:54 PM
Andrew Brookins
10/13/2022, 11:55 PM
Jarvis Stubblefield
10/14/2022, 12:00 AM
Andrew Brookins
10/14/2022, 12:51 AM
Jarvis Stubblefield
10/14/2022, 4:45 PM