I’m at a dead end. I really need to get Prefect ru...
# ask-community
j
I’m at a dead end. I really need to get Prefect running in Django. If anyone has any resources please share them… So far everything I have tried has failed. At this point I’ll re-try anything.
c
What exactly are you trying to setup
Django is a web framework, so I’m not exactly sure what you are setting up, and what isn’t working
j
Right now I’m simply trying to run a local agent (eventually to be on the server) orchestrated by Prefect Cloud.
I am using the Django ORM within my flows.
Or at least I’m attempting to.
c
ok, and what is the issue you are facing?
j
If I run the Prefect agent, it starts up fine. I have also printed the value of `os.environ.items()` and I definitely have the appropriate variables there. However, when it tries to run the flow, it fails saying it cannot find `ppower.settings`.
I might attempt to pull out the `django.setup()` call that seems to be required to run a flow within the framework to have access to my models and the ORM…
c
Let’s focus on a linear path; we are already jumping around here. What exactly do you need to run the ORM in Django? I’m guessing you need some environment variables set, but why the agent?
j
That post you mentioned is the next step after I solve this one. That one won’t matter if I cannot run Prefect with my Django.
c
the agent is responsible for submitting the execution of flow runs to the appropriate infrastructure, not executing them directly
j
Let me link you to my other post… it might help with clarity… 🙂
c
so any environment variables you need would need to be in the actual executing environment
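To illustrate the point about the executing environment: a child process only sees the variables its launcher hands it. This is a minimal sketch using only the standard library, not Prefect’s own process infrastructure. The helper name `read_in_child` is hypothetical, and the value `ppower.settings` is the settings module named elsewhere in this thread.

```python
import os
import subprocess
import sys
from typing import Dict


def read_in_child(name: str, env: Dict[str, str]) -> str:
    """Start a fresh interpreter with `env` and return its value for `name`."""
    completed = subprocess.run(
        [sys.executable, "-c", f"import os; print(os.environ.get({name!r}, '<unset>'))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


# Environment without the Django setting, standing in for a launcher that never exported it.
bare_env = {k: v for k, v in os.environ.items() if k != "DJANGO_SETTINGS_MODULE"}
# Same environment with the setting added explicitly for the child.
django_env = {**bare_env, "DJANGO_SETTINGS_MODULE": "ppower.settings"}

without_setting = read_in_child("DJANGO_SETTINGS_MODULE", bare_env)
with_setting = read_in_child("DJANGO_SETTINGS_MODULE", django_env)
```

The same idea applies wherever the flow run is launched: whatever Django needs has to be present in the process that actually imports the flow file.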
j
O_o interesting… how do I control the executing environment? I need it all to be “Local” (at least I think I do).
This is the thread where I am discussing making this work and why with my Django… https://prefect-community.slack.com/archives/CL09KU1K7/p1665003068432039
I’m looking to replace Celery with Prefect.
r
Hi Jarvis! After we last discussed this, I ran into a few issues when trying to make it work, but am going to try a few other things. I haven't had the chance to push through to a solution but am still working on it
j
@Ryan Peden Thanks for the update!
c
So if I’m understanding correctly, you are running a flow as a subprocess, locally using the agent, on your system. The agent is running out of here:
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
Where is your flow located, and how was it registered?
File "base/flows/log_entry.py", line 31, in <module>
    from ppower.base.models import Organization
ModuleNotFoundError: No module named 'ppower'
Also, you can set up a logger to print by adding the import described here: https://docs.prefect.io/concepts/logs/ You can add one to the flow as well as to the task.
Is there the possibility of running this in a Docker image to isolate your runtime, so we can remove packages and paths from the equation, or is there a need to run this locally?
j
So I don’t need to run it “locally” I need it to run on a production server.
We have not dockerized any of our environments so I wasn’t trying to crack that open just yet.
I use Docker in my local dev to run lots of things so I’m familiar and somewhat comfortable, but haven’t deployed anything Docker to production by myself.
I don’t really care where it runs so long as it has access to my Django environment so it can use the ORM …
If I cannot do that, then it makes no sense to use Prefect as I would have to re-write my models and model-logic into another system (SQLAlchemy)…
So… I execute my agent within the folder `/user/files/tenzinga/ppower` … my flow code is located at `/user/files/tenzinga/ppower/base/flows/log_entry.py`
I do have the logger setup within the tasks and flows.
I can’t get that far down my path.
This is the first 34 lines of the file… (before the tasks and flows) … this is where it fails when it attempts to import my Model…
# -*- coding: utf-8
from __future__ import unicode_literals

import casefy
import csv
import datetime
import django
import os
from pathlib import Path
import sys
from dateutil.relativedelta import relativedelta
from django.db.models import Q
from prefect import flow, task, get_run_logger
from typing import List, Dict, Union
from prefect.cli import agent, app

try:

    # ensure we are in the path and are the working directory
    ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
    os.chdir(ppower_path)
    sys.path.append(ppower_path)

    # set the default Django settings module for this module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
    django.setup()

except:

    print("**** Path ****", ppower_path)

from ppower.base.models import Organization
c
are you running with a virtual environment?
j
I am within a virtual environment.
c
and the agent?
j
prefect is installed within that environment.
So the agent is also running within there… I confirmed that by modifying the agent startup code and printing the `os.environ.items()`…
Let me create a flow run and give you the current output… the error has to do with some private `/tmp` directory…
Okay, now when I log into https://app.prefect.cloud my workspace and therefore my flow runs are all gone?
Give me a moment to sort that issue.
Shew… apparently it signed me in with a different email address (I have many google accounts lol)
Current output…
Starting v2.5.0 agent connected to https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af...

  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|


Agent started! Looking for work from queue(s): tenzinga_django_dev...
14:21:48.308 | INFO    | prefect.agent - Submitting flow run 'b1f71fb1-16bb-491a-af91-94c82bf0eb23'
14:21:48.492 | INFO    | prefect.infrastructure.process - Opening process 'alpha11-sadr'...
14:21:48.499 | INFO    | prefect.agent - Completed submission of flow run 'b1f71fb1-16bb-491a-af91-94c82bf0eb23'
14:22:15.125 | ERROR   | Flow run 'alpha11-sadr' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "base/flows/log_entry.py", line 31, in <module>
    from ppower.base.models import Organization
ModuleNotFoundError: No module named 'ppower'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/deployments.py", line 173, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'base/flows/log_entry.py' encountered an exception
**** Path **** /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T
14:22:19.243 | INFO    | prefect.infrastructure.process - Process 'alpha11-sadr' exited cleanly.
Notice here the path is not what I expected…
**** Path **** /private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T
So maybe as part of my deployment I need to be more specific, instead of letting it default to local storage and assuming it would use the location I deployed from.
c
So what I suspect is happening: if you didn’t configure any sort of storage block (even a local one) for your deployment, you are leaving it to the agent to decide.
The agent then tries to run your flow from a separate sub-path, and it has no contextual awareness of your site-packages or your Django environment, because it’s not running from your install / venv path; it’s running elsewhere.
Have you configured this as a deployment? If you have, did you register / use a local storage block?
lastly, can you increase your debug level for the agent
export PREFECT_LOGGING_LEVEL="DEBUG"
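The suspicion above can be reproduced without Prefect. This is a minimal sketch, assuming only the standard library: a fresh interpreter started in a scratch directory cannot import a package that lives in another directory, which is the same shape as the `ModuleNotFoundError: No module named 'ppower'` in the traceback. The package name `demo_project_pkg` and the helper `can_import_from` are hypothetical stand-ins.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def can_import_from(cwd: str, module_name: str) -> bool:
    """Start a fresh interpreter in `cwd` and report whether `module_name` imports."""
    # With `-c`, the child puts its working directory at the front of sys.path.
    result = subprocess.run(
        [sys.executable, "-c", f"import {module_name}"],
        cwd=cwd,
        capture_output=True,
    )
    return result.returncode == 0


with tempfile.TemporaryDirectory() as project_dir, tempfile.TemporaryDirectory() as scratch_dir:
    # Hypothetical stand-in for the Django project package.
    package = Path(project_dir) / "demo_project_pkg"
    package.mkdir()
    (package / "__init__.py").write_text("")

    # Importable when the process starts next to the package...
    found_in_project = can_import_from(project_dir, "demo_project_pkg")
    # ...but not when it starts in an unrelated temporary directory.
    found_in_scratch = can_import_from(scratch_dir, "demo_project_pkg")
```

The interpreter and its site-packages are identical in both runs; only the working directory differs, which is why the virtual environment alone does not make the project importable.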
j
I have configured this as a Deployment … currently just using the CLI, but I do believe I need to use code for my deployments as I need different “Deployments” due to a different parameter.
I didn’t do any registering of Storage nor the Infrastructure.
c
right, so I think that’s the issue
import prefect
import sys
import os.path
from prefect import task, flow
from prefect import get_run_logger
from prefect.filesystems import Azure

@task
def log_platform_info():
    az_block = Azure.load("boydblock")
    import platform
    import sys
    from prefect.orion.api.server import ORION_API_VERSION
    logger = get_run_logger()
    logger.info("Host's network name = %s", platform.node())
    logger.info("Python version = %s", platform.python_version())
    logger.info("Platform information (instance type) = %s ", platform.platform())
    logger.info("OS/Arch = %s/%s", sys.platform, platform.machine())
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.info("Prefect API Version = %s", ORION_API_VERSION)
    logger.info(sys.path)
    logger.info(os.path)
@flow
def healthcheck():
    log_platform_info()
if __name__ == "__main__":
    healthcheck()
j
Those were all defaulting to “Local / Process”…
c
this prints just some basic info, and your sys.path, and your os.path
right
you’re in a venv
and your agent is defaulting elsewhere because it has no storage or infra configured
which is why it works locally running it through prefect, but not through the agent
j
Thanks! That is helpful!!!
c
so my suggestion as next steps would be - run this, and see where your agent is running from. Generally, storage is registered with your flow so it can be retrieved along with the modules even locally
The output when I run this as a local process (through an agent):
16:04:34.515 | INFO    | Task run 'log_platform_info-afea9710-0' - Host's network name = Christophers-MacBook-Pro.local
16:04:34.516 | INFO    | Task run 'log_platform_info-afea9710-0' - Python version = 3.9.12
16:04:34.517 | INFO    | Task run 'log_platform_info-afea9710-0' - Platform information (instance type) = macOS-10.16-x86_64-i386-64bit
16:04:34.517 | INFO    | Task run 'log_platform_info-afea9710-0' - OS/Arch = darwin/x86_64
16:04:34.518 | INFO    | Task run 'log_platform_info-afea9710-0' - Prefect Version = 2.4.5 🚀
16:04:34.519 | INFO    | Task run 'log_platform_info-afea9710-0' - Prefect API Version = 0.8.1
16:04:34.521 | INFO    | Task run 'log_platform_info-afea9710-0' - ['.', '/private/var/folders/_s/1vczgxc97y5003d22b40kpd80000gn/T/tmpy3xcwy4wprefect', '/Users/christopherboyd/opt/anaconda3/lib/python39.zip', '/Users/christopherboyd/opt/anaconda3/lib/python3.9', '/Users/christopherboyd/opt/anaconda3/lib/python3.9/lib-dynload', '/Users/christopherboyd/opt/anaconda3/lib/python3.9/site-packages', '/Users/christopherboyd/opt/anaconda3/lib/python3.9/site-packages/aeosa']
16:04:34.522 | INFO    | Task run 'log_platform_info-afea9710-0' - <module 'posixpath' from '/Users/christopherboyd/opt/anaconda3/lib/python3.9/posixpath.py'>
16:04:34.846 | INFO    | Task run 'log_platform_info-afea9710-0' - Finished in state Completed()
16:04:35.192 | INFO    | Flow run 'glossy-gibbon' - Finished in state Completed('All states completed.')
16:04:37.743 | INFO    | prefect.infrastructure.process - Process 'glossy-gibbon' exited cleanly.
notice my /private/var/ path: that is because I ran it without storage
if I add an import of a local module to this code, it will fail because the module is not at that path
j
So the above healthcheck I should remove the Azure block part right?
c
sure, that’s just residual from another test
j
Hehe no worries 🙂
So I need to create a deployment for this healthcheck…
c
prefect deployment build ./healthcheck.py:healthcheck -i process -n local_run -a
prefect deployment run healthcheck/local_run
j
O_o so that runs it without the agent right?
O_o I didn’t specify the process…
Starting v2.5.0 agent connected to https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af...
15:13:56.419 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/

  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|


Agent started! Looking for work from queue(s): tenzinga_django_dev...
15:13:56.420 | DEBUG   | prefect.agent - Checking for flow runs...
15:13:56.573 | INFO    | prefect.agent - Submitting flow run '3a0840f6-9090-4615-9255-4091838469c8'
15:13:56.740 | INFO    | prefect.infrastructure.process - Opening process 'natural-skylark'...
15:13:56.741 | DEBUG   | prefect.infrastructure.process - Process 'natural-skylark' running command: /Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/bin/python -m prefect.engine in /var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/tmpmw68c6a0prefect
15:13:56.749 | INFO    | prefect.agent - Completed submission of flow run '3a0840f6-9090-4615-9255-4091838469c8'
15:13:58.416 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
15:14:01.573 | DEBUG   | prefect.agent - Checking for flow runs...
15:14:06.626 | DEBUG   | prefect.agent - Checking for flow runs...
15:14:11.719 | DEBUG   | prefect.agent - Checking for flow runs...
15:14:13.649 | DEBUG   | Flow run 'natural-skylark' - Loading flow for deployment 'healthcheck'...
15:14:13.748 | DEBUG   | Flow run 'natural-skylark' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
15:14:13.748 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
15:14:13.793 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
15:14:14.281 | DEBUG   | Flow run 'natural-skylark' - Executing flow 'healthcheck' for flow run 'natural-skylark'...
15:14:14.281 | DEBUG   | Flow run 'natural-skylark' - Beginning execution...
15:14:14.401 | INFO    | Flow run 'natural-skylark' - Created task run 'log_platform_info-afea9710-0' for task 'log_platform_info'
15:14:14.401 | INFO    | Flow run 'natural-skylark' - Executing 'log_platform_info-afea9710-0' immediately...
15:14:15.384 | DEBUG   | Task run 'log_platform_info-afea9710-0' - Beginning execution...
15:14:15.384 | INFO    | Task run 'log_platform_info-afea9710-0' - Host's network name = BallisticDevelopment.attlocal.net
15:14:15.384 | INFO    | Task run 'log_platform_info-afea9710-0' - Python version = 3.8.12
15:14:15.398 | INFO    | Task run 'log_platform_info-afea9710-0' - Platform information (instance type) = macOS-10.16-x86_64-i386-64bit
15:14:15.400 | INFO    | Task run 'log_platform_info-afea9710-0' - OS/Arch = darwin/x86_64
15:14:15.400 | INFO    | Task run 'log_platform_info-afea9710-0' - Prefect Version = 2.5.0 🚀
15:14:15.400 | INFO    | Task run 'log_platform_info-afea9710-0' - Prefect API Version = 0.8.2
15:14:15.400 | INFO    | Task run 'log_platform_info-afea9710-0' - ['.', '/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/tmpmw68c6a0prefect', '/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python38.zip', '/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8', '/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8/lib-dynload', '/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages']
15:14:15.401 | INFO    | Task run 'log_platform_info-afea9710-0' - <module 'posixpath' from '/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8/posixpath.py'>
15:14:15.463 | INFO    | Task run 'log_platform_info-afea9710-0' - Finished in state Completed()
15:14:15.526 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner...
15:14:15.527 | INFO    | Flow run 'natural-skylark' - Finished in state Completed('All states completed.')
15:14:15.574 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
15:14:19.446 | INFO    | prefect.infrastructure.process - Process 'natural-skylark' exited cleanly.
So maybe I should start creating a deployment script? … this first flow I have created I need it to run once a month (schedule) for a few organizations… so I need to create a deployment per org…
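A deployment script for the "one deployment per org" idea might look like the sketch below. It is hedged throughout: the org label `LBS` and queue `tenzinga_django_dev` appear in the logs in this thread, but the org id `1` and the 06:00 run time are made-up placeholders, and the helper names are hypothetical. `apply_specs` assumes the Prefect 2.5-era API (`Deployment.build_from_flow` and `prefect.orion.schemas.schedules.CronSchedule`), which may differ in other versions.

```python
from typing import Any, Dict, List


def monthly_deployment_specs(org_ids: Dict[str, int], work_queue: str) -> List[Dict[str, Any]]:
    """Describe one deployment per organization as plain data."""
    return [
        {
            "name": f"logs-for-month-{label}",
            "parameters": {"org_id": org_id},
            "work_queue_name": work_queue,
            # 06:00 on the first day of every month (placeholder time).
            "cron": "0 6 1 * *",
        }
        for label, org_id in sorted(org_ids.items())
    ]


def apply_specs(flow: Any, specs: List[Dict[str, Any]]) -> None:
    """Register each spec with Prefect (assumes Prefect 2.5-era imports)."""
    # Imported lazily so the spec builder above works without Prefect installed.
    from prefect.deployments import Deployment
    from prefect.orion.schemas.schedules import CronSchedule

    for spec in specs:
        Deployment.build_from_flow(
            flow=flow,
            name=spec["name"],
            parameters=spec["parameters"],
            work_queue_name=spec["work_queue_name"],
            schedule=CronSchedule(cron=spec["cron"]),
            apply=True,
        )


# Placeholder mapping of org label to primary key; real ids come from the database.
specs = monthly_deployment_specs({"LBS": 1}, work_queue="tenzinga_django_dev")
```

Keeping the specs as plain data makes it easy to review what will be registered before calling `apply_specs(logs_for_month, specs)`.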
c
do you have any sort of remote storage
s3, azure something like that
j
We are operating on AWS and I do have an S3 bucket for Dev/Staging and Prod.
c
here’s what I would do
Create an s3 block:
you’ll need a name, the bucket path, access key, and secret access key
once that’s saved and registered, go to where your flow lives
locally, in your terminal
build and apply your deployment. You can still run with the process infrastructure and just specify the storage block via `-sb s3/<name>`
when the flow run executes at that point, the process will still divert to `/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/tmpmw68c6a0prefect`, but it will pull the flow code from your storage block
j
So that would mean I would need to move my entire codebase into the S3 storage block?
c
where does it live currently
j
Local, Servers, and GitLab.
The S3 block would then need to be updated…
This is where I think it ties into that discussion that you brought up..
c
yea, I think maybe we just table this discussion until we can have a call
j
I’m not sure setting up another place to host the code is ideal, but I’m willing to test and make it work 🙂
I have a call scheduled for tomorrow…
Can I setup a LocalStorageBlock that simply points to my current code location?
c
sure
j
Okay, I’ll try that. 🙂
c
I think I can check with the team the value of a youtube video or a short tutorial on setting up an entirely local dev environment with local blocks and imports
j
I would love to have this part working by the call tomorrow so I can focus on the next steps 🙂
That would be immensely helpful I think… 🙂
This is the part of the documentation of Deployments that makes it seem that I do not need to specify a LocalStorageBlock to make this work…
storage: An optional remote storage block used to store and retrieve this workflow;
            if not provided, will default to referencing this flow by its local path
c
what is the path for this module:
ppower.base.models
or
ppower
j
~/Development/tenzinga/ppower
So then the models are in…
~/Development/tenzinga/ppower/base/models
Some of this headache could be due to the structure of the Django project being slightly off… usually it would be something more like this …
~/Development/tenzinga/ppower/ppower
and then the models would be…
~/Development/tenzinga/ppower/ppower/base/models
I’ve always been able to work around that..
c
When I look at your sys.path, none of those directories are in it
15:14:15.400 | INFO    | Task run 'log_platform_info-afea9710-0' - [
'.', 
'/private/var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/tmpmw68c6a0prefect', 
'/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python38.zip', 
'/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8', 
'/Users/ballisticpain/.pyenv/versions/anaconda3-2020.11/lib/python3.8/lib-dynload', '/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages']
If that’s where that module lives, then this should probably be part of your path
~/Development/tenzinga/ppower/
j
If you notice at the top of my flow I’m trying to make that happen.
Specifically this bit…
try:

    # ensure we are in the path and are the working directory
    ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
    os.chdir(ppower_path)
    sys.path.append(ppower_path)

    # set the default Django settings module for this module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
    django.setup()

except:

    print("**** Path ****", ppower_path)
c
have you tried hardcoding it?
explicitly, `sys.path.append('~/Development/tenzinga/ppower/')`
j
I’ll try that…
c
then print your sys.path to see that it took effect; also update your actual PYTHONPATH to ensure the agent picks it up
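Two details matter when hardcoding the path. Python uses `sys.path` entries literally, so a leading `~` is not expanded the way a shell would expand it. And for `import ppower.settings` to resolve, the entry has to be the directory that contains the `ppower` package, not the package directory itself. A minimal sketch, with `add_import_root` as a hypothetical helper name:

```python
import sys
from pathlib import Path


def add_import_root(raw_path: str) -> str:
    """Expand `~`, make the path absolute, and put it on sys.path once."""
    root = str(Path(raw_path).expanduser().resolve())
    if root not in sys.path:
        sys.path.insert(0, root)
    return root


# The directory that CONTAINS the `ppower` package, so `import ppower.settings` can resolve.
import_root = add_import_root("~/Development/tenzinga")
```

On a production server the literal path would differ, so reading it from an environment variable or deriving it from a known file location is one option instead of a hardcoded string.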
j
On production it’d be something different, but if we can get something working it’ll help me move this forward 🙂
Okay… now that looks a bit like success … I added the absolute path `/Users/ballisticpain/Development/tenzinga` … it needs that path so it can find `ppower.settings`
Now I have another issue … 🙂
This is a positive step…
So the output is too long…
This is the bottom…
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/fastapi/encoders.py", line 117 in jsonable_encoder
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/fastapi/encoders.py", line 161 in jsonable_encoder
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/fastapi/encoders.py", line 117 in jsonable_encoder
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/fastapi/encoders.py", line 161 in jsonable_encoder
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.8/site-packages/fastapi/encoders.py", line 117 in jsonable_encoder
  ...
16:19:08.905 | ERROR   | prefect.infrastructure.process - Process 'alpha5-nishina-x' exited with status code: -6
That’s the end of a thread dump?
I’m also still not sure why it is using part of the system python versus my project python.
I’m guessing that’s where I need to build the Process bit to use the appropriate stuff…
Here’s the initial output before the “thread” stuff…
16:18:30.283 | INFO    | prefect.agent - Submitting flow run '484b0891-c63d-40a3-bfac-213bd8e6b1a5'
16:18:30.448 | INFO    | prefect.infrastructure.process - Opening process 'alpha5-nishina-x'...
16:18:30.448 | DEBUG   | prefect.infrastructure.process - Process 'alpha5-nishina-x' running command: /Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/bin/python -m prefect.engine in /var/folders/sm/c1qt38h11hj0psj9bgjg4vy80000gn/T/tmplb8ye0anprefect
16:18:30.456 | INFO    | prefect.agent - Completed submission of flow run '484b0891-c63d-40a3-bfac-213bd8e6b1a5'
16:18:32.078 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
16:18:35.284 | DEBUG   | prefect.agent - Checking for flow runs...
16:18:40.662 | DEBUG   | prefect.agent - Checking for flow runs...
16:18:44.983 | DEBUG   | Flow run 'alpha5-nishina-x' - Loading flow for deployment 'logs-for-month-LBS'...
21:18:45.413 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
16:18:45.750 | DEBUG   | prefect.agent - Checking for flow runs...
21:18:46.809 | DEBUG   | Flow run 'alpha5-nishina-x' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
21:18:46.809 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
21:18:46.850 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
21:18:47.233 | DEBUG   | Flow run 'alpha5-nishina-x' - Executing flow 'logs-for-month' for flow run 'alpha5-nishina-x'...
21:18:47.234 | DEBUG   | Flow run 'alpha5-nishina-x' - Beginning execution...
21:18:47.315 | INFO    | Flow run 'alpha5-nishina-x' - Created task run 'get_org_from_id-df9a85aa-0' for task 'get_org_from_id'
21:18:47.316 | INFO    | Flow run 'alpha5-nishina-x' - Executing 'get_org_from_id-df9a85aa-0' immediately...
21:18:47.464 | DEBUG   | Task run 'get_org_from_id-df9a85aa-0' - Beginning execution...
21:18:47.548 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
21:18:47.591 | INFO    | Task run 'get_org_from_id-df9a85aa-0' - Finished in state Completed()
21:18:47.591 | INFO    | Flow run 'alpha5-nishina-x' - Creating report for Leonard Bus Sales for 07-2021
21:18:47.661 | INFO    | Flow run 'alpha5-nishina-x' - Created task run 'get_users_from_org-9dc7ab54-0' for task 'get_users_from_org'
21:18:47.661 | INFO    | Flow run 'alpha5-nishina-x' - Executing 'get_users_from_org-9dc7ab54-0' immediately...
21:18:47.828 | DEBUG   | Task run 'get_users_from_org-9dc7ab54-0' - Beginning execution...
21:18:48.077 | INFO    | Task run 'get_users_from_org-9dc7ab54-0' - Finished in state Completed()
21:18:48.142 | INFO    | Flow run 'alpha5-nishina-x' - Created task run 'get_leaders_from_users-fd44ef78-0' for task 'get_leaders_from_users'
21:18:48.143 | INFO    | Flow run 'alpha5-nishina-x' - Executing 'get_leaders_from_users-fd44ef78-0' immediately...
21:18:48.296 | DEBUG   | Task run 'get_leaders_from_users-fd44ef78-0' - Beginning execution...
21:18:49.677 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
16:18:50.853 | DEBUG   | prefect.agent - Checking for flow runs...
21:18:51.154 | INFO    | Task run 'get_leaders_from_users-fd44ef78-0' - Finished in state Completed()
21:18:51.328 | DEBUG   | Flow run 'alpha5-nishina-x' - Resolving inputs to 'get-log-report'
21:18:51.856 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/
21:18:51.984 | DEBUG   | prefect.flows - Parameter 'leader' for flow 'get-log-report' is of unserializable type 'HREUser' and will not be stored in the backend.
16:18:55.945 | DEBUG   | prefect.agent - Checking for flow runs...
16:19:01.034 | DEBUG   | prefect.agent - Checking for flow runs...
Fatal Python error: Cannot recover from stack overflow.
Python runtime state: initialized
Now with that output it may be easier to have the code of the flows/tasks…
# -*- coding: utf-8
from __future__ import unicode_literals

import casefy
import csv
import datetime
import django
import os
from pathlib import Path
import sys
from dateutil.relativedelta import relativedelta
from django.db.models import Q
from prefect import flow, task, get_run_logger
from typing import List, Dict, Union

try:

    # ensure we are in the path and are the working directory
    # ppower_path = str(Path(__file__).resolve().parent.parent.parent.parent)
    # os.chdir(ppower_path)
    # ppower_path = Path("/Users/ballisticpain/Development/tenzinga/ppower")
    sys.path.append("/Users/ballisticpain/Development/tenzinga")
    sys.path.append("/Users/ballisticpain/Development/tenzinga/ppower")

    # set the default Django settings module for this module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ppower.settings")
    django.setup()

except:

    # print("**** Path ****", ppower_path)
    print("**** SYS Path ****", sys.path)

from ppower.base.models import Organization


@task
def get_org_from_id(org_id):
    return Organization.objects.get(pk=org_id)


@task
def get_users_from_org(org):
    return org.hreuser_set.all()


@task
def get_leaders_from_users(users) -> List:
    return [person for person in users if person.is_leader()]


@task
def get_org_name_snakecase(org) -> str:
    state_abbreviations = [
        "AK", "AL", "AR", "AZ", "CA", "CO", "CT", "DC", "DE", "FL", "GA",
        "HI", "IA", "ID", "IL", "IN", "KS", "KY", "LA", "MA", "MD", "ME",
        "MI", "MN", "MO", "MS", "MT", "NC", "ND", "NE", "NH", "NJ", "NM",
        "NV", "NY", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX",
        "UT", "VA", "VT", "WA", "WI", "WV", "WY"
    ]
    keep_together = state_abbreviations + [
        "IAFC", "EMS", "ETSB", "FFCA", "SHARP", "EMT", "ESD", "FM2", "TAUD", "SEAFC", "US", "WASCON", "MMY", "M25",
        "CUDRC",
    ]

    return casefy.snakecase(org.name, keep_together=keep_together)


@task
def get_filename_for(org_name, month, year) -> str:
    return f"{org_name}_logs_report_{year}_{str(month).zfill(2)}.csv"


@task
def get_measurement(standard, score: int) -> str:
    if score == 1:
        return standard.bsp
    elif score == 3:
        return standard.sp
    elif score == 5:
        return standard.asp
    else:
        return 'Unknown Measurement'


@task
def get_string_score(score: int) -> str:
    if score == 1:
        return 'Below Standard'
    elif score == 3:
        return 'Standard'
    elif score == 5:
        return 'Above Standard'
    else:
        return 'Unknown Score'


@flow
def get_log_report(leader, log) -> Dict:
    return {
        'leader': leader.name,
        'employee': log.employee.name,
        'position': str(log.employee.position),
        'task': log.task.name,
        'standard': log.standard.name,
        'measurement': get_measurement(log.standard, log.score),
        'score': get_string_score(log.score),
        'log_for_date': str(log.date),
        'log_created_date': str(log.last_modified),
        'log_text': log.comment,
        'log_why_edit_text': log.edit_comment,
        'log_employee_text': log.employee_comment,
    }


@task
def write_log_report_to_file(filename: str, fieldnames: List[str], logs_report: List[dict]) -> None:
    with open(filename, 'w') as csvfile:
        # This is for MS Excel
        csvfile.write(u'\ufeff')
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for report in logs_report:
            writer.writerow(report)


@flow
def logs_for_month(org_id: Union[int, str], month: int = None, year: int = None) -> None:
    logger = get_run_logger()
    last_month_date = datetime.date.today() - relativedelta(months=1)
    month = last_month_date.month if month is None else month
    year = last_month_date.year if year is None else year
    # TODO: Decide if we want this negative engineering for a better looking and precise error?
    # TODO: (cont) Could leave this out as a way to force an error without catching it as inevitably code will do.
    # if len(str(year)) != 4:
    #    raise CommandError("When specified, the '--year' parameter must be a 4-digit year.")

    org = get_org_from_id(org_id)
    # output information about the report we are expecting
    logger.info(f"Creating report for {org.name} for {str(month).zfill(2)}-{year}")

    users = get_users_from_org(org)
    leaders = get_leaders_from_users(users)
    q_filter = Q(date__month=month, date__year=year) | Q(last_modified__month=month, last_modified__year=year)

    logs_report = []
    fieldnames = []
    # For each leader, get the log data
    for leader in leaders:
        for log in leader.entered_logs.filter(q_filter).select_related(
                "task", "standard", "employee", "employee__position"
        ):
            report = get_log_report(leader, log)
            if not fieldnames:
                fieldnames = list(report.keys())
            logs_report.append(report)

    org_name = get_org_name_snakecase(org)
    filename = get_filename_for(org_name, month, year)

    write_log_report_to_file(filename, fieldnames, logs_report)

    logger.info(f"Finished writing report to '{filename}'")

    # TODO: Send an email with the file as an attachment.
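One possible reading of the stack overflow (an assumption; the thread does not say what fixed it): `get_log_report` is decorated with `@flow`, so Prefect tries to record its `leader` and `log` arguments. The DEBUG line about the unserializable `HREUser` parameter is followed by deeply nested `jsonable_encoder` frames, which is consistent with an encoder walking a model object graph. A hedged sketch of one way to sidestep that is to flatten model objects into plain values in an ordinary function before anything crosses a flow boundary. `FakeEmployee`, `FakeLog`, and `log_to_row` are hypothetical stand-ins, not the project's models.

```python
import json
from dataclasses import dataclass
from typing import Dict


@dataclass
class FakeEmployee:  # hypothetical stand-in for the Django employee model
    name: str
    position: str


@dataclass
class FakeLog:  # hypothetical stand-in for a log entry model
    employee: FakeEmployee
    score: int
    comment: str


def log_to_row(leader_name: str, log: FakeLog) -> Dict[str, str]:
    """Copy only plain strings out of the model-like objects."""
    return {
        "leader": leader_name,
        "employee": log.employee.name,
        "position": log.employee.position,
        "score": str(log.score),
        "log_text": log.comment,
    }


row = log_to_row("Pat", FakeLog(FakeEmployee("Sam", "Engineer"), 3, "On time"))
# Every value is a plain string, so a JSON encoder has nothing to recurse into.
encoded = json.dumps(row)
```

Passing ids or rows like this between flows, and loading ORM objects inside the function that needs them, keeps parameters small and serializable.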
Success!! Thank you for your efforts on this with me!! I’m looking forward to my meeting with Prefect today in addition to my meeting next week with my Celery use-case client.