Jarvis Stubblefield

03/27/2023, 6:13 PM
I created another flow, and I'm trying to apply its deployment from my local box so I can then start a local agent to test the flow. However, I get an error, and it doesn't matter whether I add
@flow(name="whatever")
or leave the name blank; the same error persists. What struck me as odd is that I had a similar issue in the past, where it complained about duplicate task / flow names. The error is in the thread, along with the deployment code that's being run.
└─[$] <git:(feature/pei-forms*)> python -m leadership.flows.deployments.early_intervention
TRACE [2023-03-27 03:43:33] httpx._config - load_ssl_context verify=True cert=None trust_env=True http2=False
TRACE [2023-03-27 03:43:33] httpx._config - load_verify_locations cafile=/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/certifi/cacert.pem
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/tasks.py:275: UserWarning: A task named 'send_stream_notification_leadership' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:19' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/tasks.py:275: UserWarning: A task named 'create_dashboard_alert_leadership' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:45' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/tasks.py:275: UserWarning: A task named 'send_email_notification_leadership' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:67' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/flows.py:214: UserWarning: A flow named 'alert-cf-followup-needed' and defined at '/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/counseling_form_followup.py:91' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
Traceback (most recent call last):
  File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/Users/ballisticpain/Development/tenzinga/ppower/leadership/flows/deployments/early_intervention.py", line 18, in <module>
    deployment = Deployment.build_from_flow(
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
    return call()
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
    return self.result()
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/Users/ballisticpain/.pyenv/versions/3.10.2/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/ballisticpain/.local/share/virtualenvs/ppower-D20JDXzL/lib/python3.10/site-packages/prefect/deployments.py", line 738, in build_from_flow
    deployment.flow_name = flow.name
AttributeError: module 'ppower.leadership.flows.alert_early_intervention' has no attribute 'name'
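A note on the UserWarning lines above, which may be separate from the AttributeError. One possible cause, stated as an assumption because the project layout and `prepare_django_and_paths()` are not shown: the script is launched as `leadership.flows...` while the code imports `ppower.leadership.flows`, so the same source files could be imported under two different module names. Python treats those as two distinct modules and runs the decorators twice, which would register each task and flow name a second time. The sketch below reproduces only the double import, using a throwaway package whose names are all hypothetical.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway layout: <root>/demo_outer_pkg/demo_inner_pkg/mod.py
# (all package and module names here are hypothetical).
root = Path(tempfile.mkdtemp())
outer_dir = root / "demo_outer_pkg"
inner_dir = outer_dir / "demo_inner_pkg"
inner_dir.mkdir(parents=True)
(outer_dir / "__init__.py").write_text("")
(inner_dir / "__init__.py").write_text("")
(inner_dir / "mod.py").write_text("class Marker:\n    pass\n")

# Put both the root and the outer package directory on sys.path, so the same
# file is reachable as `demo_outer_pkg.demo_inner_pkg.mod` and as
# `demo_inner_pkg.mod`.
sys.path[:0] = [str(root), str(outer_dir)]

long_name = importlib.import_module("demo_outer_pkg.demo_inner_pkg.mod")
short_name = importlib.import_module("demo_inner_pkg.mod")

# Same file on disk, but two separate module objects: the module body
# (and any decorators in it) was executed once per import name.
same_file = Path(long_name.__file__) == Path(short_name.__file__)
same_module = long_name is short_name
same_class = long_name.Marker is short_name.Marker

print(same_file, same_module, same_class)
```

If this is what is happening, using one consistent import root for both the `python -m` invocation and the imports inside the code would be one way to avoid the duplicate registration.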
Code…
# -*- coding: utf-8
from __future__ import unicode_literals

import datetime
import json
import traceback

from collections import defaultdict
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.db.models import Q
from django.template.loader import render_to_string
from django.urls import reverse
from prefect import flow, task, get_run_logger

from tenzinga_django_setup import prepare_django_and_paths
tenzinga_path, ppower_path = prepare_django_and_paths()

from ppower.base.enums import AlertedDates
from ppower.base.models import HREUser
from ppower.base.signals.util import send_email_notification
from ppower.base.views.api.formio import get_json_submissions
from ppower.performance.notifications import EmailEarlyIntervention


@flow(name="alert_pei")
def alert_early_intervention(user_id):
    """
    The flow that alerts leaders when their employees may need early intervention to help
    @param user_id:
    @return:
    """
    logger = get_run_logger()

    # TODO: Fill these from a configuration??
    alert_days = 7
    observation_length = 30
    observation_timeframe = "days"
    observation_alert_limit = 3
    timeframe = {observation_timeframe: observation_length}

    user = HREUser.objects.select_related("org", "team", "team__parent").get(pk=user_id)

    try:
        pei_observations = get_json_submissions("personnelearlyintervention", {
            "data.organization": user.org_id,
            "data.dateOfBehavior__gt": datetime.date.today() - relativedelta(**timeframe),
            "data.employee": user.pk,
        })
        observations = json.loads(pei_observations)
        observation_count = len(observations)
    except Exception:
        observation_count = -1
        observations = []

    observation_dates = [str(observation["data"]["dateOfBehavior"])
                         for observation in observations]

    if observation_count >= observation_alert_limit and not user.has_been_alerted(
        AlertedDates.early_intervention.value,
        recency=relativedelta(days=alert_days)
    ):
        url = get_user_leadership_url(user.pk)
        ctx = get_email_context(observation_count, user.name, observation_length, observation_dates, url)
        body = render_to_string("emails/early_intervention.tpl", ctx)
        merge_data = get_merge_data_for_leaders(user.leaders(), ctx)
        leader_count = len(merge_data.keys())
        send_email_notification(
            EmailEarlyIntervention,
            about_item=user,
            context=merge_data,
            body=body
        )
        user.set_alerted(AlertedDates.early_intervention.value)
        user.save()

        logger.info(
            f"{leader_count} leaders were notified of Personnel Early Intervention logs for {user.name}"
        )


@task
def get_user_leadership_url(user_id):
    return settings.ACTIVATION_HOST + reverse(
        "ppower.leadership.views.summary",
        kwargs={"user_id": user_id}
    )


@task
def get_email_context(log_count, username, num_days, log_dates, url):
    return {
        "count": log_count,
        "employee": username,
        "days": num_days,
        "dates": ", ".join(log_dates),
        "url": url,
    }


@task
def get_merge_data_for_leaders(leaders, ctx):
    merge_data = {}
    for leader in leaders:
        if leader.has_email_notifications():
            merge_data[leader.email] = dict(**{"name": leader.name}, **ctx)

    return merge_data
Deployment Code…
# -*- coding: utf-8
from __future__ import unicode_literals
import os
from pathlib import Path
from prefect.deployments import Deployment
from prefect.infrastructure import Process

from tenzinga_django_setup import prepare_django_and_paths
prepare_django_and_paths()

from ppower.leadership.flows import alert_early_intervention

# For this to work by picking up environment variables, deployments would need to be executed from production.
# Otherwise, we will need a mechanism to indicate an "environment" path.
command = ["pipenv", "run", "python", "-m", "prefect.engine"]
process = Process(command=command, working_dir=Path(os.environ.get("SRCDIR")).absolute())

deployment = Deployment.build_from_flow(
    flow=alert_early_intervention,
    name="alert_early_intervention",
    description="Checks whether an employee's leaders should be notified about possible early intervention",
    tags=("pei", "Personnel Early Intervention"),
    work_queue_name="tenzinga_django_prod",
    infrastructure=process
)
deployment.apply()
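The last line of the traceback says that a module, `ppower.leadership.flows.alert_early_intervention`, has no attribute `name`. That is consistent with `from ppower.leadership.flows import alert_early_intervention` binding the submodule rather than the decorated function inside it, so `build_from_flow` receives a module where it expects a flow. Assuming the flow function lives in a module of the same name (which the error message suggests), importing it as `from ppower.leadership.flows.alert_early_intervention import alert_early_intervention` would likely resolve it. The sketch below mimics the situation without Prefect; `FakeFlow` and `flow_name` are hypothetical stand-ins, not Prefect APIs.

```python
import types


class FakeFlow:
    """Hypothetical stand-in for the object that @flow returns."""

    def __init__(self, name):
        self.name = name


# Hypothetical stand-in for the submodule alert_early_intervention.py,
# which contains a flow object with the same name as the module.
module = types.ModuleType("alert_early_intervention")
module.alert_early_intervention = FakeFlow(name="alert_pei")


def flow_name(flow):
    # Mirrors the attribute access shown in the traceback:
    # deployment.flow_name = flow.name
    return flow.name


# Passing the module reproduces the same kind of AttributeError.
try:
    flow_name(module)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# Passing the object defined inside the module works.
resolved_name = flow_name(module.alert_early_intervention)

print(error_message)
print(resolved_name)
```

This would also explain why changing the `name` argument of `@flow` makes no difference: the decorated function never reaches `build_from_flow` in the first place.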
Previous but likely related thread… https://prefect-community.slack.com/archives/CL09KU1K7/p1671147721702699 I did not create a ticket for this issue at that time.
Prefect version…
Version:             2.8.7
API version:         0.8.4
Python version:      3.10.2
Git commit:          a6d6c6fc
Built:               Thu, Mar 23, 2023 3:27 PM
OS/Arch:             darwin/x86_64
Profile:             tenzinga
Server type:         cloud