Jarvis Stubblefield
10/05/2022, 8:51 PM
I’m trying to get the prefect agent to execute within my Django environment (using the Django ORM in the flow). I finally got the deployment created by adding my project to the sys path before running the Django setup. That works for running the flow manually, but now I’m getting another error when the agent tries to run the flow. Full error in the thread message. So my idea at this point is to create a Django management command (which executes in the Django environment) to run the agent through Python code. I have an example of doing this in Prefect v1, but I’m using Prefect v2 and haven’t seen where this is clearly documented. LocalAgent no longer seems to exist in Prefect v2 to run an agent through code. Any ideas of other ways to run this or how to run the agent from within Python would be super helpful and wholly appreciated!
└─[$] <git:(feature/prefect-tasks*)> prefect agent start -q "tenzinga_django_dev"
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Starting v2.4.1 agent connected to https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af...
  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): tenzinga_django_dev...
14:25:57.159 | INFO | prefect.agent - Submitting flow run '92c51db6-de80-4b48-9abc-8c5228e03658'
14:25:57.399 | INFO | prefect.infrastructure.process - Opening process 'mahogany-wildebeest'...
14:25:57.408 | INFO | prefect.agent - Completed submission of flow run '92c51db6-de80-4b48-9abc-8c5228e03658'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
14:26:26.401 | ERROR | Flow run 'mahogany-wildebeest' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "base/flows/log_entry.py", line 22, in <module>
django.setup()
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/__init__.py", line 19, in setup
configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/conf/__init__.py", line 79, in __getattr__
self._setup(name)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/conf/__init__.py", line 66, in _setup
self._wrapped = Settings(settings_module)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/django/conf/__init__.py", line 157, in __init__
mod = importlib.import_module(self.SETTINGS_MODULE)
File "/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 961, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 973, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'ppower'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 80, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 70, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(import_object, str(import_path))
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
module = load_script_as_module(script_path)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
14:26:32.275 | INFO | prefect.infrastructure.process - Process 'mahogany-wildebeest' exited cleanly.
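The traceback above ends in ModuleNotFoundError: No module named 'ppower', raised while Django imports the settings module. Python can only import a top-level package when the directory that contains it is on sys.path. The sketch below reproduces that behaviour with a throwaway package; the package name demo_pkg_for_path_check and the temporary directory are hypothetical and not part of the project.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway "project root" containing one package (hypothetical name).
project_root = Path(tempfile.mkdtemp())
package_dir = project_root / "demo_pkg_for_path_check"
package_dir.mkdir()
(package_dir / "__init__.py").write_text("VALUE = 42\n")

# Before the root is on sys.path, the import fails like the traceback above.
try:
    importlib.import_module("demo_pkg_for_path_check")
    importable_before = True
except ModuleNotFoundError:
    importable_before = False

# Adding the directory that CONTAINS the package makes it importable.
sys.path.insert(0, str(project_root))
importlib.invalidate_caches()
module = importlib.import_module("demo_pkg_for_path_check")

print(importable_before, module.VALUE)
```

The same reasoning applies to ppower.settings: the entry added to sys.path has to be the folder that holds the ppower package, in whatever process ends up importing the flow file.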
I found OrionAgent and have attempted to run a Django command using that… however, that’s using async/await and I have zero experience there. Here’s what I am trying so far…
# -*- coding: utf-8
from __future__ import unicode_literals
"""
Custom Django management command to work with Prefect.
Via various flags, you can run a specific flow on demand,
register flows with the server, or run a local agent.
"""
import os
from typing import Any

from django.core.management.base import BaseCommand, CommandError, CommandParser
from prefect.deployments import LocalFileSystem, Deployment
from prefect.agent import OrionAgent

# from deployments.flows.reset_timeseries_end_times import (
#     flow as reset_timeseries_end_times,
# )
# from deployments.flows import default_dataset_refresh

flows = {
    # "reset_timeseries_end_times": reset_timeseries_end_times,
    # default_dataset_refresh.flow_name: default_dataset_refresh.flow
}

# daily = Schedule(clocks=[CronClock("5 6 * * *")])


class Command(BaseCommand):
    help = "Run Prefect flow registration, agent, or a single flow"

    def add_arguments(self, parser: CommandParser) -> None:
        parser.add_argument(
            "--agent", help="Run local Prefect Agent", action="store_true"
        )
        parser.add_argument("--run_flow", help="Run a specified flow by name")

    def handle(self, *args: Any, **options: Any):
        if options["run_flow"]:
            try:
                flow = flows[options["run_flow"]]
            except KeyError:
                raise CommandError(
                    f"Specified flow ({options['run_flow']}) does not exist"
                )
            flow.run()
        if options["agent"]:
            self.agent()
        else:
            self.stdout.write("Not running local Prefect Agent")

    async def agent(self):
        """ Run a local Prefect agent """
        self.stdout.write("Running local Prefect Agent")
        agent = OrionAgent(["tenzinga_django_dev"])
        await agent.start()
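In the command above, handle is synchronous and agent is a coroutine, so calling self.agent() only creates a coroutine object and never runs it. One common way to bridge the two is asyncio.run, which starts an event loop, runs the coroutine to completion, and closes the loop. The sketch below shows only that pattern: StubAgent and StubCommand are hypothetical stand-ins, not Prefect or Django classes, and whether OrionAgent.start() by itself keeps an agent polling for work is not confirmed anywhere in this thread.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for an agent with an async start() method."""

    def __init__(self, work_queues):
        self.work_queues = work_queues
        self.started = False

    async def start(self):
        # A real agent would connect to its API here; this only yields once.
        await asyncio.sleep(0)
        self.started = True


class StubCommand:
    """Hypothetical stand-in for a Django management command."""

    def handle(self, **options):
        # handle() is synchronous, so it has to drive the coroutine itself.
        if options.get("agent"):
            return asyncio.run(self.agent())
        return None

    async def agent(self):
        agent = StubAgent(["tenzinga_django_dev"])
        await agent.start()
        return agent


started_agent = StubCommand().handle(agent=True)
print(started_agent.started)
```

Only the outermost call needs asyncio.run; everything below it can stay async and use await as usual.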
I tried adding async/await to the various places based on the error output, but I don’t know how to turn a Django command into one that works with async/await…
Ryan Peden
10/05/2022, 11:09 PM
Jarvis Stubblefield
10/06/2022, 12:06 AM
Ryan Peden
10/06/2022, 12:31 AM
Jarvis Stubblefield
10/07/2022, 7:02 PM
Here is the top of my log_entry.py file. The rest of the code is the tasks and flows. I can show them to you if you feel they are useful, but I think this bit at the top is the relevant bit.
# -*- coding: utf-8
from __future__ import unicode_literals
import casefy
import csv
import datetime
import django
import os
from pathlib import Path
import sys
from dateutil.relativedelta import relativedelta
from django.db.models import Q
from prefect import flow, task, get_run_logger
from typing import List, Dict
# set the default Django settings module for this module.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
ppower_path = Path.cwd().parent
sys.path.append(str(ppower_path.absolute()))
django.setup()
from ppower.base.models import Organization
What ppower_path returns (during the prefect build) is /user/stuff/Development/tenzinga where ppower is a folder within that directory. That’s my local setup. On production it is similar, but obviously slightly different.
Ryan Peden
10/07/2022, 7:11 PM
It looks like the Path.cwd().parent call is adding the wrong directory to the sys path when the agent runs it.
Jarvis Stubblefield
10/07/2022, 7:14 PM
I originally tried __file__. I wasn’t able to get it to build until I used the cwd bit on Path.
str(Path(__file__).resolve().parent)… I would need a couple of extra parent calls, but testing now to see what I get.
Ryan Peden
10/07/2022, 7:21 PM
Jarvis Stubblefield
10/07/2022, 7:22 PM
14:31:16.782 | INFO | prefect.agent - Submitting flow run 'cb62d36d-860a-4ec2-9445-6f0968ade880'
14:31:17.275 | INFO | prefect.infrastructure.process - Opening process 'omega6-oxkintok-ring'...
14:31:17.284 | INFO | prefect.agent - Completed submission of flow run 'cb62d36d-860a-4ec2-9445-6f0968ade880'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
Failed to parse annotation from 'Name' node: 'NoneType' object has no attribute 'resolve'
14:31:32.870 | ERROR | Flow run 'omega6-oxkintok-ring' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "base/flows/log_entry.py", line 20, in <module>
logger = get_run_logger()
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/logging/loggers.py", line 109, in get_run_logger
raise RuntimeError("There is no active flow or task run context.")
RuntimeError: There is no active flow or task run context.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 80, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 70, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(import_object, str(import_path))
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
module = load_script_as_module(script_path)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
14:31:36.891 | INFO | prefect.infrastructure.process - Process 'omega6-oxkintok-ring' exited cleanly.
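The traceback above shows get_run_logger() raising RuntimeError: There is no active flow or task run context. The call sits at module level, so it runs while the file is being imported, before any flow or task has started. For import-time diagnostics, a standard-library logger needs no run context. A minimal sketch, assuming a hypothetical logger name log_entry_bootstrap and a placeholder path value; ListHandler exists only so the message can be inspected here:

```python
import logging

# A stdlib logger can be created and used at import time; no run context needed.
bootstrap_logger = logging.getLogger("log_entry_bootstrap")
bootstrap_logger.setLevel(logging.INFO)


class ListHandler(logging.Handler):
    """Collect log messages in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
bootstrap_logger.addHandler(handler)

ppower_path = "/path/to/project"  # placeholder value for illustration
bootstrap_logger.info("sys.path entry: %s", ppower_path)

print(handler.messages)
```

In a real script a logging.StreamHandler would be the usual choice, with get_run_logger() kept for code that executes inside a flow or task function.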
Here is the top of the log_entry flow file…
# set the default Django settings module for this module.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
logger = get_run_logger()
logger.info(ppower_path)
print(ppower_path)
sys.path.append(ppower_path)
django.setup()
from ppower.base.models import Organization
I don’t see any .info() or print() output in the task results I pasted above.
try:
    # ensure we are in the path and are the working directory
    ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
    os.chdir(ppower_path)
    sys.path.append(ppower_path)
    # set the default Django settings module for this module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
    django.setup()
except:
    print("**** Path ****", ppower_path)
19:25:35.497 | INFO | prefect.agent - Submitting flow run '32f36e7e-a51a-42d2-9138-a320330ffad4'
19:25:35.650 | INFO | prefect.infrastructure.process - Opening process 'omicron941-hupyria'...
19:25:35.657 | INFO | prefect.agent - Completed submission of flow run '32f36e7e-a51a-42d2-9138-a320330ffad4'
19:25:51.106 | ERROR | Flow run 'omicron941-hupyria' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "base/flows/log_entry.py", line 31, in <module>
from ppower.base.models import Organization
ModuleNotFoundError: No module named 'ppower'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 173, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(import_object, str(import_path))
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
module = load_script_as_module(script_path)
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
Path /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T
19:25:55.121 | INFO | prefect.infrastructure.process - Process 'omicron941-hupyria' exited cleanly.
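The path printed in the output above is under /private/var/folders, a temporary directory on macOS. That suggests the flow file is being executed from a copy rather than from the project tree, in which case walking up from __file__ cannot reach the folder that holds ppower. One option is to take the project root from configuration instead of from the file's location. A minimal sketch, assuming a hypothetical PPOWER_ROOT environment variable and a hypothetical add_project_root helper:

```python
import os
import sys
from pathlib import Path
from typing import Optional


def add_project_root(env_var: str = "PPOWER_ROOT") -> Optional[str]:
    """Put the directory named by env_var on sys.path, if it exists."""
    root = os.environ.get(env_var)
    if not root or not Path(root).is_dir():
        # Nothing usable configured; leave sys.path untouched.
        return None
    resolved = str(Path(root).resolve())
    if resolved not in sys.path:
        sys.path.insert(0, resolved)
    return resolved


# Example: point the variable at a directory that exists on this machine.
os.environ["PPOWER_ROOT"] = str(Path.cwd())
added = add_project_root()
print(added in sys.path)
```

This only helps if the process that imports the flow file actually sees the variable, for example because it was exported in the shell before prefect agent start; that inheritance is an assumption here, not something shown in the logs.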
Ryan Peden
10/08/2022, 12:35 AM
Jarvis Stubblefield
10/08/2022, 1:07 AM