# ask-community
  • m

    Melek Alan

    03/24/2025, 10:49 AM
    Hi everyone, how do I set an annotation on the Kubernetes job pods that prefect-worker runs? I deployed Prefect via the Helm chart (2024.12.11173517) on an AWS EKS cluster. I am using selfHostedServer.
  • f

    Fabian Gruben

    03/24/2025, 11:04 AM
    Hello, is the limit to five deployed workflows (=Deployments) a recent restriction for the Prefect Cloud Free Tier? Does this also apply to free tier accounts that were established before the restriction or is there something like a grace period? Best regards Fabian
  • m

    Marko T

    03/24/2025, 1:00 PM
    Hello, is the repo PrefectHQ/prefect-hightouch still being maintained by Prefect? It doesn't seem to support Prefect 3 because of a Pydantic version problem: https://github.com/PrefectHQ/prefect-hightouch/issues/39 . I'm wondering if we really have to downgrade to Prefect 2 to use it in our pipelines.
  • t

    Theo Bouwman

    03/24/2025, 1:24 PM
    Hi, we have successfully deployed Prefect on k8s with additional worker pods. Works amazing! Unfortunately when I took a look at the dashboard again today it looks like all settings are removed, like the default namespace for the worker pool, and no previous runs are visible, deployments have been removed etc. Looks like a clean install again. We deployed by following https://docs.prefect.io/v3/deploy/infrastructure-examples/kubernetes. So anyone else having this issue as well?
  • s

    Stefan

    03/24/2025, 1:45 PM
    Hi! I'm not getting docs or any kind of recognition for the flow decorator.
    flow: FlowDecorator
    Task and every other import works fine. Latest version.
  • j

    Jeremy Karn

    03/24/2025, 4:31 PM
    Hi Everyone! I was wondering if anyone knew of a way to get a parameter from a Flow Run to display in the Prefect Cloud "Runs" UI list. It has a bunch of generic information (time, deployment, work pool, etc.) but each of our flow runs has one specific parameter that differs and it would be really nice to see it in the top level list but I couldn't figure out a way to do that. (Edit: I know I can click the parameters link to see the dictionary of parameters, but I'd like a way to just have it display without clicking each job).
  • b

    Ben Epstein

    03/24/2025, 4:45 PM
    I'm trying to set up a programmatic notification on Slack when my flows fail. I'm looking at the docs for SlackWebhook and they say to use it like so:
    from prefect.blocks.notifications import SlackWebhook
    
    slack_webhook_block = SlackWebhook.load("BLOCK_NAME")
    slack_webhook_block.notify("Hello from Prefect!")
    this is working for me. I wrote a function to do that as per this thread and added the
    on_failure=[notify_slack]
    but i'm getting the error (and mypy is showing the same)
    packages/flows/notify.py", line 7, in notify_slack
        slack_webhook_block.notify(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
    AttributeError: 'coroutine' object has no attribute 'notify'
    I'm curious why this is working when I just run this code directly, but within prefect flow infrastructure it's failing. I'm seeing that in pylance the variable is of type
    slack_webhook_block: SlackWebhook | Coroutine[Any, Any, SlackWebhook]
    -- so perhaps because my flow is async, this
    notify_slack
    function needs to be async as well?
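The error above can be reproduced without Prefect. The sketch below uses a hypothetical `FakeWebhook` class and `load_webhook` coroutine function (neither is Prefect's API) to show what happens when an async loader is called without `await`: the result is a coroutine object, which has no `notify` attribute. Whether this is exactly what happens inside the failure hook is an assumption based on the Pylance type shown in the message.

```python
import asyncio


class FakeWebhook:
    """Stand-in for a notification block; this is not Prefect's SlackWebhook."""

    def __init__(self):
        self.sent = []

    def notify(self, message):
        self.sent.append(message)


async def load_webhook():
    # Hypothetical async loader: calling it returns a coroutine, not a FakeWebhook.
    return FakeWebhook()


async def broken_hook():
    block = load_webhook()  # missing await, so block is a coroutine object
    try:
        block.notify("flow failed")
    except AttributeError as exc:
        return str(exc)
    finally:
        block.close()  # close the unawaited coroutine to avoid a RuntimeWarning


async def fixed_hook():
    block = await load_webhook()  # awaiting yields the real object
    block.notify("flow failed")
    return block.sent


error_message = asyncio.run(broken_hook())
sent_messages = asyncio.run(fixed_hook())
print(error_message)
print(sent_messages)
```

If the same pattern applies here, making the hook `async` and awaiting the load call would be the thing to try first.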
  • a

    Abyaya Lamsal

    03/24/2025, 5:41 PM
    Hi team, I have recently been trying to migrate from Prefect agents to workers. At the same time, I am also upgrading from 2.14.16 to 2.20.16. I started seeing some intermittent issues with a subset of flows. This seems to happen randomly, so not every flow run has this issue. For reference, I am using a custom image. Logs attached below:
    13:29:55.058 | INFO    | prefect.flow_runs.worker - Worker 'KubernetesWorker a5d26a51-ff36-4697-8daf-f8aa3a0fea54' submitting flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
    13:29:55.236 | INFO    | prefect.flow_runs.worker - Creating Kubernetes job...
    13:29:55.314 | INFO    | prefect.flow_runs.worker - Completed submission of flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
    13:29:55.349 | INFO    | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod has status 'Pending'.
    13:30:55.327 | ERROR   | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod never started.
    13:30:55.570 | INFO    | prefect.flow_runs.worker - Pod event 'Scheduled' at 2025-03-18 13:29:55+00:00: Successfully assigned [OUR_NAMESPACE]/adept-hog-hwcjq-pqhbc to <INTERNAL_NODE>
    13:30:55.571 | INFO    | prefect.flow_runs.worker - Job event 'SuccessfulCreate' at 2025-03-18 13:29:55+00:00: Created pod: adept-hog-hwcjq-pqhbc
    13:30:55.572 | INFO    | prefect.flow_runs.worker - Pod event 'Pulling' at 2025-03-18 13:29:56+00:00: Pulling image "<CUSTOM_IMAGE>"
    13:30:55.572 | INFO    | prefect.flow_runs.worker - Pod event 'Pulled' at 2025-03-18 13:30:33+00:00: Successfully pulled image "<CUSTOM_IMAGE>" in 37.16s (37.16s including waiting). Image size: <SIZE> bytes.
    13:30:55.716 | INFO    | prefect.flow_runs.worker - Reported flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8' as crashed: Flow run infrastructure exited with non-zero status code -1.
    <NORMAL EXECUTION>
    ...
    The job eventually runs. The issue is if I subscribe to any failure notification, then I get randomly bombarded with crash notifications, which is not very helpful. Would appreciate any pointers here. Here is a sample of the job logs:
  • s

    Stefan

    03/24/2025, 6:54 PM
    Hi! How are you handling monorepos in GitHub? I have several flows for which separate repos would be maintenance overkill, so I'd like to keep them together in one repo. Today we commit to git and the worker pulls from git, but we would like to use Docker moving forward. How can we achieve:
    • Having multiple projects in a repo, each with their own build instructions and dependencies (preferably their own isolated uv projects)
    • Only building and deploying projects that have changed, to avoid building all of them every time
    • …preferably without writing a .yaml file for each project?
  • t

    Tim Galvin

    03/25/2025, 8:02 AM
    @Marvin - I am trying to run a pytest unit test that runs a very simple flow. I am doing this inside a Docker container. My test fails in the container with the following error:
    FAILED tests/test_prefect_helpers.py::test_enable_loguru_support - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: "Debug Print Notification" - should this be a string literal in single-quotes?
    @Marvin - what is going wrong and causing the error?
  • c

    Claudiu

    03/25/2025, 11:02 AM
    Hi everyone, my team and I are considering using Prefect to orchestrate tasks for the cognition system of a general-purpose humanoid robot. We have a POC that works, but we've run into some big bottlenecks and we're wondering if what we're doing makes sense.
    Issue 1: we can't schedule/pause/stop flows:
    • From the documentation it's very clear that we need to create a work pool in order to schedule flows, but since our deployment will be tied to specific hardware (Jetson), we have to support emergency use cases without any cloud or work pool access. Our orchestration and infrastructure layers are on the same hardware, so we don't need to separate them. What's the solution for scheduling flows without work pools?
    • Is there a way to pause/resume flows without work pools? Right now it seems we NEED to create a work pool to have this functionality.
    • prefect.deploy or prefect.serve seem like good tools for remote deployment, but that just isn't our use case.
    • Do work pools make sense for our specific use case, or is there an entity that we can use instead? Currently we have a YAML file that provides the scheduling details for a flow, but it's a very convoluted process. Having the ability to directly schedule a task when needed would simplify our process (more details in the thread).
    Issue 2: serialization issues
    • We have some entities that can't be easily serialized, and custom serialization logic would require additional parts of the system that aren't implemented in the scope of the POC. We know you have some serializers, but they don't work for our entities.
    • We also have some singleton classes that act as a "syncing" element in our system. Is there a better alternative for managing state in a single-machine, all-in-one deployment?
    • We're currently using the default task runner. Is there any benefit to using another one (like DaskTaskRunner), given that we don't need distributed cognition for the POC?
  • m

    Martin Tomovič

    03/25/2025, 11:39 AM
    Hi, I use Prefect for flows recurring at an interval using deployments. But now I have a list of thousands of specific times, e.g. 2025/3/25 at 112312:303, and I need to schedule a flow to run at those exact times. Is this somehow possible? Thank you for your advice.
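One possible first step for the question above is to turn the list of times into clean, timezone-aware datetimes before creating any runs. The sketch below is plain Python; the function name `parse_run_times`, the ISO input format, and the choice of UTC are all assumptions for illustration. To my understanding, each resulting datetime could then be passed as `scheduled_time` to Prefect's `run_deployment`, but that call is not shown or tested here and is worth checking against the docs.

```python
from datetime import datetime, timezone


def parse_run_times(raw_times, now):
    """Return unique, sorted, UTC-aware datetimes that are later than `now`."""
    parsed = {
        # Assumption: inputs are naive ISO-8601 strings that represent UTC.
        datetime.fromisoformat(raw).replace(tzinfo=timezone.utc)
        for raw in raw_times
    }
    return sorted(t for t in parsed if t > now)


# Hypothetical input: one duplicate and one time that is already in the past.
raw = [
    "2025-03-25T11:23:12.303",
    "2025-03-25T11:23:12.303",
    "2025-03-24T08:00:00.000",
    "2025-03-26T09:15:00.500",
]
now = datetime(2025, 3, 25, 0, 0, tzinfo=timezone.utc)
run_times = parse_run_times(raw, now)
for t in run_times:
    print(t.isoformat())
```

Dropping duplicates and past times up front keeps the number of created runs predictable when the list has thousands of entries.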
  • c

    Chris

    03/25/2025, 12:35 PM
    Hi all, I’m running a flow on ECS and have encountered a strange issue. After setting my deployment configurations (mainly network configurations) in prefect.yaml, the job variables appear in my deployment configuration but are not passed through to my ECS Task. A task definition is created with the name of my work pool and a UUID (e.g., prefect_ecs-dev-pool_79227824-ac2e-48de-8639-58bc6fae1f6c), but it does not include the configured variables. As a result, network settings and IAM roles are missing (managed to bypass this by allowing my worker to pass roles to my task… for now.). I’ve already discussed this with @Bianca Hoch but am still unsure why this is happening. Hoping the community can help us troubleshoot! Thanks in advance.
    Secondary ask: I’m using webhooks in Prefect Cloud and would like to validate incoming webhooks. To do this, I need access to the raw, unprocessed request body to generate a signature and match it against the signature in the headers. However, it seems that Prefect modifies the raw body in some way, making it impossible to validate my signature. While I appreciate the ability to extract webhook parameters directly into my flow, it would be helpful to have an option to pass the full request payload so I can handle it myself, without manipulation by Prefect. I’ve tried several approaches to pass the raw body to my function, but I either get validation errors in my automation or signatures that don’t match. Has anyone else encountered this, or found a workaround? Would love any insights. Thanks!
  • a

    antonk

    03/25/2025, 2:58 PM
    Hi all, For some reason my schedules deactivate whenever I do a deploy. I tried explicitly setting the schedule to active, but it keeps happening. See images below. Any ideas?
  • e

    Emon Datta

    03/25/2025, 5:13 PM
    hmm I created a prefect deployment, and it's not picking up the correct parameters. It only shows "args" and "kwargs" in the parameters section instead of the actual expected argument names. When I try to run it, I get the following error because it doesn't pick up the expected parameter names:
    Response: {'detail': "Error creating flow run: Validation failed. Failure reason: 'args' is a required property"}
    any advice for how to fix this?
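One possible cause of seeing only "args" and "kwargs" (an assumption, since the flow code is not shown in the message) is that the flow function is wrapped by another decorator that does not preserve its signature. The sketch below uses only the standard library and hypothetical names (`plain_wrapper`, `preserving_wrapper`, `load_data`) to show how a wrapper hides parameter names from `inspect.signature`, and how `functools.wraps` keeps them visible.

```python
import functools
import inspect


def plain_wrapper(fn):
    # No functools.wraps: introspection sees only (*args, **kwargs).
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def preserving_wrapper(fn):
    # functools.wraps sets __wrapped__, which inspect.signature follows.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


def load_data(source: str, limit: int = 10):
    return (source, limit)


hidden = list(inspect.signature(plain_wrapper(load_data)).parameters)
kept = list(inspect.signature(preserving_wrapper(load_data)).parameters)
print(hidden)
print(kept)
```

If a custom decorator sits between the function and the flow decorator, checking it for `functools.wraps` would be a cheap first test.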
  • s

    sadath

    03/25/2025, 5:38 PM
    Hi, how can I list failed / successful flow runs and rerun them?
  • l

    Luis Henrique Dallelaste Bernartt

    03/25/2025, 5:46 PM
    Guys, I am currently facing an issue using AWS + Prefect, which boils down to a problematic query. I downloaded the Prefect source code and have been trying for some time to find this query so I can modify it for better performance. Below is the query in question: UPDATE deployment SET last_polled=$1::TIMESTAMP WITH TIME ZONE, status=$2::deployment_status, updated=now() WHERE deployment.id IN (NULL) AND (1 != 1) OR deployment.work_queue_id IN ($3::UUID) If you could help me find and fix this query, I would be grateful! The goal would be to remove the following part: “(NULL) AND (1 != 1) OR”. Thank you in advance!
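The `IN (NULL) AND (1 != 1)` fragment in the query above looks like the way an ORM renders an `IN` test against an empty list (my assumption is that it is generated by SQLAlchemy rather than written literally in the Prefect source, which would explain why a text search does not find it). Logically the fragment is always false, so the `OR` branch alone decides which rows match. The sketch below checks that with the standard-library `sqlite3` module on a hypothetical, simplified `deployment` table; it says nothing about the PostgreSQL query plan or performance.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified table; the real Prefect schema has many more columns.
conn.execute("CREATE TABLE deployment (id INTEGER, work_queue_id TEXT)")
conn.executemany(
    "INSERT INTO deployment VALUES (?, ?)",
    [(1, "q1"), (2, "q2"), (3, None)],
)

# Same WHERE shape as the quoted query: an empty-IN clause OR'd with the real filter.
with_empty_in = conn.execute(
    "SELECT id FROM deployment "
    "WHERE id IN (NULL) AND (1 != 1) OR work_queue_id IN (?) "
    "ORDER BY id",
    ("q1",),
).fetchall()

# The filter with the always-false clause removed.
without_empty_in = conn.execute(
    "SELECT id FROM deployment WHERE work_queue_id IN (?) ORDER BY id",
    ("q1",),
).fetchall()

print(with_empty_in, without_empty_in)
conn.close()
```

Both statements select the same rows, so removing the fragment would not change results; whether it changes performance would need an `EXPLAIN` on the real database.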
  • s

    sadath

    03/26/2025, 5:25 AM
    Hi, I need some help comparing https://github.com/tensorlakeai/indexify with Prefect. Not feature-wise, but in terms of the core engine design.
  • v

    Vlad

    03/26/2025, 9:31 AM
    Hello there 👋, could someone please suggest a contact from the Prefect team (ideally UK-based) whom I could contact for a technical partnership? We are interested in adopting the Prefect solution.
  • s

    Sam Greenwood

    03/26/2025, 11:46 AM
    @Bianca Hoch Just wanted to drop in to say I've been trying to troubleshoot a prefect deploy issue which was reliably producing badly built images for ECR which EKS then refused to run (EKS reported a bad hash on image pull, imagePullErr). I had had this issue for over a week and tried everything: installing/re-installing Docker, different package manager venvs, different project layouts for prefect.yaml, etc. In the end, the solution was to disable the containerd image store in Docker Desktop. With that checkbox unticked, prefect deploy worked first time; with it checked, it would reliably not work. This is Prefect 3.2 with Python 3.12.
  • a

    Abdul Raheem Siddiqui

    06/27/2025, 10:26 PM
    Thank you. I am reading this.
  • m

    Martin Klefas

    06/27/2025, 11:15 PM
    was the copy/clone run option removed? I was using that for debugging small sections of my code that had failed without rerunning the whole flow!
  • b

    Ben Epstein

    06/29/2025, 12:34 AM
    Hey @Nate sorry to direct tag but are there any updates here (on being able to respect the prefect version in your lockfile)? Because this is happening again now https://app.prefect.cloud/account/2e97a726-e32b-48ab-90fe-8c542523240f/workspace/66cf6c8b-748e-4cb7-9448-c079d54e0e36/runs/flow-run/0685fcb9-f33e-7bca-8000-20ab00ec6ee1?entity_id=0685fcb9-f33e-7bca-8000-20ab00ec6ee1 It’s a pretty big blocker for us because we can’t currently trust that our jobs will run reliably.
  • m

    Mohammad Nassar

    06/29/2025, 10:22 AM
    Hi all, assume we have Prefect flows with long tasks and we want to support a cancel request that stops or kills "immediately" the flow and the running tasks. What is the best way to do so? Thanks! (From my understanding, I see that the default cancel flow didn't affect the current running tasks and they continue to work. Also, when using "set_task_run_state()" or "set_flow_run_state()" it is just labeling with "cancel" not really stopping the run.)
  • m

    Mohammed Noureldin

    06/29/2025, 5:40 PM
    Hi guys, it is kind of a pain to debug Docker build errors in Prefect when building with .deploy, as Prefect outputs only the very last error, which mostly lacks the context and the text of the real fatal error. While building a custom image (in the deploy step), it only shows that it is building, without the steps, and if an error happens it shows the "error" message, whereas the real message, which docker build would print, is in the fatal message that Prefect does not show. Can we make Prefect show all of the build steps, just like docker build?
  • r

    Royzac

    06/29/2025, 10:54 PM
    Anyone using Prefect along with Pulsar? Curious about any particular issues you may have come across.
  • m

    Mohammed Noureldin

    06/29/2025, 11:48 PM
    # UPDATE: check my reply for answer! Maybe this will help someone. Is there a way to rebuild the whole image with no cache? I am not able to find what I have to pass to .deploy to build the whole image again. I already tried
    image=DockerImage(
                name="myworkflow",
                tag="latest",
                dockerfile="Dockerfile",
                build_kwargs={"nocache": True},
    But for some reason the build_kwargs is not recognized (I am getting an error)
  • t

    Tobias Bjormyr

    06/30/2025, 11:26 AM
    With the new Prefect tier system we are trying to upgrade our Legacy Free Tier to Starter Tier, but when we try to initiate the process by clicking on "Upgrade" we get this error. I emailed help@prefect.io early on Friday and am still waiting for an answer. Does anyone know the normal response time? I haven't received a notification that the support case has been received. Is this a known bug that people have seen before?
  • r

    Robin

    06/30/2025, 1:11 PM
    Hey all, there probably is a way to set retention time for prefect server for logs etc? We already have 46 GB of the prefect server related database see
  • p

    Pav Staz

    06/30/2025, 5:00 PM
    Hi guys, I'm trying to format logs at the end of my master flow using client.read_logs, and from that use flow_run_id and task_run_id to get flow and task names programmatically (and other information). The problem: it seems that in the UI (and within a Python flow client) I can't get all the logs of child flows within parent flows together. Only the tasks of the parent flow show logs, not the subflows. This means I can't get ALL logs of a parent flow and then format them for something like a 'success email' for the entire flow. If someone has a better idea that gives me an 'overall report' of a flow and all of its subflows, I am open to ideas. Here is my code:
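    from prefect import flow, runtime, task
    from prefect.client.orchestration import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterId,
        LogFilter,
        TaskRunFilter,
        TaskRunFilterId,
    )
    from prefect.logging.handlers import APILogHandler

    @task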
    async def check_logs():
        async with get_client() as client:
            # Ensure any pending logs are sent
            await APILogHandler.aflush()
            logs = await client.read_logs(
                LogFilter(flow_run_id={"any_": [runtime.flow_run.id]})
            )
    
            records = []
            for log in logs:
                
                # Gets task run and flow run info
                if log.task_run_id is not None:
                    task_runs = await client.read_task_runs(
                        task_run_filter=TaskRunFilter(id=TaskRunFilterId(any_=[log.task_run_id]))
                    )
                    task_run = task_runs[0]
                    task_run_name = task_run.name
                    print("-------------------- Task Run Details ----------------------------")
                    print(task_run)
                    print("------------------------------------------------------------------")
                    if task_run.flow_run_id is not None:
                        flow_runs = await client.read_flow_runs(
                            flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[task_run.flow_run_id]))
                        )
                        flow_run = flow_runs[0]
                        print("-------------------- Flow Run Details ----------------------------")
                        print(flow_run)
                        print("------------------------------------------------------------------")
                        if flow_run.flow_id is not None:
                            flow = await client.read_flow(flow_run.flow_id)
                            print(f"Flow name is {flow.name}")
    
                print(log)
    
    
    @flow(log_prints=True)
    def daily_flow():
        
        ## A subflow
        some_subflow()
        
        ## A task
        some_task()
        
        check_logs.submit()
        
        # only logs of the some_task are printed by the check_logs function, not the some_subflow function
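Since the log filter in the code above only includes the parent's flow run ID, one way to get an overall report might be to first gather the IDs of the parent run and every nested subflow run, then query logs for the whole set. The sketch below shows only the gathering step in plain Python; `collect_run_ids` and the `children_by_parent` mapping are hypothetical, and how that mapping is fetched from the Prefect API is deliberately left out.

```python
from collections import deque


def collect_run_ids(root_id, children_by_parent):
    """Breadth-first walk returning the root run ID and all descendant run IDs."""
    ordered = []
    seen = set()
    queue = deque([root_id])
    while queue:
        run_id = queue.popleft()
        if run_id in seen:
            continue  # guard against duplicate entries in the mapping
        seen.add(run_id)
        ordered.append(run_id)
        queue.extend(children_by_parent.get(run_id, []))
    return ordered


# Hypothetical run tree: a parent flow with two subflows, one of them nested further.
tree = {
    "parent": ["subflow-a", "subflow-b"],
    "subflow-a": ["subflow-a1"],
}
all_ids = collect_run_ids("parent", tree)
print(all_ids)
```

The resulting list could then replace the single ID in the `any_` list of the log filter, assuming the filter accepts multiple flow run IDs.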