Jarvis Stubblefield

10/07/2022, 7:10 PM
I would like some advice about production deployment of agents. I’m using Prefect 2 with Prefect Cloud 2, and I’m working to get my agent(s) running within my Django environment. Once that is finished, I am planning on deploying a new production machine behind a load balancer with a minimal percentage of production load, as its primary job is to run the agent(s) and their tasks. Is there anything wrong technically or theoretically with that thought? I need my full production code available to Prefect because the tasks will be triggered by actions people take within our system. (We are hopefully using this as a replacement for Celery.)

Nate

10/07/2022, 10:28 PM
Hi @Jarvis Stubblefield
> Is there anything wrong technically or theoretically with that thought?
From what I can get out of your description, I don't believe there's anything inherently wrong with that idea! Given that this is a relatively complex use-case, I would encourage you to reach out to cs@prefect.io if you need more specific / customized infrastructure advice.
👍 1

Jarvis Stubblefield

10/07/2022, 11:57 PM
@Nate I have reached out to that email address. Thanks for your suggestion and I look forward to hearing back!

Andrew Brookins

10/09/2022, 12:08 AM
Another thing you could do is create Deployments for your flows and then use our recent utility run_deployment() to kick them off. See the API docs here: https://docs.prefect.io/api-ref/prefect/deployments/#prefect.deployments.run_deployment

Jarvis Stubblefield

10/10/2022, 7:24 PM
@Andrew Brookins At first I thought you were talking about my need to run the same Deployment (schedule + flow) with different parameters, but from re-reading your response and my question… you’re talking about using that method for the application to kick off a flow run from a deployment when users “click” or interact with the system.
@Nate Allie responded to my email and told me I needed to discuss this with the community as I’m not a paying customer. Going to be hard for me to be a paying customer if I cannot get the software to work properly. I’m a bit confused and surprised by the bouncing back and forth I am receiving… I understand they likely only respond to paying customers, however, I’m not yet one and if I cannot get the system working I will never be one.

Andrew Brookins

10/10/2022, 7:39 PM
For sure! Happy to help you work through it as best I can. I’m a former Django dev myself and personally love the idea of using Prefect instead of Celery, after many years of using Celery. 😄

Anthony Head

10/10/2022, 7:47 PM
@Jarvis Stubblefield just sent over my calendar via email. Looking forward to connecting

Andrew Brookins

10/10/2022, 7:50 PM
As far as Deployments go, they’re a way to establish metadata about how to run a flow, and once you set one up, you can trigger it by name via the API. So if you’re only keeping the two codebases together (flows and Django app) so that you can run your Prefect flows directly from the Django app, this is an option that might help you do that without needing them to run together. However, if your Prefect flows need to use your project’s Django ORM models, then it could make sense for everything to exist together, as I think you described. I’ll check your other thread, where I saw Ryan looking at some of your output.
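A minimal sketch of triggering a Deployment by name from application code, such as a Django view. The flow name `send-report`, the deployment name `prod`, and the helper names are hypothetical, chosen only for illustration. It assumes `run_deployment()` accepts the `<flow name>/<deployment name>` form and that `timeout=0` returns without waiting for the run to finish, as described in the API docs linked above; check those docs against your installed Prefect version.

```python
from typing import Any, Dict, Optional


def deployment_name(flow_name: str, deployment: str) -> str:
    """Build the '<flow name>/<deployment name>' identifier for a deployment."""
    return f"{flow_name}/{deployment}"


def trigger_flow_run(
    flow_name: str, deployment: str, parameters: Optional[Dict[str, Any]] = None
):
    """Schedule a flow run from a deployment and return without waiting for it."""
    # Imported lazily so this module can be loaded even where Prefect
    # is not installed (for example, in a test environment).
    from prefect.deployments import run_deployment

    return run_deployment(
        name=deployment_name(flow_name, deployment),
        parameters=parameters or {},
        timeout=0,  # assumption: 0 means "do not wait for completion"
    )


# Hypothetical names, for illustration only.
example_name = deployment_name("send-report", "prod")
```

A view could call `trigger_flow_run("send-report", "prod", {"user_id": 42})` and return its HTTP response immediately, leaving the agent to pick up the scheduled run.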

Jarvis Stubblefield

10/10/2022, 7:50 PM
@Andrew Brookins & @Anthony Head I look forward to connecting with each of you and making Django / Prefect a tighter and more cohesive integration as I do think Prefect would make an excellent replacement for Celery 🙂.
🙌 2
@Anthony Head I look forward to meeting with you this Friday. Thank you for reaching out! 🙂
@Andrew Brookins Yea, Ryan was looking at some of my output (seems it’s using a private TMP directory or something along those lines). I know if I could run “prefect” through the Django Management commands it would have the “right” environment. However, I’ve not worked with AsyncIO but think I might have an idea of how to make that work with the Django command.
@Andrew Brookins My idea isn’t working. I installed anyio and attempted to run the OrionAgent within the Django command, but I don’t know what I’m doing and it’s obvious. lol
It is talking about needing a co-routine or something when I do anyio.run().
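The exact error is not shown in the thread, so this is only one plausible cause: async runners differ in what they expect to be handed. The standard library's `asyncio.run()` wants the result of calling the async function, while `anyio.run()` wants the async function itself (plus any arguments). The sketch below uses only `asyncio` to show the mismatch; the `agent` function is a stand-in, not Prefect code.

```python
import asyncio


async def agent() -> str:
    """Stand-in for an async entry point such as an agent runner."""
    return "agent finished"


# asyncio.run() expects the object produced by calling the function.
result = asyncio.run(agent())

# Passing the function itself (without calling it) is rejected.
try:
    asyncio.run(agent)
    rejected = False
except (TypeError, ValueError):
    rejected = True
```

With anyio the convention is reversed: `anyio.run(agent)` is the correct form, and `anyio.run(agent())` is the one that fails.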

Kalise Richmond

10/13/2022, 6:57 PM
Hey Jarvis, are you able to share some sample code for how you are running the OrionAgent or the stack trace for the error message that you are getting?

Jarvis Stubblefield

10/13/2022, 7:04 PM
@Kalise Richmond Sorry took me a minute to find your message since Slack is having issues with Threads... I'd be happy to share it all ... 🙂 one moment.
I actually just got this to work. It simply printed that it was running the agent and exited the command. And I'm not certain running the agent is the correct path, just one I'm trying...
# -*- coding: utf-8
from __future__ import unicode_literals

"""
Custom Django management command to work with Prefect.
Via various flags, you can run a specific flow on demand,
register flows with the server, or run a local agent.
"""
import anyio
import os
from typing import Any

from django.core.management.base import BaseCommand, CommandError, CommandParser
from prefect.deployments import LocalFileSystem, Deployment
from prefect.agent import OrionAgent

# from deployments.flows.reset_timeseries_end_times import (
#     flow as reset_timeseries_end_times,
# )
# from deployments.flows import default_dataset_refresh

flows = {
    # "reset_timeseries_end_times": reset_timeseries_end_times,
    # default_dataset_refresh.flow_name: default_dataset_refresh.flow
}

# daily = Schedule(clocks=[CronClock("5 6 * * *")])


class Command(BaseCommand):
    help = "Run Prefect flow registration, agent, or a single flow"

    def add_arguments(self, parser: CommandParser) -> None:
        parser.add_argument(
            "--agent", help="Run local Prefect Agent", action="store_true"
        )
        parser.add_argument("--run_flow", help="Run a specified flow by name")

    def handle(self, *args: Any, **options: Any):
        if options["run_flow"]:
            try:
                flow = flows[options["run_flow"]]
            except KeyError:
                raise CommandError(
                    f"Specified flow ({options['run_flow']}) does not exist"
                )

            flow.run()

        if options["agent"]:
            anyio.run(self.agent)
        else:
            self.stdout.write("Not running local Prefect Agent")

    async def agent(self):
        """ Run a local Prefect agent """
        self.stdout.write("Running local Prefect Agent")

        agent = OrionAgent(["tenzinga_django_dev"])
        await agent.start()
This is the running of that command (now that I got it to run)...
(ppower) ┌─[ballisticpain@BallisticDevelopment] - [~/Development/tenzinga/ppower] - [Thu Oct 13, 14:05]
└─[$] <git:(feature/prefect-tasks*)> ./manage.py prefect --agent
Running local Prefect Agent

Andrew Brookins

10/13/2022, 8:58 PM
I spent some time on this and ran into some issues that I haven’t had time to resolve. Here is the agent wrapper I came up with:
import asyncio
from typing import List

from django.core.management.base import BaseCommand, CommandError
from prefect.agent import OrionAgent
from prefect.client import get_client
from prefect.settings import PREFECT_AGENT_QUERY_INTERVAL
from prefect.utilities.services import critical_service_loop


async def start_agent(work_queues: List[str]):
    async with get_client() as client:
        async with OrionAgent(work_queues=work_queues) as agent:
            print(
                "Agent started! Looking for work from "
                f"queue(s): {', '.join(work_queues)}..."
            )

            await critical_service_loop(
                agent.get_and_submit_flow_runs,
                PREFECT_AGENT_QUERY_INTERVAL.value(),
                printer=print,
            )


class Command(BaseCommand):
    help = 'Start the Prefect agent'

    def add_arguments(self, parser):
        parser.add_argument('work_queues', nargs='+', type=str)

    def handle(self, *args, **options):
        asyncio.run(start_agent(options["work_queues"]))
        self.stdout.write(self.style.SUCCESS('Successfully started Prefect agent'))
My feeling is that we’ll need to provide the community with some kind of django-prefect extension if we want to make this more usable (or a community member will). I was able to write a flow that used the Django ORM and run it within a Django view, but this doesn’t use “asynchronous” execution; it runs the flow within the HTTP request/response cycle. What I wanted was to get Celery-like “fire and forget” semantics, where I could, in Prefect terminology, schedule a flow run within the view and end the HTTP request without waiting for the flow run to complete. This should be possible using a Deployment and the new run_deployment() utility. However, I ran into pathing and import issues. IIRC, if I stored the flow code in a Django app, the agent would pick up the flow run correctly but not be able to find the flow on disk (I used local storage to store the flow). And I think if I stored the flow outside of a Django app, I had trouble setting up Django so that the ORM was usable by the flow; how you do this has changed since I last worked with Django. 😅 I haven’t had time to dig more into it!
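For the second problem (making the ORM usable from code that runs outside `manage.py`), Django's documented standalone pattern is to point `DJANGO_SETTINGS_MODULE` at the settings module and call `django.setup()` before importing any models. A minimal sketch follows; `mysite.settings` and the helper names are hypothetical, and whether this resolves the specific issue described above is an assumption.

```python
import os


def configure_django_env(settings_module: str) -> str:
    """Point Django at a settings module unless one is already configured."""
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", settings_module)
    return os.environ["DJANGO_SETTINGS_MODULE"]


def bootstrap_django(settings_module: str) -> None:
    """Prepare Django for standalone use, e.g. at the top of a flow module."""
    configure_django_env(settings_module)
    # Imported lazily so this helper can be loaded without Django installed.
    import django

    django.setup()  # must run before any model imports


# Hypothetical settings path, for illustration only.
active_settings = configure_django_env("mysite.settings")
```

A flow module would call `bootstrap_django("mysite.settings")` first and import its models afterwards, so the app registry is ready when the agent loads the flow.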
Oh wow, I just got this working
Holy smokes 😂

Jarvis Stubblefield

10/13/2022, 11:47 PM
Woot!!
I’m excited to see what you have arrived at. I got my flow to run, but it errors out with threading issues I’ve not worked with before.

Andrew Brookins

10/13/2022, 11:49 PM
It’s not perfect — I need to figure out a settings-related thing. I’ll put up a rough example repo when that’s smoothed out.
@Jarvis Stubblefield If you post the traceback I can take a look.

Jarvis Stubblefield

10/13/2022, 11:49 PM
I think I need to provide a local storage block or something to save the file to so I can then work on the part that sends an email
This has my progress today… I am understanding Prefect a bit more. I think we need something to tell the process what working directory to use… like I can if I use SupervisorD or SystemD
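The underlying idea, starting a process with an explicit working directory in the way a systemd or Supervisor unit can, looks like this in plain Python. This is a generic standard-library sketch, not Prefect's own process handling; the helper name is hypothetical and the child command only prints its working directory.

```python
import os
import subprocess
import sys
import tempfile


def run_in_directory(working_dir: str) -> str:
    """Run a child Python process in working_dir and return the cwd it reports."""
    completed = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        cwd=working_dir,  # the child starts in this directory
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


with tempfile.TemporaryDirectory() as tmp:
    reported = run_in_directory(tmp)
    # Compare resolved paths, since temp directories may sit behind symlinks.
    same_dir = os.path.realpath(reported) == os.path.realpath(tmp)
```

Relative paths used by the child process (for example, files a flow writes) then resolve against `working_dir` rather than wherever the parent happened to be started.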

Nate

10/13/2022, 11:54 PM
Hi @Jarvis Stubblefield - not totally caught up on this thread, but this discourse article may help with running your agent with systemd
👀 1

Andrew Brookins

10/13/2022, 11:55 PM
@Jarvis Stubblefield I’ll need the whole trace to help with the threading issue (it’ll be long).

Jarvis Stubblefield

10/14/2022, 12:00 AM
Yes sir… I’ll get that to you later tonight. I’ll have to save as a file… I tried putting it in directly and it was 10k characters too long…
🙌 1

Andrew Brookins

10/14/2022, 12:51 AM
Ok, rough draft of what I cobbled together: https://github.com/abrookins/django-prefect-example

Jarvis Stubblefield

10/14/2022, 4:45 PM
@Andrew Brookins I didn’t end up needing to send you the logs as I was able to work through them and have the runs working and creating files. I created a Deployment script for the same flow. Now I’m going to check out the repository you so graciously created. I’m also looking at creating an issue/PR for Process.
🙌 1
@Anthony Head I have the Calendly event on my calendar, however, there’s no meeting link or phone number to be able to meet?