Jarvis Stubblefield
03/27/2023, 6:13 PM@flow(name="whatever")
or if I leave it blank. The same error persists. What struck me as odd is that I have had a similar issue in the past where it complained about duplicate task / flow names. The error is in the thread, along with the deployment code that’s being run.
└─[$] <git:(feature/pei-forms*)> python -m leadership.flows.deployments.early_intervention
TRACE [2023-03-27 03:43:33] httpx._config - load_ssl_context verify=True cert=None trust_env=True http2=False
TRACE [2023-03-27 03:43:33] httpx._config - load_verify_locations cafile=/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/certifi/cacert.pem
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/tasks.py:275: UserWarning: A task named 'send_stream_notification_leadership' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:19' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/tasks.py:275: UserWarning: A task named 'create_dashboard_alert_leadership' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:45' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/tasks.py:275: UserWarning: A task named 'send_email_notification_leadership' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:67' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/flows.py:214: UserWarning: A flow named 'alert-cf-followup-needed' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:91' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
Traceback (most recent call last):
File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/deployments/early_intervention.py", line 18, in <module>
deployment = Deployment.build_from_flow(
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
return call()
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
return self.result()
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/deployments.py", line 738, in build_from_flow
deployment.flow_name = flow.name
AttributeError: module 'ppower.leadership.flows.alert_early_intervention' has no attribute 'name'
# -*- coding: utf-8
from __future__ import unicode_literals
import datetime
import json
import traceback
from collections import defaultdict
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.db.models import Q
from django.template.loader import render_to_string
from django.urls import reverse
from prefect import flow, task, get_run_logger
from tenzinga_django_setup import prepare_django_and_paths
tenzinga_path, ppower_path = prepare_django_and_paths()
from ppower.base.enums import AlertedDates
from ppower.base.models import HREUser
from ppower.base.signals.util import send_email_notification
from ppower.base.views.api.formio import get_json_submissions
from ppower.performance.notifications import EmailEarlyIntervention
@flow(name="alert_pei")
def alert_early_intervention(user_id):
    """
    The flow that alerts leaders when their employees may need early intervention to help.

    Fetches the "personnelearlyintervention" submissions recorded for the user within the
    observation window. When the number of submissions reaches the alert limit and the user
    has not been alerted within the last ``alert_days`` days, it emails the user's leaders
    and marks the user as alerted.

    @param user_id: primary key of the HREUser to evaluate
    @return: None
    """
    logger = get_run_logger()

    # TODO: Fill these from a configuration??
    alert_days = 7
    observation_length = 30
    observation_timeframe = "days"
    observation_alert_limit = 3
    # Becomes relativedelta(days=30) below; kept as a dict so the unit can be configured.
    timeframe = {observation_timeframe: observation_length}

    user = HREUser.objects.select_related("org", "team", "team__parent").get(pk=user_id)

    try:
        pei_observations = get_json_submissions("personnelearlyintervention", {
            "data.organization": user.org_id,
            "data.dateOfBehavior__gt": datetime.date.today() - relativedelta(**timeframe),
            "data.employee": user.pk,
        })
        observations = json.loads(pei_observations)
        observation_count = len(observations)
    except Exception:
        # Best effort: a failed lookup must not alert anyone, but it should be visible in
        # the flow run logs instead of being silently discarded.
        logger.exception(
            f"Could not load Personnel Early Intervention observations for user {user_id}"
        )
        observation_count = -1
        observations = []

    observation_dates = [
        str(observation["data"]["dateOfBehavior"])
        for observation in observations
    ]

    if observation_count >= observation_alert_limit and not user.has_been_alerted(
        AlertedDates.early_intervention.value,
        recency=relativedelta(days=alert_days)
    ):
        url = get_user_leadership_url(user.pk)
        ctx = get_email_context(observation_count, user.name, observation_length, observation_dates, url)
        body = render_to_string("emails/early_intervention.tpl", ctx)
        merge_data = get_merge_data_for_leaders(user.leaders(), ctx)
        # merge_data is keyed by leader email, so its size is the number of recipients.
        leader_count = len(merge_data)

        send_email_notification(
            EmailEarlyIntervention,
            about_item=user,
            context=merge_data,
            body=body
        )

        user.set_alerted(AlertedDates.early_intervention.value)
        user.save()

        logger.info(
            f"{leader_count} leaders were notified of Personnel Early Intervention logs for {user.name}"
        )
@task
def get_user_leadership_url(user_id):
    """
    Build the absolute URL of the leadership summary view for a user.

    @param user_id: value passed as the ``user_id`` URL keyword argument
    @return: ``settings.ACTIVATION_HOST`` followed by the reversed summary path
    """
    host = settings.ACTIVATION_HOST
    summary_path = reverse("ppower.leadership.views.summary", kwargs={"user_id": user_id})
    return host + summary_path
@task
def get_email_context(log_count, username, num_days, log_dates, url):
    """
    Assemble the template context used for the early intervention email.

    @param log_count: number of logged observations
    @param username: name of the employee the logs are about
    @param num_days: length of the observation window
    @param log_dates: iterable of date strings, joined with ", " for display
    @param url: link included in the email
    @return: dict with the keys count, employee, days, dates and url
    """
    joined_dates = ", ".join(log_dates)
    return dict(
        count=log_count,
        employee=username,
        days=num_days,
        dates=joined_dates,
        url=url,
    )
@task
def get_merge_data_for_leaders(leaders, ctx):
    """
    Build the per-recipient merge data for leaders who accept email notifications.

    @param leaders: iterable of leader objects exposing ``email``, ``name`` and
        ``has_email_notifications()``
    @param ctx: shared context merged into every leader's entry
    @return: dict mapping each leader's email to ``ctx`` plus that leader's ``name``
    """
    return {
        leader.email: dict(name=leader.name, **ctx)
        for leader in leaders
        if leader.has_email_notifications()
    }
# -*- coding: utf-8
from __future__ import unicode_literals
import os
from pathlib import Path
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from tenzinga_django_setup import prepare_django_and_paths
prepare_django_and_paths()
from ppower.leadership.flows import alert_early_intervention
# For this to work by picking up environment variables, deployments would need to be executed from production.
# Otherwise, we will need a mechanism to indicate an "environment" path.
command = ["pipenv", "run", "python", "-m", "prefect.engine"]
process = Process(command=command, working_dir=Path(os.environ.get("SRCDIR")).absolute())
deployment = Deployment.build_from_flow(
flow=alert_early_intervention,
name="alert_early_intervention",
description="Checks whether an employee's leaders should be notified about possible early intervention",
tags=("pei", "Personnel Early Intervention"),
work_queue_name="tenzinga_django_prod",
infrastructure=process
)
deployment.apply()
Version: 2.8.7
API version: 0.8.4
Python version: 3.10.2
Git commit: a6d6c6fc
Built: Thu, Mar 23, 2023 3:27 PM
OS/Arch: darwin/x86_64
Profile: tenzinga
Server type: cloud