# ask-community
  • Bjarne Johannsen

    08/05/2025, 10:28 PM
    How do you suggest handling large numbers of files per workflow (e.g. 50k)? I feel like assets are not really made for that, and I can't find any documentation on it.
  • gee

    08/06/2025, 11:20 AM
    Marvin's not really helping on this one. I found this package https://pypi.org/project/prefect-client/ but can't get it to work. Any ideas?
  • Nick Torba

    08/06/2025, 1:12 PM
    Hello! I'm getting a couple of errors for my work pools like this: Prefect work pool 'default-worker' has entered status 'NOT_READY'. And in the UI, my work pools will not load. Is there some sort of issue with work pools on the platform?
  • Akash N

    08/06/2025, 1:18 PM
    I am also facing the same issue
  • Kiran

    08/06/2025, 1:51 PM
    Hey @Marvin, how can I get the metadata of a flow run using client methods?
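    A minimal sketch of one way to do this with the orchestration client, assuming Prefect 3 (the flow run ID below is a placeholder):
    Copy code
    import asyncio
    from uuid import UUID
    from prefect import get_client

    async def show_metadata(flow_run_id: UUID):
        # read_flow_run returns the FlowRun object with its metadata
        async with get_client() as client:
            flow_run = await client.read_flow_run(flow_run_id)
            print(flow_run.name, flow_run.state_name, flow_run.tags)
            print(flow_run.start_time, flow_run.parameters)

    if __name__ == "__main__":
        asyncio.run(show_metadata(UUID("00000000-0000-0000-0000-000000000000")))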
  • Ben

    08/06/2025, 2:39 PM
    If anyone has successfully created an Amazon ECS worker with open source Prefect 3, please ping me. No matter what I try, whenever a deployment gets scheduled, the worker picks it up and immediately crashes due to AWS credential issues. I can't seem to figure it out.
    Copy code
    16:19:01.204 | INFO    | prefect.flow_runs.worker - Reported flow run 'f6740f0f-e635-4017-a73c-c8907010f99e' as crashed: Flow run could not be submitted to infrastructure:
    agent-aws-esc-1  | ProfileNotFound('The config profile (prefect) could not be found')
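    The ProfileNotFound error suggests AWS_PROFILE is set to "prefect" inside the container while no such profile exists there; checking what boto3 resolves in that environment can help. A hypothetical debugging snippet (assumes boto3 is installed in the container):
    Copy code
    import boto3

    # boto3 honors AWS_PROFILE if it is set; this prints which profile and
    # credential source are actually being resolved inside the container
    session = boto3.Session()
    creds = session.get_credentials()
    print("profile:", session.profile_name)
    print("resolved via:", creds.method if creds else "no credentials found")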
  • Michael Hadorn

    08/06/2025, 4:25 PM
    One question about concurrency limits together with "task groups". I'm building an ETL pipeline; in this question I'm only focusing on the Extract step.
    • Extraction covers different source systems, which need an external tool with a limited resource; I'm using tags for task concurrency limits.
      ◦ The limited resource allows only 4 concurrent connections.
    • Let's say we load 5 tables for source A and 5 tables for source B.
      ◦ For simplicity, each task loads one table and takes exactly 5 seconds.
    • I start loading source A:
      ◦ 0s: submit all 5 A tables; the limit allows 4 to execute in parallel.
      ◦ 5s: the last A table executes.
    • I want to load B as soon as possible, but not before the last table of A has started.
    At the same time, there are other tasks running which do not use the limited resource. How can I achieve this?
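    A rough sketch of the tag-based part, assuming a limit created beforehand with prefect concurrency-limit create external-tool 4. Hedged caveat: wait_for gates B on the A tasks finishing, which is stricter than "the last A table has started"; Prefect has no built-in "has started" dependency:
    Copy code
    from prefect import flow, task

    @task(tags=["external-tool"])  # tag carries the 4-slot concurrency limit
    def load_table(name: str) -> str:
        return name  # stand-in for the real extraction

    @flow
    def extract():
        a = [load_table.submit(f"A{i}") for i in range(5)]
        # wait_for makes every B task depend on all A tasks completing
        b = [load_table.submit(f"B{i}", wait_for=a) for i in range(5)]
        return [f.result() for f in a + b]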
  • Zhongyang Xia

    08/06/2025, 7:47 PM
    Hi, is prefect cloud experiencing reliability issues? We are getting
    Copy code
    19:40:23.179 | WARNING | prefect.events.clients - Unable to connect to 'wss://api.prefect.cloud/api/accounts/[uuid]/workspaces/[uuid]/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: timed out during opening handshake. Set PREFECT_DEBUG_MODE=1 to see the full error.
  • Samuel Hinton

    08/07/2025, 5:53 AM
    Hi all! We're using Prefect for academic purposes, and the org has set up a paid Prefect Cloud platform. We've made some videos running through how it all works and shared them with students. However, when students run example scripts connected to a local Prefect (started via a docker run command), the flow-run UI is very different. Prefect Cloud has tracing, log trees, better artifact viewing, etc. Is there a way to unify the UIs, or is the more advanced UI one of Prefect Cloud's gatekeepers to encourage subscription?
  • António Domingues

    08/07/2025, 9:13 AM
    Hi all, new here and new to Prefect, but I have written and used pipelines in other workflow managers in the past (bpipe and Nextflow, if it matters). I am in the process of writing some flows and have a question for y'all: what strategies are you using to reuse tasks in multiple flows? For example, a load_data task could be fairly generic and used in two different flows such as apply_model or train_model. How to go about it? Write each task in its own file and import it as needed in each flow file? Cheers!
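    That last option works well in practice; a minimal sketch (the file layout and names are just a suggestion):
    Copy code
    # tasks/io.py -- shared, generic tasks
    from prefect import task

    @task
    def load_data(path: str) -> list[str]:
        with open(path) as f:
            return f.read().splitlines()

    # flows/train.py -- one of several flows importing the shared task
    from prefect import flow
    from tasks.io import load_data

    @flow
    def train_model(path: str = "data.txt"):
        rows = load_data(path)
        print(f"training on {len(rows)} rows")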
  • Naren K

    08/07/2025, 11:22 AM
    👋 Hey Prefect community! I've built a solution for enabling flow-level dependencies in Prefect OSS using a simple decorator: @wait_for_deployments. It allows one flow to wait for other deployments to complete before running, with no need for nested flows or Cloud Automations. ✅ Async/sync compatible ✅ Configurable timeout & retry ✅ Lightweight + fully OSS-friendly 🔗 GitHub issue: https://github.com/PrefectHQ/prefect/issues/18665 🔗 Repo: https://github.com/nrjangir/prefect-flow-dependency Would love your feedback; happy to improve or open a PR if there's interest! 🙌 #ask-community
  • David Martin Calalang

    08/07/2025, 3:10 PM
    Hi everyone! I've been following the ECS Worker Guide in the documentation, and I have a Prefect worker service hosted on ECS. I also happen to have the Prefect server hosted as an ECS service. I'm able to set up a work pool and stage deployments. However, when actually running flows, all that happens is that the Prefect worker spawns a new task in the cluster. That task ends up crashing, either with Exited with non-zero code. (Error Code: 1) or RuntimeError: Timed out after 301.83046412467957s while watching task for status RUNNING. Has anyone else deployed Prefect flows through Prefect workers hosted on ECS and faced similar issues, or have any insights?
  • Ricardo Garcia Silva

    08/07/2025, 5:53 PM
    Hi all, I have set up an air-gapped Prefect cluster and I'm now trying to work out a strategy for deploying flows. I am required to use work pools of type process, and since the cluster cannot access the Internet, I'm also storing flows locally on each node. So I have set up a couple of nodes, each running prefect worker with a work pool of type process. I have successfully come up with an Ansible playbook which deploys my sample flow to the cluster. However, I would like to use an isolated environment for each flow. How can I achieve this when using the process work pool type? I have already created a venv for my flow and would now like to configure the Prefect deployment with instructions to use it. This is my prefect.yaml file:
    Copy code
    deployments:
      - name: "my_workflow"
        entrypoint: "main.py:my_workflow"
        work_pool:
          name: green
          work_queue_name: low
        job_variables:
          command: |
            echo "About to activate" && . /opt/prefect/flows/demo1/activate.sh && /opt/prefect/flows/demo1/env/bin/python -m prefect.engine
          working_dir: "/opt/prefect/flows/demo1"
    Unfortunately this does not seem to work. The flow is deployed successfully and I can trigger workflow runs just fine; however, they do not use the custom environment, but rather run with the same environment as the prefect worker process. I was hoping to override the Python environment with the job_variables.command property of the deployment, but it seems to be silently ignored. Is there some other way to do this, maybe using environment variables? Thanks for helping out!
  • Andrew Allen

    08/07/2025, 5:55 PM
    Hello Prefect community, I would like to understand how others run sub-flows in their own K8s pods without using RayTaskRunner or DaskTaskRunner on Kubernetes.
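    One common pattern, sketched under the assumption of a Kubernetes work pool (the deployment name is illustrative): give each sub-flow its own deployment, so every child run gets its own pod.
    Copy code
    from prefect import flow
    from prefect.deployments import run_deployment

    @flow
    def parent():
        # run_deployment blocks until the child flow run reaches a final
        # state; with a Kubernetes work pool the child runs in its own pod
        child = run_deployment(name="child-flow/k8s-deployment")
        return child.state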
  • Daniel Rodriguez Ciotti

    08/07/2025, 7:36 PM
    Hey everyone, about two hours ago one of my deployments started consistently failing with the following error:
    Copy code
    prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for URL 'https://api.prefect.cloud/api/accounts/*****/workspaces/******/flow_runs/'
    Response: {'exception_message': 'Invalid request received. Please check your input for NULL characters or values that may be outside normal ranges.'}
    More info: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
    When I run the flow locally against the Prefect Cloud API, I get the same error. But when I run it using a local Prefect server, everything works as before. Has anything changed on Prefect Cloud recently? And does anyone know how to fix or troubleshoot this? Thanks in advance!
  • Nicolas

    08/07/2025, 9:51 PM
    hi hi hi i have a question
  • Nicolas

    08/07/2025, 9:52 PM
    I'm trying to set up Prefect behind an nginx proxy. If I put /api under authentication, it fails. What's the way to keep my Prefect server secure?
  • Jaco Waes

    08/08/2025, 6:00 AM
    I have these pull steps defined in my Prefect 3 deployments file. I seem to have issues with the git_clone step or the pip step: it often fails on the first try when I have flows with sub-flow fan-out, and thus concurrency. It seems like these deployment steps do not handle concurrency very well. A retry of the failed flows then works (i.e., at a time when there are no concurrency issues). What would be the best way to fix this? I can't find a way to enforce concurrency locks during this deployment phase.
    Copy code
    pull: &id001
          - prefect.deployments.steps.run_shell_script:
              id: ensure-directory
              script: mkdir -p /opt/prefect/repos && echo "Directory ready"
              stream_output: false
          - prefect.deployments.steps.set_working_directory:
              directory: "/opt/prefect/repos"
          - prefect.deployments.steps.git_clone:
              id: clone_step
              repository: https://github.com/MY_ORG/bess-prefect.git
              branch: "{{ $PREFECT_VAR_deployment_branch }}"
              access_token: "{{ prefect.blocks.secret.github-access-token }}"
          - prefect.deployments.steps.set_working_directory:
              directory: "/opt/prefect/repos/{{ clone_step.directory }}"
          - prefect.deployments.steps.pip_install_requirements:
              requirements_file: requirements.txt
              stream_output: false
  • António Domingues

    08/08/2025, 7:46 AM
    Does anyone have experience calling R scripts or using R code in a Prefect task? Any tips on things to look out for? And no, this task cannot be done with pure Python.
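    For reference, a minimal sketch of the subprocess approach with output streamed into the Prefect logger, assuming Rscript is on PATH (the script path is a placeholder):
    Copy code
    import subprocess
    from prefect import task, get_run_logger

    @task
    def run_r_script(script: str, *args: str) -> int:
        logger = get_run_logger()
        proc = subprocess.Popen(
            ["Rscript", script, *args],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        # forward each line of R output into the Prefect logs
        for line in proc.stdout:
            logger.info(line.rstrip())
        return proc.wait()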
  • Amith M

    08/09/2025, 5:50 AM
    Hi, has anyone successfully deployed a .serve deployment on GCP? I have everything set up, and the deployment gets the job and reports it back to the Prefect server, but it never starts the job; it goes to a "pending" state and then goes back to listening for jobs. The worker/runner never starts. Sometimes, on every other run, there is some form of httpx error in the logs. I have attached the logs.
  • Yaron Levi

    08/11/2025, 10:38 AM
    We just had many flows crash. We're using Prefect Cloud + ECS push work pools.
  • Robyn H

    08/11/2025, 3:49 PM
    Hi #ask-community, I'm running a self-hosted server with PostgreSQL and Redis, testing a flow which tries to schedule tens of thousands of tasks concurrently, as we want to evaluate Prefect for a type of workflow that will run hundreds of thousands (up to millions) of tasks. One of those tasks in our use case would need a concurrency limit, as it queries an external system. I've set PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=1.

    In my test flow, if I add a tag-based concurrency limit on one task, the execution time of the flow increases significantly. I'm testing a flow where I use asyncio to start many executions of task1 concurrently (using the default task runner), and task1 just calls task2, which has a tag with a concurrency limit. Even if I set the number of slots in the concurrency limit higher than the number of tasks (e.g. 1000 tasks, 1001 slots), I see a significant runtime increase (from 7s to 33s, in my case). I assume this increased latency is due to database round trips and lock contention on acquiring slots; is this correct? In this test we're never waiting for slots to become free, as there are always free slots (I set the slot count this high to verify that the latency remains significant even when no task has to wait for a slot, which was my suspicion). For e.g. 50k tasks, the execution time goes from 5 min to 30 min.

    What I'm wondering is whether there is any way to optimize this so we don't see such major increases. Can I tune my database?
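    A hedged sketch of the kind of test described, assuming Prefect 3 (which allows calling a task from another task); the tag's limit would be created separately, e.g. with prefect concurrency-limit create external-system 1001:
    Copy code
    import asyncio
    from prefect import flow, task

    @task(tags=["external-system"])  # tag carries the concurrency limit
    async def task2(i: int) -> int:
        return i

    @task
    async def task1(i: int) -> int:
        return await task2(i)

    @flow
    async def stress(n: int = 1000):
        # fan out n concurrent task1 runs on the default task runner
        await asyncio.gather(*(task1(i) for i in range(n)))

    if __name__ == "__main__":
        asyncio.run(stress())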
  • Gabor Hosszu

    08/12/2025, 8:08 AM
    Hi guys, do you know if the Prefect developers gave up on ControlFlow? I see the last release was on Feb 6, and they seem to be pushing FastMCP instead. I really liked the approach of ControlFlow, which builds on the serverless/stateless paradigm of Prefect. What happened?
  • David Martin Calalang

    08/12/2025, 3:28 PM
    So I've been trying to deploy my flow code to an existing work pool. However, I get an error stemming from:
    Copy code
    requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http+docker://localnpipe/v1.41/images/{ecr_registry}/{image}/push?tag=latest
    Has anyone else encountered this in the past? My .yaml configuration for the build and push steps is below...
    Copy code
    # build section allows you to manage and build docker images
    build:
    - prefect_docker.deployments.steps.build_docker_image:
        id: build_image
        requires: prefect-docker>=0.3.1
        image_name: {ecr_registry}/{image}
        tag: latest
        dockerfile: Dockerfile
    
    # push section allows you to manage if and how this project is uploaded to remote locations
    push:
    - prefect_docker.deployments.steps.push_docker_image:
        requires: prefect-docker>=0.3.1
        image_name: '{{ build_image.image_name }}'
        tag: '{{ build_image.tag }}'
    I've made sure that my AWS user has the correct permissions, and that I logged into ECR prior to deployment.
  • pavel hrablis

    08/12/2025, 5:10 PM
    Hi #ask-community! I'm just starting to work with Prefect 3 and I've run into a task for which, as far as I can tell, there is no built-in mechanism: defining dependencies between deployments. My pipeline has three stages:
    1. Parsers are launched for a set of datasets (there can be ~40). The result is a list of which datasets were successfully collected and which were not. Failed ones need to be retried (run the parser again for those datasets), and successful ones move to the next step.
    2. For each successfully collected dataset, a converter (a separate flow) is run.
    3. "Jobs" are launched that may depend on several datasets from different providers. For example, a single job may depend on datasets #1 and #30; only when both are converted should the job be launched.
    The problem: how do I restart this without rerunning everything?
    • I need to be able to re-run only the failed parsers.
    • Converters should start only for newly successful parsers.
    • Jobs should run only when all their dependencies are ready.
    In Prefect 3 I haven't found a built-in way to define such dependencies between deployments. Right now I'm thinking about writing my own "orchestrator" flow that triggers deployments via the API and monitors their status, but that feels rather heavy. I considered something like this with asyncio, for example:
    Copy code
    import asyncio
    import random
    from typing import AsyncGenerator, Awaitable, Callable

    async def run_jobs_async(jobs: list[Callable[[], Awaitable]]) -> AsyncGenerator:
        tasks = [asyncio.create_task(job()) for job in jobs]
        for completed in asyncio.as_completed(tasks):
            result = await completed
            yield result

    async def sample_job(i: int) -> str:
        await asyncio.sleep(random.random() * 2)  # simulate work
        return f"Job {i} done"

    async def main():
        jobs = [lambda i=i: sample_job(i) for i in range(5)]
        async for outcome in run_jobs_async(jobs):
            print(outcome)

    if __name__ == "__main__":
        asyncio.run(main())
    If I implement my logic in this way, will I lose key Prefect benefits, such as automatic logging, dependency tracking, and the ability to restart only parts of the pipeline? Or is there a recommended way to implement my use case in Prefect 3 so that I can keep those benefits?
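    For what it's worth, a hedged sketch of the "orchestrator flow" idea using run_deployment, so child runs keep their own logging and history (the deployment names and parameters are illustrative):
    Copy code
    import asyncio
    from prefect import flow
    from prefect.deployments import run_deployment

    @flow
    async def orchestrate(datasets: list[str]):
        # stage 1: run all parsers; run_deployment waits for each final state
        parser_runs = await asyncio.gather(
            *(run_deployment(name="parser/default", parameters={"dataset": d})
              for d in datasets)
        )
        succeeded = [d for d, r in zip(datasets, parser_runs)
                     if r.state and r.state.is_completed()]
        # stage 2: converters only for the successfully parsed datasets
        await asyncio.gather(
            *(run_deployment(name="converter/default", parameters={"dataset": d})
              for d in succeeded)
        )
        return succeeded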
  • Andres Mora

    08/13/2025, 11:58 AM
    Hello #ask-community, I have set up a Webhook that will receive notifications from AWS SNS. The notifications are about new files being created in an S3 bucket. I want to capture the names of the new files using the Webhook's template. The body of the notification looks like this (I've removed some fields for simplicity and formatted it for clarity):
    Copy code
    {
    'Type': 'Notification',
    'MessageId': 'd4aa09a0-6d51-5219-bcb2-68b12345bd7b',
    'TopicArn': 'arn:aws:sns:us-east-1:1234567890:sns_topic',
    'Subject': 'Amazon S3 Notification',
    'Message': '{
      "Records":
        [
          {
            "eventVersion":"2.1",
            "eventSource":"aws:s3",
            "eventTime":"2025-08-12T16:25:24.066Z",
            "s3":{"s3SchemaVersion":"1.0","configurationId":"New file",
              "bucket":{"name":"testing.data","arn":"arn:aws:s3:::testing.data"},
              "object":{"key":"data_file.csv","size":34}
            }
          }
        ]
      }',
    'Timestamp': '2025-08-12T16:25:24.883Z',
    }
    I want to capture the object key "data_file.csv" but I haven't been able to. What I've tested: in the Webhook's template, if I extract, for example, the MessageId like this:
    "messageId": "{{ body.MessageId }}"
    there is no issue; this works fine. But if I try to extract something from inside the Message of the body using something like this:
    "file_name": "{{ body.Message.Records[0].s3.object.key }}"
    nothing happens; file_name ends up being empty. I thought that maybe the whole Message was being parsed as a single string, so I tried this just to test:
    "file_name": "{{ body.Message }}"
    This made the Webhook fail with this error: "unexpected character: line 8 column 23 (char 425)". It seems like I just cannot access Message, not even as a string(?). Does anyone understand this issue and know how to extract the value I'm trying to obtain? Thanks for the help!
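    Worth noting: SNS delivers Message as a string containing JSON, not as a nested object, so a template path like body.Message.Records[0] has nothing to traverse. A plain-Python illustration of the shape (body abbreviated):
    Copy code
    import json

    body = {"Message": '{"Records":[{"s3":{"object":{"key":"data_file.csv"}}}]}'}
    # body["Message"] is a str, so attribute-style traversal stops here;
    # it has to be parsed before the inner fields can be reached
    message = json.loads(body["Message"])
    print(message["Records"][0]["s3"]["object"]["key"])  # data_file.csv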
  • Kiran

    08/13/2025, 12:37 PM
    @Marvin does this code work?
    Copy code
    from prefect import get_client
    from prefect.deployments import run_deployment
    import asyncio

    async def trigger_flow():
        # Trigger deployment
        flow_run = await run_deployment(
            name="my_flow/my_deployment",  # flow/deployment
            parameters={"client": "S0011I"},  # your params
        )
        print("Flow run triggered:", flow_run.id)
        return flow_run.id

    async def check_flow_once(flow_run_id):
        async with get_client() as client:
            flow_run = await client.read_flow_run(flow_run_id)
            print("Flow run state:", flow_run.state_name)
            if flow_run.state_type.value == "CRASHED":
                print("Flow crashed — you can retry now.")
            elif flow_run.state_type.value == "COMPLETED":
                print("Flow completed successfully.")
            else:
                print("Flow not in a final state yet.")

    async def main():
        # Step 1: Trigger
        flow_run_id = await trigger_flow()
        # --- At this point you could stop the script ---
        # Later, when you want to check:
        await check_flow_once(flow_run_id)

    asyncio.run(main())
  • Adrien Besnard

    08/13/2025, 1:02 PM
    @Marvin I'm using prepare_flow_for_deployment within a CLI to deploy some flows. How do I avoid the "attempted relative import beyond top-level package" issue if my flow imports some stuff from other relative packages?
  • Yaron Levi

    08/13/2025, 3:09 PM
    Hi 👋 We want to limit access to our GitHub account to specific IPs only. We are using Prefect Cloud. Is there a static list of IPs that Prefect Cloud uses when it pulls our code from GitHub? Thank you.
  • Tri

    08/13/2025, 7:10 PM
    Hi everyone, I have an external executable that needs to be scheduled to run by Prefect. What is the best way to go about this? So far I've used Python's subprocess.Popen to call it, but I have to manually stream the stdout for logging, which is very clunky. I saw prefect-shell and the shell operation block; are they better suited for calling external applications?
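    They're intended for this sort of use. A minimal sketch with prefect-shell, which streams the command's output into the Prefect logs (the tool path is a placeholder):
    Copy code
    from prefect import flow
    from prefect_shell import ShellOperation

    @flow
    def run_external_tool():
        # run() executes the commands and logs their output line by line
        ShellOperation(commands=["/usr/local/bin/my-tool --input data.csv"]).run()

    if __name__ == "__main__":
        run_external_tool()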