# prefect-community
j
Hello everyone, I’m having trouble getting the `prefect agent` to execute within my Django environment (using Django ORM in the flow). I finally got the deployment created by adding my project to the sys path before running the Django setup. That works for running the flow manually, but now I’m getting another error when the Agent tries to run the flow. Full error in the thread message. So my idea at this point is to create a Django Management command (which executes in the Django environment) to run the agent through Python code. I have an example of doing this in Prefect v1, but I’m using Prefect v2 and haven’t seen where this is clearly documented. `LocalAgent` no longer seems to exist in Prefect v2 to run an agent through code. Any ideas of other ways to run this or how to run the agent from within Python would be super helpful and wholly appreciated!
Here’s the full agent run + error message output…
└─[$] <git:(feature/prefect-tasks*)> prefect agent start -q "tenzinga_django_dev"
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Starting v2.4.1 agent connected to <https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af>...

  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|


Agent started! Looking for work from queue(s): tenzinga_django_dev...
14:25:57.159 | INFO    | prefect.agent - Submitting flow run '92c51db6-de80-4b48-9abc-8c5228e03658'
14:25:57.399 | INFO    | prefect.infrastructure.process - Opening process 'mahogany-wildebeest'...
14:25:57.408 | INFO    | prefect.agent - Completed submission of flow run '92c51db6-de80-4b48-9abc-8c5228e03658'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
14:26:26.401 | ERROR   | Flow run 'mahogany-wildebeest' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "base/flows/log_entry.py", line 22, in <module>
    django.setup()
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/__init__.py", line 19, in setup
    configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/conf/__init__.py", line 79, in __getattr__
    self._setup(name)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/conf/__init__.py", line 66, in _setup
    self._wrapped = Settings(settings_module)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/conf/__init__.py", line 157, in __init__
    mod = importlib.import_module(self.SETTINGS_MODULE)
  File "/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'ppower'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 80, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 70, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
14:26:32.275 | INFO    | prefect.infrastructure.process - Process 'mahogany-wildebeest' exited cleanly.
So I have found the `OrionAgent` and have attempted to run a Django Command using that… however that’s using `async/await` and I have zero experience there. Here’s what I am trying so far…
# -*- coding: utf-8
from __future__ import unicode_literals

"""
Custom Django management command to work with Prefect.
Via various flags, you can run a specific flow on demand,
register flows with the server, or run a local agent.
"""
import os
from typing import Any

from django.core.management.base import BaseCommand, CommandError, CommandParser
from prefect.deployments import LocalFileSystem, Deployment
from prefect.agent import OrionAgent

# from deployments.flows.reset_timeseries_end_times import (
#     flow as reset_timeseries_end_times,
# )
# from deployments.flows import default_dataset_refresh

flows = {
    # "reset_timeseries_end_times": reset_timeseries_end_times,
    # default_dataset_refresh.flow_name: default_dataset_refresh.flow
}

# daily = Schedule(clocks=[CronClock("5 6 * * *")])


class Command(BaseCommand):
    help = "Run Prefect flow registration, agent, or a single flow"

    def add_arguments(self, parser: CommandParser) -> None:
        parser.add_argument(
            "--agent", help="Run local Prefect Agent", action="store_true"
        )
        parser.add_argument("--run_flow", help="Run a specified flow by name")

    def handle(self, *args: Any, **options: Any):
        if options["run_flow"]:
            try:
                flow = flows[options["run_flow"]]
            except KeyError:
                raise CommandError(
                    f"Specified flow ({options['run_flow']}) does not exist"
                )

            flow.run()

        if options["agent"]:
            self.agent()
        else:
            self.stdout.write("Not running local Prefect Agent")

    async def agent(self):
        """ Run a local Prefect agent """
        self.stdout.write("Running local Prefect Agent")

        agent = OrionAgent(["tenzinga_django_dev"])
        await agent.start()
As you can see I’ve started adding `async/await` to the various places based on the error output, but I don’t know how to turn a Django command into one that works with async/await…
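One common way to call a coroutine from a synchronous entry point, such as a management command's `handle()`, is `asyncio.run()`. The sketch below is neither Prefect nor Django code: `StubAgent` and this `Command` class are hypothetical stand-ins so the pattern can run on its own, and it assumes `handle()` is not already executing inside a running event loop.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for an agent object with an async start()."""

    def __init__(self, work_queues):
        self.work_queues = work_queues
        self.started = False

    async def start(self):
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        self.started = True


class Command:
    """Mimics the shape of a Django management command (no Django needed)."""

    def handle(self, *args, **options):
        if options.get("agent"):
            # handle() is synchronous, so drive the coroutine to completion here.
            return asyncio.run(self.agent())
        return None

    async def agent(self):
        agent = StubAgent(["tenzinga_django_dev"])
        await agent.start()
        return agent


started_agent = Command().handle(agent=True)
```

Calling `self.agent()` without `await` or `asyncio.run()` only creates a coroutine object, so nothing inside it would execute.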
r
I'm not sure if running an Agent that way will help. The agent is running the script in a separate process, so it won't see anything you added to `sys.path` and can't find the `ppower` module. Is your goal here to use the ORM models from an existing Django app inside your flow?
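To illustrate the general point about separate processes (this says nothing about Prefect internals), here is a minimal stdlib sketch: an entry appended to `sys.path` in one interpreter is not visible in a freshly started child interpreter. The directory name is hypothetical and does not need to exist.

```python
import subprocess
import sys

# Hypothetical directory name, used only for illustration.
EXTRA_DIR = "/tmp/hypothetical_project_root"

# Modify sys.path in the parent process only.
sys.path.append(EXTRA_DIR)

# Ask a fresh interpreter whether it sees the same entry.
child = subprocess.run(
    [sys.executable, "-c", f"import sys; print({EXTRA_DIR!r} in sys.path)"],
    capture_output=True,
    text=True,
    check=True,
)

parent_sees_it = EXTRA_DIR in sys.path
child_sees_it = child.stdout.strip() == "True"
```

Environment variables, by contrast, are normally inherited by child processes, which is why path settings are often passed that way.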
j
Ryan, thanks for responding… yes, my intention is to use Prefect to run several background tasks and/or scheduled tasks. Some may be triggered by an action on our app, and others are just the scheduled tasks…
The Django ORM being the biggest bit that needs Django…
I was hoping that it would grab the environment copy from the parents when it starts the new process…
If that isn’t the correct way, then how should I go about it?
r
Actually, looking through the entire exception, I might be incorrect about what's going wrong here. Is there any chance you could share the part of your code that is adding the project to the sys path?
j
@Ryan Peden Certainly… let me throw that in here and show you.
My Django is a little odd in its structure which has always made pathing stuff a bit difficult… usually I find a way though.
Here’s the top of my `log_entry.py` file. The rest of the code is the tasks and flows. I can show them to you if you feel they are useful, but I think this bit at the top is the relevant bit.
# -*- coding: utf-8
from __future__ import unicode_literals

import casefy
import csv
import datetime
import django
import os
from pathlib import Path
import sys
from dateutil.relativedelta import relativedelta
from django.db.models import Q
from prefect import flow, task, get_run_logger
from typing import List, Dict


# set the default Django settings module for this module.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
ppower_path = Path.cwd().parent
sys.path.append(str(ppower_path.absolute()))
django.setup()

from ppower.base.models import Organization
The path that `ppower_path` returns (during the `prefect build`) is `/user/stuff/Development/tenzinga` where `ppower` is a folder within that directory. That’s my local setup. On production it is similar, but obviously slightly different.
r
Ok, makes sense. Since the agent is running your flow code in a subprocess that uses a different working directory, it looks like perhaps the `Path.cwd().parent` call is adding the wrong directory to the sys path when the agent runs it.
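A small stdlib sketch of why a `Path.cwd()`-based root is fragile: the same function returns a different directory as soon as the process runs from somewhere else. The temporary directory layout here is made up for the demonstration.

```python
import os
import tempfile
from pathlib import Path


def cwd_based_root() -> Path:
    """Root derived from the current working directory (changes per process)."""
    return Path.cwd().parent


original_cwd = Path.cwd()
root_before = cwd_based_root()

with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp).resolve() / "a" / "b"
    nested.mkdir(parents=True)
    os.chdir(nested)  # simulate a process started in a different directory
    try:
        root_after = cwd_based_root()
    finally:
        os.chdir(original_cwd)  # restore so the temp dir can be removed
```

Here `root_after` points inside the temporary directory, not at the original project, even though the code did not change.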
j
@Ryan Peden I hadn’t thought of that…
I was originally trying to go from the `__file__`. I wasn’t able to get it to build until I used the `cwd` bit on `Path`.
Trying a rebuild … to see what other path options I have. I just recalled after discussing this with you that I set a BASE_PATH setting in the settings of Django. I’m doing that with `str(Path(__file__).resolve().parent)` … I would need a couple of extra parent calls, but testing now to see what I get.
r
If you expect your Django code to always be in the same directory in production, putting the absolute directory in an environment variable would also be an easy way to make sure any subprocess the agent starts can find it easily. I'm a Django fan myself, so I'm going to experiment more with Django-Prefect integration. Ideally I'd like to create a collection of helpers that make it easy to use them together.
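A minimal sketch of the environment-variable idea, assuming a variable named `PPOWER_PROJECT_ROOT` (a hypothetical name; any variable your deployment already defines would do). The helper only touches the mapping and list it is given, so it can be tried without changing the real `sys.path`.

```python
import os
import sys
from pathlib import Path

# Hypothetical variable name; use whatever your deployment already defines.
ENV_VAR = "PPOWER_PROJECT_ROOT"


def add_project_root_to_path(env=os.environ, path_list=sys.path) -> str:
    """Insert the directory named by ENV_VAR at the front of path_list."""
    raw = env.get(ENV_VAR)
    if not raw:
        raise RuntimeError(f"{ENV_VAR} is not set")
    root = str(Path(raw).expanduser().resolve())
    if root not in path_list:
        path_list.insert(0, root)
    return root
```

In a flow module this would be called with its defaults before `django.setup()`, so the subprocess resolves the project from the environment and not from its working directory.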
j
I may already have an environment variable in production… didn’t think of that either. 🙂
I would be happy to help in any way I can with the Django-Prefect integration.
I think it would greatly assist with adoption of Prefect within the Django community if it was easier to replace Celery with it.
So that still failed even though I got the build to work… I had it print the path this second time… checking now if it is correct.
Well … I can’t seem to see the output… here’s the error. Same as before from what I can tell…
14:31:16.782 | INFO    | prefect.agent - Submitting flow run 'cb62d36d-860a-4ec2-9445-6f0968ade880'
14:31:17.275 | INFO    | prefect.infrastructure.process - Opening process 'omega6-oxkintok-ring'...
14:31:17.284 | INFO    | prefect.agent - Completed submission of flow run 'cb62d36d-860a-4ec2-9445-6f0968ade880'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
14:31:32.870 | ERROR   | Flow run 'omega6-oxkintok-ring' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "base/flows/log_entry.py", line 20, in <module>
    logger = get_run_logger()
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/logging/loggers.py", line 109, in get_run_logger
    raise RuntimeError("There is no active flow or task run context.")
RuntimeError: There is no active flow or task run context.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 80, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 70, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
14:31:36.891 | INFO    | prefect.infrastructure.process - Process 'omega6-oxkintok-ring' exited cleanly.
Also here’s that relevant section of the `log_entry` flow file…
# set the default Django settings module for this module.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
logger = get_run_logger()
logger.info(ppower_path)
print(ppower_path)
sys.path.append(ppower_path)
django.setup()

from ppower.base.models import Organization
I didn’t see the `.info()` or `print()` output in the task results I pasted above.
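The traceback above shows `get_run_logger()` raising `RuntimeError: There is no active flow or task run context.` at module level, so the lines after it never ran. For import-time debugging, a plain stdlib logger does not need a run context. A minimal sketch, with a hypothetical logger name and helper:

```python
import logging
import sys
from pathlib import Path

# A plain module-level logger works at import time, outside any run context.
logger = logging.getLogger("log_entry_import_debug")  # hypothetical logger name
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
logger.addHandler(handler)


def describe_paths(file_path: str) -> dict:
    """Collect the path facts that matter when imports fail."""
    return {
        "cwd": str(Path.cwd()),
        "file": str(Path(file_path).resolve()),
        "sys_path_head": sys.path[:3],
    }


info = describe_paths(".")  # in a flow module, pass __file__ here
logger.info("import-time paths: %s", info)
```

Whether this output reaches the Prefect UI depends on how the process's stderr is handled; the sketch only guarantees that the call itself does not require a flow or task run.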
I recall seeing somewhere I can have the flow run re-direct stdout (print) to the log?
I don’t recall where I would set that up…
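As a general illustration of sending `print()` output to a logger, here is a stdlib-only sketch using `contextlib.redirect_stdout`. It is not Prefect's own mechanism (later Prefect 2 releases offer a `log_prints` option on flows and tasks for this); `LoggerWriter`, `ListHandler`, and the logger name are hypothetical helpers for the demonstration.

```python
import contextlib
import io
import logging


class LoggerWriter(io.TextIOBase):
    """File-like object that forwards each completed line to a logger."""

    def __init__(self, logger: logging.Logger, level: int = logging.INFO):
        super().__init__()
        self.logger = logger
        self.level = level
        self._buffer = ""

    def write(self, text: str) -> int:
        # print() may call write() several times per line, so buffer until "\n".
        self._buffer += text
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            if line.strip():
                self.logger.log(self.level, line)
        return len(text)


class ListHandler(logging.Handler):
    """Handler that keeps log messages in memory for inspection."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


demo_logger = logging.getLogger("stdout_redirect_demo")  # hypothetical name
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
captured = ListHandler()
demo_logger.addHandler(captured)

with contextlib.redirect_stdout(LoggerWriter(demo_logger)):
    print("**** Path ****", "/some/path")
```

After the `with` block, `captured.messages` holds the printed line as a single log message.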
@Ryan Peden I have gotten a bit further. It is definitely a pathing issue with the agent process. Here’s the code bit…
try:

    # ensure we are in the path and are the working directory
    ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
    os.chdir(ppower_path)
    sys.path.append(ppower_path)

    # set the default Django settings module for this module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
    django.setup()

except:

    print("**** Path ****", ppower_path)
Here’s the error output… (I added the ** to the Path after this run)…
19:25:35.497 | INFO    | prefect.agent - Submitting flow run '32f36e7e-a51a-42d2-9138-a320330ffad4'
19:25:35.650 | INFO    | prefect.infrastructure.process - Opening process 'omicron941-hupyria'...
19:25:35.657 | INFO    | prefect.agent - Completed submission of flow run '32f36e7e-a51a-42d2-9138-a320330ffad4'
19:25:51.106 | ERROR   | Flow run 'omicron941-hupyria' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "base/flows/log_entry.py", line 31, in <module>
    from ppower.base.models import Organization
ModuleNotFoundError: No module named 'ppower'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 173, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
Path /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T
19:25:55.121 | INFO    | prefect.infrastructure.process - Process 'omicron941-hupyria' exited cleanly.
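The `Path /private/var/folders/…/T` line in that output suggests the `__file__`-derived directory was a temporary folder and not the project checkout. One way to make such a mismatch obvious is to check each candidate directory for the package before adding it to `sys.path`. A sketch under that assumption; `find_project_root` is a hypothetical helper, and the candidate list would come from an environment variable, a setting, or `__file__`:

```python
import tempfile
from pathlib import Path
from typing import Iterable, Optional


def find_project_root(candidates: Iterable[Optional[str]], package: str = "ppower") -> Path:
    """Return the first candidate directory that contains `package`."""
    checked = []
    for candidate in candidates:
        if not candidate:
            continue  # skip unset options such as a missing environment variable
        root = Path(candidate).expanduser().resolve()
        checked.append(str(root))
        if (root / package).is_dir():
            return root
    raise ModuleNotFoundError(
        f"No directory containing {package!r} found; checked: {checked}"
    )


# Demonstration with throwaway directories standing in for real locations.
with tempfile.TemporaryDirectory() as good, tempfile.TemporaryDirectory() as bad:
    (Path(good) / "ppower").mkdir()
    found = find_project_root([None, bad, good])
    found_ok = found == Path(good).resolve()
```

When no candidate matches, the error message lists every directory that was checked, which is easier to act on than a bare `No module named 'ppower'`.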
r
Thanks for sharing that! I'm going to experiment with this in one of my Django apps over the weekend.
j
@Ryan Peden Feel free to reach out if you need anything or if I can be of assistance! I appreciate your efforts on this!
@Ryan Peden I hope all is well with you and yours. No worries if you didn’t have any time over the weekend, I know mine was busy. If you have any further ideas even if not tested, let me know. I’m still actively working to get this solved. Thank you for your time and efforts to help!!