prefect-community
  • p

    Paco Ibañez

    02/22/2023, 10:29 PM
    Hello! When calling `run_deployment`, how can I minimize the time that the agent will take to pick up the new flow run?
    2 replies · 2 participants
  • j

    John Horn

    02/22/2023, 11:40 PM
    # prefect version
    23:39:04.743 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/57e0246d-d752-4737-b29a-192e8f6eb886/workspaces/6d439078-f534-4a0a-89e0-9c4ced644d6a/>
    Version:             2.5.0
    API version:         0.8.2
    Python version:      3.8.16
    Git commit:          eac37918
    Built:               Thu, Oct 6, 2022 12:41 PM
    OS/Arch:             linux/aarch64
    Profile:             default
    Server type:         cloud
    I am trying to create a deployment that calls a flow with a flow_run_name parameter. I am guessing that my prefect version needs to be updated however I don't know how to upgrade the prefect cloud version. My actual local prefect agent is using the latest 2.8.2 but I'm guessing it is prefect cloud that is stuck on 2.5.0 and I see no documentation on how to update that.
    @flow(flow_run_name='testing123')
    TypeError: flow() got an unexpected keyword argument 'flow_run_name'
    19 replies · 2 participants
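One way to narrow this down is to check which Prefect version the failing Python environment actually imports. The `Version: 2.5.0` line above is printed by the local `prefect version` command, so it likely describes the environment where that command ran rather than Prefect Cloud itself. A minimal sketch using only the standard library; the helper name `installed_version` is hypothetical:

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Run this with the same interpreter that raises the TypeError.
print("prefect:", installed_version("prefect"))
```

If this prints an older version than the agent's 2.8.2, the environment that builds the deployment is probably the one that needs upgrading.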
  • j

    Jarvis Stubblefield

    02/23/2023, 1:15 AM
    Okay… so I’ve been running flows successfully for about 2 months or so … last night I decided to try to create a new one … have run into some issues, etc. Fixed all of that… now I received on what I believed to be my final run of this flow an error on the Prefect Cloud side of things… I don’t know what it means so I’m posting it here for some insight. Do I just retry my flow run or have I hit some limit?
    prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<https://api.prefect.cloud/api/accounts/5626ffe9-0140-4e88-babc-4a4fc614bb99/workspaces/ee8a533d-2754-420e-87f2-2d6b084984af/flow_runs/>'
    Response: {'exception_message': 'Internal Server Error'}
    For more information check: <https://httpstatuses.com/500>
    4 replies · 2 participants
  • s

    Samuel Hinton

    02/23/2023, 2:06 AM
    Hi team! I think I may have found a bug or improvement for flow deployments. When defining a schedule, one of the options is using the RRule as per https://docs.prefect.io/api-ref/server/schemas/schedules/#prefect.server.schemas.schedules.RRuleSchedule However, this doesn’t appear compatible with python’s ZoneInfo way of defining timezones:
    from datetime import datetime as dt  # assumed: the snippet's `dt` is datetime.datetime
    from zoneinfo import ZoneInfo
    from dateutil.rrule import MINUTELY, rrule
    from prefect import flow
    from prefect.deployments import Deployment
    from prefect.server.schemas.schedules import RRuleSchedule
    
    london = ZoneInfo("Europe/London")
    
    @flow
    def some_flow():
        print("whoa")
    
    Deployment.build_from_flow(
        flow=some_flow, 
        schedule=RRuleSchedule.from_rrule(
            rrule(
                freq=MINUTELY,
                interval=10,
                dtstart=dt(2020, 1, 1, tzinfo=london),
                byhour=range(8, 9),
            )
        ),
    )
    Naively, I’d expect this to work, but it fails because ZoneInfo doesn't have a name attribute:
    schedule=RRuleSchedule.from_rrule(
      File "/Users/sh/Projects/flows/.venv/lib/python3.9/site-packages/prefect/server/schemas/schedules.py", line 410, in from_rrule
        timezone = rrule._dtstart.tzinfo.name
    AttributeError: 'zoneinfo.ZoneInfo' object has no attribute 'name'
    There's an easy workaround here (like setting the timezone name explicitly, as is done in schedules.py:413), but I thought it might be good to get this working with Python's new standard way of handling timezones, especially because the old `from datetime import timezone` requires specifying an hour delta which actually doesn't get used by prefect2.
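The traceback above shows the code reading `.name` from the tzinfo object, while `zoneinfo.ZoneInfo` stores its IANA identifier in `.key`. A minimal sketch of a lookup that tolerates both; the helper `timezone_name` is hypothetical and is not part of Prefect:

```python
from datetime import tzinfo
from typing import Optional


def timezone_name(tz: Optional[tzinfo]) -> Optional[str]:
    """Best-effort lookup of a timezone name from a tzinfo object."""
    if tz is None:
        return None
    # Try `.name` first (what the traceback reads), then `.key` (zoneinfo.ZoneInfo).
    for attr in ("name", "key"):
        value = getattr(tz, attr, None)
        if isinstance(value, str):
            return value
    return None
```

With this helper, `timezone_name(ZoneInfo("Europe/London"))` would return the zone's key, and fixed-offset objects such as `datetime.timezone.utc` would return `None` because they carry neither attribute.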
  • s

    Samuel Hinton

    02/23/2023, 2:26 AM
    Another question: what is the best practice for setting PREFECT_API_URL from code? `prefect.settings.PREFECT_API_URL` has getters but not setters, and I don't want to just go under the hood. Using `os.environ` doesn't seem to work, because the env var is set after importing prefect. (The use case is that we have dev and prod versions of prefect and we want to loop over envs and upload a common flow to all of them.)
    2 replies · 2 participants
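Since the problem described is that the variable is set after `prefect` has been imported, one option is to run each upload in a child process whose environment already contains `PREFECT_API_URL` at startup. A minimal sketch under that assumption; the URLs, the `ENVIRONMENTS` mapping and the helper `run_with_api_url` are hypothetical, and the child command is a stand-in that only echoes the variable:

```python
import os
import subprocess
import sys

# Hypothetical API URLs; replace with the real dev and prod URLs.
ENVIRONMENTS = {
    "dev": "https://dev.example.invalid/api",
    "prod": "https://prod.example.invalid/api",
}


def run_with_api_url(api_url: str, command: list) -> str:
    """Run `command` in a child process that sees PREFECT_API_URL from startup."""
    env = dict(os.environ, PREFECT_API_URL=api_url)
    result = subprocess.run(
        command, env=env, check=True, capture_output=True, text=True
    )
    return result.stdout.strip()


# Stand-in child command that echoes the variable; in practice this would be
# the script that imports prefect and builds the deployment.
echo_cmd = [sys.executable, "-c", "import os; print(os.environ['PREFECT_API_URL'])"]
for name, url in ENVIRONMENTS.items():
    print(name, "->", run_with_api_url(url, echo_cmd))
```

Each child imports its modules only after the variable is in place, so the import-order issue does not arise.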
  • s

    Samuel Hinton

    02/23/2023, 2:58 AM
    Another fun one with multiple deployments in one project. It seems like if we set up an S3 bucket for storage, each deployment uploads the entire project. For example, if I have a `flows` folder filled with 20 python files, each with, say, 2 flows inside them, and I try to deploy the 20 flows, every single deployment will reupload all 20 files, which is incredibly slow. Is there a nicer way of doing this, such that I can upload everything just a single time? I see there's a `skip_upload` option I could use, but it feels like a bit of an anti-pattern for me to use a bool flag like “already_uploaded” to toggle the skip_upload flag.
  • j

    Jenia Varavva

    02/23/2023, 3:23 AM
    Hi, I like the flow run timeline view: it’s slick and useful. However, when a flow goes over a few hundred tasks, it hangs the UI. Is there some option to disable it? Or is the best option to downgrade?
  • w

    Walter Cavinaw

    02/23/2023, 5:11 AM
    trying to log into prefect cloud, but a number of errors:
    2 replies · 2 participants
  • d

    Dmytro Ponomarchuk

    02/23/2023, 8:36 AM
    Hi there, could anybody help me reset an `Unhealthy` Work Queue? It just stopped processing any new flows without any reason, and there are no errors in the Agent log, which is running on AWS ECS infrastructure.
  • t

    Tolga Karahan

    02/23/2023, 9:04 AM
    Hi guys. In what format should the base job manifest be supplied in the UI? I'm trying this one and got a validation error while deploying: `RuntimeError: Unable to load 'kubernetes-custom-job' of block type None due to failed validation. To load without validation, try loading again with validate=False.`
    2 replies · 1 participant
  • s

    Samuel Bunce

    02/23/2023, 9:15 AM
    hi prefect, we had the following error last night
    prefect\client\base.py", line 130, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Server error '502 Bad Gateway' for url
    '<https://api.prefect.cloud/api/accounts/xxxxx/flow_runs/filter>'
    For more information check: <https://httpstatuses.com/502> 
    Agent stopped!
    which we believe is caused by a failure to communicate with the cloud instance. We have a default argument for number of tries, which is set here to 3 https://github.com/PrefectHQ/prefect/blob/42e5a978390575b8bd6ef4c258ba703f7e28fca2/src/prefect/utilities/services.py#L18 - ideally we would like to be able to reconfigure it or set it to infinite so that when our agents cannot communicate with cloud, they don't all collapse - we had to restart everything this morning. As another point, this obviously generated hundreds of late flows. Why is there no 'select all' function on the UI, or capability to remove all late flows. The best I have been able to do so far is to go through and manually click to remove all late flows. Feel like there should be more functionality for actions on batches of flow runs
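The retry behaviour asked for here (configurable, or unlimited, attempts when the API is unreachable) can be illustrated with a generic wrapper. This is a minimal sketch and not Prefect's implementation; the helper `retry_with_backoff` and its parameters are hypothetical, and `call` stands in for whatever function polls the API:

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    max_attempts: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call` until it succeeds, backing off exponentially between failures.

    `max_attempts=None` means retry indefinitely.
    """
    attempt = 0
    while True:
        try:
            return call()
        except Exception:
            attempt += 1
            if max_attempts is not None and attempt >= max_attempts:
                raise
            # Double the delay after each failure, capped at `max_delay`.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
```

Capping the delay keeps a long outage from pushing the wait between attempts arbitrarily high, while `max_attempts=None` avoids the process exiting after a fixed number of failures.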
  • s

    Stephen Lloyd

    02/23/2023, 11:32 AM
    How are folks managing versions of custom utilities modules. We build a separate docker image for every flow. We don't want to rebuild every image for any change to the utilities module. Does anyone have a similar setup and how are you handling changes?
  • t

    Tolga Karahan

    02/23/2023, 2:03 PM
    Hi guys. How can I use CLI to create a block from a custom template like doing in Python? https://prefect-community.slack.com/archives/CL09KU1K7/p1677150966414349?thread_ts=1677143084.199779&cid=CL09KU1K7
  • f

    flapili

    02/23/2023, 2:32 PM
    Hi, how can I get a task result from the API? I only get a reference from the API and I don't know how to retrieve the state from pure Python code :/
    "data": {
        "type": "reference",
        "serializer_type": "compressed/json",
        "storage_block_id": "e3311389-34e1-491c-aaef-cbcf6e6ea977",
        "storage_key": "e4940b1d2ada42a7a1ac77710cae4d18"
      },
    4 replies · 2 participants
  • s

    Sam Garvis

    02/23/2023, 3:02 PM
    Where are the values.yaml examples in the Prefect2.0 GitHub? I see that there were recent changes to work queues/pools etc and I'm guessing it would break new values.yaml files
    1 reply · 1 participant
  • s

    Steph Clacksman

    02/23/2023, 3:31 PM
    How can I create an s3 storage block with a session token? Is it possible to create an s3 storage block without using the UI?
    2 replies · 2 participants
  • m

    Marcel

    02/23/2023, 3:46 PM
    Hi all, is there any guide on handling db migrations - like up-/downgrading the prefect database after updating prefect? I haven't found one in the current docs or on GH. Unfortunately, the @Ryan Peden's linked thread is already archived. So, any tips would be greatly appreciated! https://prefect-community.slack.com/archives/C03D12VV4NN/p1671204457213779?thread_ts=1671203596.984439&cid=C03D12VV4NN
  • k

    Kyle Austin

    02/23/2023, 3:46 PM
    I have a question about working with the ConcurrentTaskRunner. I have a flow where I have a potentially large number (in test runs I am using 5k many) of independent implementations of a simple async task that I would like to submit all to the task runner so it can work on them concurrently. The flow is submitting all 5k to the task runner at the same time and I am getting many many warning messages like
    10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
    I am also almost always getting the following error message
    prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    I am setting `persist_result=True` in all my task decorators too. Plus I have set concurrency limit tags so none of these tasks have more than 30 running at once. But all 5k tasks are still being submitted and created all at once! Here is roughly how the code looks in the flow now:
    humana_smtp = EmailServerCredentials.load("some-smtp-server")
    for email in emails:
        email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            email_to_cc=email.cc,
            email_to_bcc=email.bcc,
            attachments=email.attachments,
            images=email.images,
            dry_run=dry_run,
        )
    I have done something like this to prevent it from submitting all 5k at once and throttle it down to working with 50 at a time
    email_chunks_for_sending = chunkify(emails, 50)
    humana_smtp = EmailServerCredentials.load("some-smtp-server")
    for chunk in email_chunks_for_sending:
        wait_for_complete_object = []
        for email in chunk:
            sent = email_send_message.submit(
                subject=email.subject,
                msg=email.rendered_html_template,
                email_server_credentials=humana_smtp,
                email_from=email.email_from,
                email_to=email.to,
                email_to_cc=email.cc,
                email_to_bcc=email.bcc,
                attachments=email.attachments,
                images=email.images,
                dry_run=dry_run,
            )
            wait_for_complete_object.append(sent)
        # Block until every task in this chunk has finished before submitting the next chunk
        for future in wait_for_complete_object:
            future.result()
    Here `chunkify`, which I stole from another post on Slack, looks like:
    def chunkify(xs, size):
        return (xs[pos : pos + size] for pos in range(0, len(xs), size))
    Is there a way to set a limit on the number of tasks that are submitted to the task runner at a given time? Task concurrency didn't do the trick for me -- it only limited the number of tasks running at a given time.
    👀 1
    ✅ 1
    17 replies · 4 participants
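The chunked approach above waits for a whole chunk to finish before submitting the next one. A sliding-window variant keeps a fixed number of submissions in flight and tops up as each one completes. This is a minimal sketch using the standard library's `concurrent.futures` as a stand-in for the task runner. It does not use Prefect futures, which would need their own waiting mechanism, and the helper `submit_bounded` is hypothetical:

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable, List, Set


def submit_bounded(
    submit: Callable[[Any], Future], items: Iterable[Any], limit: int
) -> List[Any]:
    """Submit items one by one, never keeping more than `limit` futures pending."""
    pending: Set[Future] = set()
    results: List[Any] = []
    for item in items:
        if len(pending) >= limit:
            # Wait until at least one in-flight future finishes before submitting more.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        pending.add(submit(item))
    # Drain whatever is still in flight.
    done, _ = wait(pending)
    results.extend(f.result() for f in done)
    return results


# Stand-in for `email_send_message.submit`: a thread pool squaring numbers.
with ThreadPoolExecutor(max_workers=8) as pool:
    squares = submit_bounded(lambda n: pool.submit(pow, n, 2), range(20), limit=5)
print(sorted(squares))
```

Results come back in completion order rather than submission order, which is why the example sorts them before printing.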
  • j

    Jean-Michel Provencher

    02/23/2023, 4:21 PM
    Is it possible to create notifications using the Python API ? I don't see any references to Notifications in the Python Doc.
    ✅ 1
    4 replies · 3 participants
  • l

    Leon Kozlowski

    02/23/2023, 5:42 PM
    Is the prefect agent supposed to clean up completed pods when running on kubernetes? I see tons of pods in a completed state (prefect version 2.7.10 for my agent)
    ✅ 1
    1 reply · 2 participants
  • j

    Jacob Bedard

    02/23/2023, 6:09 PM
    Is there a "migrating to prefect 2" channel?
  • j

    Jacob Bedard

    02/23/2023, 6:15 PM
    I'm having trouble wrapping my head around all these various blocks, and there isn't a lot of great content out there, in my opinion. Everyone who does the prefect 2 videos sounds like they're very close to the product, and it's honestly a bit off-putting because they don't use any terminology from prefect cloud v1. Am I correct in understanding a setup could work like this:
    • I set up a repo for my prefect flows
    • I tie that repo to an S3 bucket or other storage location so it receives all updates and new files from the master/prod branch
    • I save a reference to this storage location as a prefect block
    • when I make a new flow, I can still test locally (I'm clear on this part at least), then when I'm ready to push to production, I push my code to my repo
      ◦ my repo updates the code storage location (S3 in my example)
    • I then create a deployment, and in the deployment, I state which named flow function is to be used from which file within the storage location, and I do this using a prefect "storage block"
    • the prefect cloud will then tell my agent (let's do a simple example and say I have an EC2 worker) that there is a new job to run
    • the worker, using the storage location specified in the deployment, will reach into the s3 bucket, and execute that flow
    21 replies · 4 participants
  • a

    Aric Huang

    02/23/2023, 7:15 PM
    Hi, any known issues accessing cloud.prefect.io right now? It's stuck on a loading indicator for me, and I see the following errors in console. Same issue after clearing cache/trying in incognito mode, and on different devices.
    :upvote: 3
    ✅ 1
    7 replies · 3 participants
  • g

    Ghislain Picard

    02/23/2023, 7:42 PM
    Prefect observability
  • b

    Bianca Hoch

    02/23/2023, 8:16 PM
    Hello team, there is a partial outage for the Cloud 1.0 UI, and our internal team is looking into it. You can see more on our status page here : https://prefect.status.io/.
    🙌 2
    ✅ 1
  • b

    Bianca Hoch

    02/23/2023, 9:19 PM
    This has been resolved, thank you all for your patience. Should anything appear to be amiss, please post in the thread above.
    🙏 4
    ✅ 1
    1 reply · 2 participants
  • b

    Billy McMonagle

    02/23/2023, 9:59 PM
    Hello there, I am seeing my flow logs showing up intermixed with my agent logs, and am wondering why. I'm running Prefect 2.8.0 (cloud), running on EKS with `KubernetesJob` infrastructure.
    ✅ 1
    10 replies · 2 participants
  • t

    Tomás Emilio Silva Ebensperger

    02/23/2023, 11:00 PM
    Prefect 2.0
    Is it possible to run a deployment from one server and have that flow picked up by an agent that is running on a different server?
  • z

    Zeeshan Nazar

    02/23/2023, 11:32 PM
    Hey Everyone! How can I set up prefect service in Linux within a conda environment? I created this initiate.sh shell file:
    #!/bin/bash
    eval "$(conda shell.bash hook)"
    conda activate ETL
    prefect server start
    And created this systemd service:
    [Unit]
    Description=Prefect
    
    [Service]
    Type=simple
    ExecStart=/home/zeephremia/files_required/initate.sh
    User=root
    Group=root
    
    [Install]
    WantedBy=multi-user.target
    Whenever I run the shell script locally, it works but whenever the systemd service triggers it, it throws this error:
    prefect_service.service - Prefect
         Loaded: loaded (/etc/systemd/system/prefect_service.service; enabled; vendor preset: enabled)
         Active: inactive (dead) since Thu 2023-02-23 23:29:43 UTC; 3s ago
        Process: 2233 ExecStart=/home/zeephremia/files_required/initate.sh (code=exited, status=0/SUCCESS)
       Main PID: 2233 (code=exited, status=0/SUCCESS)
    
    Feb 23 23:29:43 linux-server systemd[1]: Started Prefect.
    Feb 23 23:29:43 linux-server initate.sh[2233]: Starting the script...
    Feb 23 23:29:43 linux-server initate.sh[2234]: /home/zeephremia/files_required/initate.sh: line 5: conda: command not found
    Feb 23 23:29:43 linux-server initate.sh[2235]: /home/zeephremia/files_required/initate.sh: line 6: conda: command not found
    Feb 23 23:29:43 linux-server initate.sh[2237]: /home/zeephremia/files_required/initate.sh: line 7: prefect: command not found
    Feb 23 23:29:43 linux-server initate.sh[2233]: Script completed.
    Feb 23 23:29:43 linux-server systemd[1]: prefect_service.service: Succeeded.
    Would really appreciate any help, thank you! 🙂
    ✅ 1
    1 reply · 2 participants
  • m

    muhammad

    02/24/2023, 12:05 AM
    Hi everyone, I am trying to create a custom automation that will only send me a notification on Slack in case of `n` successive flow failures of a deployment within a specified time frame or without any time frame. I don't need alerts for every flow failure. I went through the documentation here and it's not super clear.
    <https://docs.prefect.io/ui/automations/#triggers>
    Has anyone else tried doing something like this before and could offer some advice or tips? Thanks!
    1 reply · 2 participants

Andrew Huang

02/24/2023, 12:36 AM
That sounds interesting! I don’t think it’s possible to do it with automation yet, but you could potentially write code for it, like having a variable / block holding counts of failures that decrements at defined intervals, and then warn if it exceeds a threshold. Here’s a starting point: https://prefecthq.github.io/prefect-slack/#capture-exceptions-and-notify-by-slack-message
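The counting idea suggested above can be sketched with a sliding window of failure timestamps, a variant of the decrementing counter. This is a minimal in-memory sketch; the class `FailureWindow` is hypothetical, and in a real deployment the timestamps would have to be persisted between flow runs (for example in a block, as suggested), which is not shown here:

```python
from collections import deque
from typing import Deque, Optional


class FailureWindow:
    """Report when `threshold` successive failures fall inside `window_seconds`."""

    def __init__(self, threshold: int, window_seconds: Optional[float] = None) -> None:
        self.threshold = threshold
        self.window_seconds = window_seconds  # None means no time frame
        self._failures: Deque[float] = deque()

    def record_success(self) -> None:
        # A success breaks the streak of successive failures.
        self._failures.clear()

    def record_failure(self, now: float) -> bool:
        """Record a failure at time `now` (seconds); return True if an alert is due."""
        self._failures.append(now)
        if self.window_seconds is not None:
            # Drop failures that are older than the window.
            while self._failures and now - self._failures[0] > self.window_seconds:
                self._failures.popleft()
        return len(self._failures) >= self.threshold
```

A flow's failure handler could call `record_failure` and send the Slack message only when it returns `True`.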