prefect-community
  • f

    flapili

    02/23/2023, 2:32 PM
    Hi, how can I get a task result from the API? I only get a reference from the API and I don't know how to retrieve the state from pure Python code :/
    "data": {
        "type": "reference",
        "serializer_type": "compressed/json",
        "storage_block_id": "e3311389-34e1-491c-aaef-cbcf6e6ea977",
        "storage_key": "e4940b1d2ada42a7a1ac77710cae4d18"
      },
  • s

    Sam Garvis

    02/23/2023, 3:02 PM
    Where are the values.yaml examples in the Prefect2.0 GitHub? I see that there were recent changes to work queues/pools etc and I'm guessing it would break new values.yaml files
  • s

    Steph Clacksman

    02/23/2023, 3:31 PM
    How can I create an s3 storage block with a session token? Is it possible to create an s3 storage block without using the UI?
  • m

    Marcel

    02/23/2023, 3:46 PM
    Hi all, is there any guide on handling db migrations, like up-/downgrading the prefect database after updating prefect? I haven't found one in the current docs or on GH. Unfortunately, the thread @Ryan Peden linked is already archived. So, any tips would be greatly appreciated! https://prefect-community.slack.com/archives/C03D12VV4NN/p1671204457213779?thread_ts=1671203596.984439&cid=C03D12VV4NN
  • k

    Kyle Austin

    02/23/2023, 3:46 PM
    I have a question about working with the ConcurrentTaskRunner. I have a flow with a potentially large number (5k in test runs) of independent calls to a simple async task that I would like to submit to the task runner so it can work on them concurrently. The flow submits all 5k to the task runner at the same time and I am getting many warning messages like
    10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
    I am almost always getting the following error message
    prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    I am setting persist_result=True in all my task decorators too. Plus I have set concurrency limits on tags so that no more than 30 of these tasks run at once. But all 5k tasks are still being submitted and created all at once! Here is roughly how the code looks in the flow now
    humana_smtp = EmailServerCredentials.load("some-smtp-server")
    for email in emails:
        email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            email_to_cc=email.cc,
            email_to_bcc=email.bcc,
            attachments=email.attachments,
            images=email.images,
            dry_run=dry_run,
        )
    I have done something like this to prevent it from submitting all 5k at once and throttle it down to working with 50 at a time
    email_chunks_for_sending = chunkify(emails, 50)
    humana_smtp = EmailServerCredentials.load("some-smtp-server")
    for chunk in email_chunks_for_sending:
        wait_for_complete_object = []
        for email in chunk:
            sent = email_send_message.submit(
                subject=email.subject,
                msg=email.rendered_html_template,
                email_server_credentials=humana_smtp,
                email_from=email.email_from,
                email_to=email.to,
                email_to_cc=email.cc,
                email_to_bcc=email.bcc,
                attachments=email.attachments,
                images=email.images,
                dry_run=dry_run,
            )
            wait_for_complete_object.append(sent)
        [future.result() for future in wait_for_complete_object]
    Here chunkify, which I took from another post on Slack, looks like
    def chunkify(xs, size):
        # Return a generator of consecutive slices of xs, each at most `size` items long.
        return (xs[pos : pos + size] for pos in range(0, len(xs), size))
    Is there a way to set a limit to the number of tasks that are submitted to the task runner at a given time? Task concurrency didn't do the trick for me -- it only limited the number of tasks running at a given time.
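An alternative to fixed-size chunks is a sliding window, which tops up the in-flight set as soon as any one item finishes instead of waiting for the slowest item in each chunk. The sketch below shows the pattern with the standard library only; it is not a Prefect API, and `run_with_window` and `max_in_flight` are hypothetical names chosen for illustration. Whether the same pattern behaves well when `fn` is replaced by a task submission inside a flow is an assumption that has not been checked here.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_with_window(fn, items, max_in_flight=50):
    """Apply fn to every item, keeping at most max_in_flight calls pending."""
    results = {}   # index -> result
    pending = {}   # future -> index of the item it was submitted for
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        for index, item in enumerate(items):
            # If the window is full, block until at least one call finishes.
            if len(pending) >= max_in_flight:
                done, _ = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    results[pending.pop(future)] = future.result()
            pending[pool.submit(fn, item)] = index
        # Drain the calls that are still running after the last submission.
        for future, index in pending.items():
            results[index] = future.result()
    # Return results in the same order as the input items.
    return [results[i] for i in range(len(items))]
```

With `max_in_flight=50`, no more than 50 calls exist at any moment, so the remaining items are never created ahead of time.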
  • j

    Jean-Michel Provencher

    02/23/2023, 4:21 PM
    Is it possible to create notifications using the Python API ? I don't see any references to Notifications in the Python Doc.
  • l

    Leon Kozlowski

    02/23/2023, 5:42 PM
    Is the prefect agent supposed to clean up completed pods when running on kubernetes? I see tons of pods in a completed state (prefect version 2.7.10 for my agent)
  • j

    Jacob Bedard

    02/23/2023, 6:09 PM
    Is there a "migrating to prefect 2" channel?
  • j

    Jacob Bedard

    02/23/2023, 6:15 PM
    I'm having trouble wrapping my head around all these various blocks, and there isn't a lot of great content out there, in my opinion. Everyone who does the prefect 2 videos sounds like they're very close to the product, and it's honestly a bit off-putting because they don't use any terminology from prefect cloud v1. Am I correct in understanding a setup could work like this:
    • I set up a repo for my prefect flows
    • I tie that repo to an S3 bucket or other storage location so it receives all updates and new files from the master/prod branch
    • I save a reference to this storage location as a prefect block
    • when I make a new flow, I can still test locally (I'm clear on this part at least), then when I'm ready to push to production, I push my code to my repo
      ◦ my repo updates the code storage location (S3 in my example)
    • I then create a deployment, and in the deployment, I state which named flow function is to be used from which file within the storage location, and I do this using a prefect "storage block"
    • the prefect cloud will then tell my agent (let's do a simple example and say I have an EC2 worker) that there is a new job to run
    • the worker, using the storage location specified in the deployment, will reach into the s3 bucket, and execute that flow
  • a

    Aric Huang

    02/23/2023, 7:15 PM
    Hi, any known issues accessing cloud.prefect.io right now? It's stuck on a loading indicator for me, and I see the following errors in console. Same issue after clearing cache/trying in incognito mode, and on different devices.
  • g

    Ghislain Picard

    02/23/2023, 7:42 PM
    Prefect observability
  • b

    Bianca Hoch

    02/23/2023, 8:16 PM
    Hello team, there is a partial outage for the Cloud 1.0 UI, and our internal team is looking into it. You can see more on our status page here : https://prefect.status.io/.
  • b

    Bianca Hoch

    02/23/2023, 9:19 PM
    This has been resolved, thank you all for your patience. Should anything appear to be amiss, please post in the thread above.
  • b

    Billy McMonagle

    02/23/2023, 9:59 PM
    Hello there, I am seeing my flow logs showing up intermixed with my agent logs, and am wondering why. I'm running Prefect 2.8.0 (cloud), running on EKS with KubernetesJob infrastructure.
  • t

    Tomás Emilio Silva Ebensperger

    02/23/2023, 11:00 PM
    Prefect 2.0
    Is it possible to run a deployment from one server and have that flow picked up by an agent that is running on a different server?
  • z

    Zeeshan Nazar

    02/23/2023, 11:32 PM
    Hey Everyone! How can I set up prefect service in Linux within a conda environment? I created this initiate.sh shell file:
    #!/bin/bash
    eval "$(conda shell.bash hook)"
    conda activate ETL
    prefect server start
    And created this systemd service:
    [Unit]
    Description=Prefect
    
    [Service]
    Type=simple
    ExecStart=/home/zeephremia/files_required/initate.sh
    User=root
    Group=root
    
    [Install]
    WantedBy=multi-user.target
    Whenever I run the shell script locally, it works but whenever the systemd service triggers it, it throws this error:
    prefect_service.service - Prefect
         Loaded: loaded (/etc/systemd/system/prefect_service.service; enabled; vendor preset: enabled)
         Active: inactive (dead) since Thu 2023-02-23 23:29:43 UTC; 3s ago
        Process: 2233 ExecStart=/home/zeephremia/files_required/initate.sh (code=exited, status=0/SUCCESS)
       Main PID: 2233 (code=exited, status=0/SUCCESS)
    
    Feb 23 23:29:43 linux-server systemd[1]: Started Prefect.
    Feb 23 23:29:43 linux-server initate.sh[2233]: Starting the script...
    Feb 23 23:29:43 linux-server initate.sh[2234]: /home/zeephremia/files_required/initate.sh: line 5: conda: command not found
    Feb 23 23:29:43 linux-server initate.sh[2235]: /home/zeephremia/files_required/initate.sh: line 6: conda: command not found
    Feb 23 23:29:43 linux-server initate.sh[2237]: /home/zeephremia/files_required/initate.sh: line 7: prefect: command not found
    Feb 23 23:29:43 linux-server initate.sh[2233]: Script completed.
    Feb 23 23:29:43 linux-server systemd[1]: prefect_service.service: Succeeded.
    Would really appreciate any help, thank you! 🙂
  • m

    muhammad

    02/24/2023, 12:05 AM
    Hi everyone, I am trying to create a custom automation that will only send me a notification on Slack after n successive flow failures of a deployment, either within a specified time frame or without any time frame. I don't need alerts for every flow failure. I went through the documentation here and it's not super clear.
    https://docs.prefect.io/ui/automations/#triggers
    Has anyone else tried doing something like this before and could offer some advice or tips? Thanks!
  • s

    Samuel Hinton

    02/24/2023, 1:54 AM
    Hi all, I’ve got a script which is meant to set up a bunch of blocks in prefect server. I was hoping to have a quick check performed before trying to make and save blocks, something along the lines of “Can you connect to the given PREFECT_API_URL and it all looks good?” Is there a nice way in the python API to do this?
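A pre-flight check like the one described can be sketched with the standard library alone. This assumes a self-hosted Prefect server that answers an unauthenticated `GET` on `<PREFECT_API_URL>/health` with HTTP 200; that endpoint path is an assumption to verify against your server version, and `api_is_reachable` is a hypothetical helper name, not part of Prefect's Python API.

```python
import http.client
import urllib.request


def api_is_reachable(api_url, timeout=5.0):
    """Return True if GET <api_url>/health answers with HTTP 200."""
    if not api_url:
        return False
    # Assumed health endpoint; adjust if your server exposes a different path.
    health_url = api_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(health_url, timeout=timeout) as response:
            return response.status == 200
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError, HTTPError, refused connections and timeouts;
        # ValueError covers a malformed URL.
        return False
```

A setup script could call `api_is_reachable(os.environ.get("PREFECT_API_URL", ""))` and exit early with a clear message before trying to save any blocks.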
  • s

    Samuel Hinton

    02/24/2023, 2:41 AM
    Hi team! I might be confused, or there might be an issue with the doco. Please see the screenshot. I’m trying to figure out what the difference is between a work pool and a work queue. The doco gives examples of starting an agent with no queue and a pool of “my-pool”, but then talks about how this will create a “my-queue” work queue. I don't see how the two link up and have been assuming this is a doco error. Would anyone be able to confirm/deny this?
  • s

    Samuel Hinton

    02/24/2023, 3:23 AM
    Hi team. Another confusing thing on my end - I’m getting warnings about naming conflicts:
    /Users/sh/Projects/flows/.venv/lib/python3.9/site-packages/prefect/tasks.py:270: UserWarning: A task named 'some_task' and defined at 'flows/data/endpoints/example.py:3' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
    
     `@task(name='my_unique_name', ...)`
      warnings.warn(
    /Users/sh/Projects/flows/.venv/lib/python3.9/site-packages/prefect/flows.py:214: UserWarning: A flow named 'get-some-data' and defined at 'flows/data/endpoints/example.py:9' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
    
     `@flow(name='my_unique_name', ...)`
      warnings.warn(
    13:20:46.203 | INFO    | Flow run 'important-jaguarundi' - Created task run 'some_task-0' for task 'some_task'
    Apparently both my task and my flow conflict with another… This is… not possible? I’m playing around with prefect and have a single example.py that contains a single flow and task. Is there a way I can figure out why prefect thinks there's a conflict, or just silence the spurious warnings? (Ideally I'd like to figure out what's going on rather than ignoring the warnings)
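For the "just silence it" option, Python's standard `warnings` module can filter on the message text. The sketch below only hides the warning and does nothing to explain the conflict; the pattern is derived from the message quoted above, and `silence_name_conflict_warnings` is a hypothetical helper name.

```python
import warnings

# Matches the start of the UserWarning text quoted above for both tasks and flows.
NAME_CONFLICT_PATTERN = (
    r"A (task|flow) named '.*' and defined at '.*' conflicts with another"
)


def silence_name_conflict_warnings():
    """Install a filter that ignores only the name-conflict UserWarning."""
    warnings.filterwarnings(
        "ignore",
        message=NAME_CONFLICT_PATTERN,
        category=UserWarning,
    )
```

The filter has to be installed before the module that defines the task and flow is imported, since the warning is emitted when the decorators run.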
  • f

    Farid

    02/24/2023, 4:57 AM
    Hi all, how can I handle a failure in a task so that the whole flow doesn't fail because of it when the task is called in mapped mode? For example, when a shell task is called to run multiple commands in parallel using .map, the whole flow fails immediately if one of the child tasks fails. I have tried passing return_state=True but it doesn't seem to be effective in mapped mode.
  • t

    Tanishq Hooda

    02/24/2023, 5:22 AM
    Hi, I'm using Deployment.build_from_flow(.....) to create deployments for my flow and want to specify the work pool name here as well. I tried the work_pool_name parameter but it is not allowed. Can anyone tell me what to use? Thanks!
    Deployment.build_from_flow(
        flow=my_flow,
        name="my_flow",
        parameters={},
        infra_overrides={
            "image": SHARED_CONFIG.value["prefect_image_name"],
            "service_account_name": "my-sa",
            "image_pull_policy": "Always",
            "namespace": "mynamespace",
        },
        infrastructure={"block_type_slug": "kubernetes-job"},
        work_queue_name="k8s",
        work_pool_name="my-worker-pool", # doesn't work
        storage=storage,
        .....
        .....
    I can update the work pool from the UI, but I want to do it from code. Prefect version: 2.7.7
  • j

    Jamie Blakeman

    02/24/2023, 10:11 AM
    Hi, our agents run in docker containers with env vars supplied when the containers are stood up. We recently changed one of our API keys, updated the env var, rebuilt the containers, and confirmed by executing python inside the container that the env var is updated. However, when the prefect flow runs and reads the env var, it fetches the value it knew when the deployment was made. Is this by design? Rather than a quick change in a docker-compose file, it looks like I’ll have to update all the deployments that use that value, even though nothing in the code is changing.
  • c

    Christianto

    02/24/2023, 10:39 AM
    Hello. I am trying to create a deployment from the CLI using GCS storage. When I try to run prefect build ... it gives me an error saying ModuleNotFoundError: No module named 'gcsfs'. Meanwhile, conda list shows the gcsfs module is already installed, but somehow the prefect CLI can’t find it. Is there any workaround for this? Thanks!
  • s

    Stefan

    02/24/2023, 11:06 AM
    Hi, when I decorate a function with @task, I've been receiving this error: psycopg2.ProgrammingError: no results to fetch. This happens when the function/task has a Postgres query that either inserts or updates, so it does not return anything. If I remove the decorator, it works. With the decorator, a task run is not even initialized; it fails before that. Any clues?
  • a

    Andrei Nikonov

    02/24/2023, 12:00 PM
    Hi there, I’m getting Cannot return null for non-nullable field flow_run.flow when I query GraphQL in the Interactive API tab. I want to see the flow name alongside the flow_run information, but I can’t get the flow name. I see flow_id and I can fetch all names and map them programmatically, but I hope there’s a more convenient solution.
  • a

    Aaron Gonzalez

    02/24/2023, 12:43 PM
    Is it possible to pass a custom task name to each of the calls to bigquery_query here? Not that big of a deal, but I would prefer to see something a little more descriptive. https://prefecthq.github.io/prefect-gcp/bigquery/#prefect_gcp.bigquery.bigquery_query
    if await bigquery_query(exists_query, creds):
        print(f"Deleting data from the {sync_date} partition.")
        await bigquery_query(del_query, creds)

    await bigquery_query(load_query, creds)
  • w

    Wellington Braga

    02/24/2023, 2:31 PM
    is there any prefect native function in python which is equivalent to the command:
    prefect deployment ls --flow-name <flow-name>
  • v

    Vadym Dytyniak

    02/24/2023, 3:05 PM
    Hi. We are getting the following error:
    httpx.HTTPStatusError: Client error '429 Too Many Requests'
    Where can I read about the limits and best practices?
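Independently of the documented limits, a client hitting 429 responses usually retries with exponential backoff and honours any `Retry-After` hint. The sketch below is a generic standard-library illustration, not Prefect or httpx API: `TooManyRequests` is a hypothetical stand-in for whatever exception your client raises on a 429, and `call_with_backoff` and its parameters are names chosen for illustration.

```python
import random
import time


class TooManyRequests(Exception):
    """Hypothetical stand-in for an HTTP 429 error raised by a client."""

    def __init__(self, retry_after=None):
        super().__init__("429 Too Many Requests")
        self.retry_after = retry_after  # seconds suggested by the server, if any


def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                      sleep=time.sleep):
    """Call fn(), retrying on TooManyRequests with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except TooManyRequests as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            if exc.retry_after is not None:
                delay = exc.retry_after
            else:
                delay = min(max_delay, base_delay * 2 ** attempt)
            # Add up to 10% jitter so parallel clients do not retry in lockstep.
            sleep(delay + random.uniform(0, delay * 0.1))
```

The `sleep` argument is injectable so the retry schedule can be exercised in tests without actually waiting.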
  • j

    John Mizerany

    02/24/2023, 4:51 PM
    When setting up an automation in Prefect 2 cloud, is there a way to test to make sure it works? Or would you just have to wait for the event to happen to trigger it
More specifically I am trying to make sure the slack notification triggers if one of my agents in a work queue goes from healthy to unhealthy