# ask-community
  • s

    Simon Ouellette

    10/23/2025, 4:13 PM
    Hi, I'm running Prefect on a RunPod instance. When I run a task that takes a while (say, 20+ minutes), the worker sometimes crashes with:
      + Exception Group Traceback (most recent call last):
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/cli/_utilities.py", line 44, in wrapper
      |     return fn(*args, **kwargs)
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/cli/_types.py", line 156, in sync_fn
      |     return asyncio.run(async_fn(*args, **kwargs))
      |   File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
      |     return runner.run(main)
      |   File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
      |     return self._loop.run_until_complete(task)
      |   File "/usr/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
      |     return future.result()
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/cli/worker.py", line 168, in start
      |     await worker.start(
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/process.py", line 158, in start
      |     async with self as worker:
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/process.py", line 320, in __aexit__
      |     await super().__aexit__(*exc_info)
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/base.py", line 1600, in __aexit__
      |     raise exceptions[0] from None
      |   File "/usr/lib/python3.11/contextlib.py", line 728, in __aexit__
      |     cb_suppress = await cb(*exc_details)
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/runner/runner.py", line 1610, in __aexit__
      |     await self._exit_stack.__aexit__(*exc_info)
      |   File "/usr/lib/python3.11/contextlib.py", line 745, in __aexit__
      |     raise exc_details[1]
      |   File "/usr/lib/python3.11/contextlib.py", line 728, in __aexit__
      |     cb_suppress = await cb(*exc_details)
      |   File "/usr/local/lib/python3.11/dist-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
      |     raise BaseExceptionGroup(
      | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
      +-+---------------- 1 ----------------
        | Exception Group Traceback (most recent call last):
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/process.py", line 162, in start
        |     async with anyio.create_task_group() as loops_task_group:
        |   File "/usr/local/lib/python3.11/dist-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
        |     raise BaseExceptionGroup(
        | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
        +-+---------------- 1 ----------------
          | Traceback (most recent call last):
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/utilities/services.py", line 64, in critical_service_loop
          |     await workload()
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/base.py", line 1107, in sync_with_backend
          |     await self._update_local_work_pool_info()
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/base.py", line 1004, in _update_local_work_pool_info
          |     work_pool = await self._client.create_work_pool(work_pool=wp)
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/orchestration/_work_pools/client.py", line 478, in create_work_pool
          |     response = await self.request(
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/orchestration/base.py", line 53, in request
          |     return await self._client.send(request)
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/base.py", line 379, in send
          |     response.raise_for_status()
          |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/base.py", line 163, in raise_for_status
          |     raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
          | prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://rdzs18c4k9ub52-4200.proxy.runpod.net/api/work_pools/'
          | For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/404
    What can be causing this?
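The failing call is the worker's periodic sync with the API (`sync_with_backend`), and the 404 comes back for `/api/work_pools/` through the RunPod proxy URL. One way to test whether the proxy intermittently answers 404 during long runs (an assumption worth checking, not a confirmed cause) is to poll the Prefect server's `/api/health` endpoint through the same URL while a long task runs and log every non-200 response. A minimal stdlib sketch; `probe_health` and `watch` are hypothetical helpers:

```python
import time
import urllib.error
import urllib.request


def probe_health(api_url: str, timeout: float = 5.0) -> int:
    """Return the HTTP status of GET <api_url>/health, or 0 if the server is unreachable."""
    url = api_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:
        # 4xx/5xx responses (such as a proxy's 404) arrive as HTTPError.
        return exc.code
    except OSError:
        # Connection refused, DNS failure, or timeout.
        return 0


def watch(api_url: str, interval: float = 30.0, checks: int = 120) -> None:
    """Probe every `interval` seconds and print a timestamped line for each non-200 result."""
    for _ in range(checks):
        status = probe_health(api_url)
        if status != 200:
            stamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"{stamp} {api_url.rstrip('/')}/health -> {status or 'unreachable'}")
        time.sleep(interval)
```

For example, run `watch(os.environ["PREFECT_API_URL"])` on the pod alongside the worker; 404s whose timestamps line up with the worker crash would point at the proxy rather than the Prefect server.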
  • b

    Ben

    10/23/2025, 4:20 PM
    Hey community and @Marvin - I have a question about running tasks concurrently. I have a flow in which I want to run tasks concurrently, but I also need to access the data those tasks return. My current solution is a synchronous `for` loop, e.g.:
    for object in objects:
      data = tasks.operation1(data=object)
      tasks.operation2(data=data)
    I'm reading the docs, which mention using either `.submit()` or `.map()` to run tasks concurrently; however, I'm not sure how to access the returned data and pass it to a secondary task (which could also run concurrently, if possible).
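In Prefect 3, `.submit()` and `.map()` return futures, and a future passed as an argument to another task is resolved to its result before that task runs, so the rough equivalent of the loop is `first = operation1.map(objects)` followed by `operation2.map(first)`. The sketch below shows the same fan-out-then-chain dependency pattern using the standard library's `concurrent.futures` as a stand-in, so it runs without a Prefect server; `operation1`, `operation2` and `run_pipeline` are hypothetical placeholders:

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed


def operation1(data: int) -> int:
    # Hypothetical stand-in for tasks.operation1.
    return data * 10


def operation2(data: int) -> int:
    # Hypothetical stand-in for tasks.operation2.
    return data + 1


def run_pipeline(objects: list[int], max_workers: int = 4) -> list[int]:
    """Run operation1 on every object concurrently, then feed each result to operation2."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan out: start operation1 for every object without waiting for any of them.
        first: dict[Future, int] = {pool.submit(operation1, obj): i for i, obj in enumerate(objects)}
        second: dict[int, Future] = {}
        # Chain: as soon as one upstream result is ready, start its downstream step,
        # so operation2 for a fast object does not wait on a slow one.
        for fut in as_completed(first):
            second[first[fut]] = pool.submit(operation2, fut.result())
        # Collect in the original input order.
        return [second[i].result() for i in range(len(objects))]
```

`run_pipeline([1, 2, 3])` returns `[11, 21, 31]`; the index map is what keeps results in input order even though `as_completed` yields them in completion order.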
  • s

    skrawczyk

    10/23/2025, 5:24 PM
    Does anyone know of a way to subscribe to Prefect version changelog updates?
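GitHub exposes an Atom feed of each repository's releases at `https://github.com/<owner>/<repo>/releases.atom`, so Prefect's release notes can be followed by adding https://github.com/PrefectHQ/prefect/releases.atom to any feed reader. For a scripted check, a stdlib-only sketch that lists the entry titles from such a feed; `fetch_feed` and `release_titles` are hypothetical helper names:

```python
import urllib.request
import xml.etree.ElementTree as ET
from typing import List, Union

ATOM_NS = "{http://www.w3.org/2005/Atom}"
PREFECT_RELEASES_FEED = "https://github.com/PrefectHQ/prefect/releases.atom"


def fetch_feed(url: str = PREFECT_RELEASES_FEED, timeout: float = 10.0) -> bytes:
    """Download the raw Atom XML."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read()


def release_titles(feed_xml: Union[str, bytes]) -> List[str]:
    """Return the title of each <entry> in the feed, in feed order."""
    root = ET.fromstring(feed_xml)
    return [
        entry.findtext(f"{ATOM_NS}title", default="").strip()
        for entry in root.iter(f"{ATOM_NS}entry")
    ]
```

Usage: `for title in release_titles(fetch_feed())[:5]: print(title)`. Storing the last title seen and comparing on the next run would turn this into a simple "new release" notifier.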
  • j

    Janet Carson

    10/24/2025, 5:45 PM
    Does anyone know if the prefect slack integrations can be used with gov-slack?
  • m

    Marcelo de Souza

    10/24/2025, 8:07 PM
    Can someone please share contact details for a Prefect sales rep?
  • m

    Mitch Zink

    10/24/2025, 10:52 PM
    Hi, I have prefect enterprise and need to talk to an account manager about our contract. Can someone connect me?
  • t

    Tushar Mahajan

    10/25/2025, 8:15 PM
    Hi, I am using a self-hosted Prefect server and workers via Helm charts. I am using a `kubernetes` type work pool and trying to pass environment variables using the method described here (via prefect.yaml): https://github.com/zzstoatzz/prefect-pack/blob/4cfc53ae94c6bd65ff0eefcb1c5ce72b48446179/prefect.yaml#L106 However, I always get an "environment variable not found" error. Is there anything I might be missing here?
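In prefect.yaml, per-deployment variables for a Kubernetes work pool go under the deployment's `work_pool:` → `job_variables:` → `env:` keys, and they only reach the pod if the work pool's base job template still contains its `env` placeholder (worth checking if the template was customized). To see exactly which variables arrived, a check at the start of the flow that reports every missing name at once can help. A minimal sketch; `require_env` and the variable names in the usage line are hypothetical:

```python
import os
from typing import Dict


def require_env(*names: str) -> Dict[str, str]:
    """Return the requested environment variables, or fail listing every missing one."""
    missing = [name for name in names if name not in os.environ]
    if missing:
        # Reporting all missing names at once shows whether the whole `env` block
        # failed to reach the pod or just a single key was misspelled.
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: os.environ[name] for name in names}
```

Usage: `settings = require_env("DB_HOST", "DB_USER")` as the first line of the flow. If every name is missing, the problem is most likely in how the job variables are applied rather than in the flow code.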
  • j

    Jeff Rose

    10/28/2025, 5:58 PM
    Hey all, wondering if anyone else is experiencing issues with duplicate flow runs intermittently showing up? Is prefect working on a fix for this? https://github.com/PrefectHQ/prefect/issues/18523 https://github.com/PrefectHQ/prefect/issues/18894
  • b

    Ben

    10/30/2025, 9:23 PM
    Hey team, I am trying to add external storage to store the results of my tasks and flows. Although it seems to work locally, when I try to deploy a flow using `prefect deploy`, I get this error:
    prefect.exceptions.ScriptError: Script at 'src/providers/Tapology/flows/data_ingestion.py' encountered an exception: TypeError('Result storage configuration must be persisted server-side. Please call `.save()` on your block before passing it in.')
    An exception occurred.
    The thing is that I created the storage block directly in the Prefect web UI, and I'm loading it in my code, something like this:
    from prefect import flow
    from prefect.filesystems import RemoteFileSystem
    from prefect.serializers import JSONSerializer
    from prefect.futures import as_completed
    from prefect.task_runners import ThreadPoolTaskRunner
    
    # Load the storage block you saved previously
    r2_storage = RemoteFileSystem.load("r2-persistent-storage")
    
    # Choose serializer for your payloads
    json_ser = JSONSerializer()
    
    @flow(
        log_prints=True,
        task_runner=ThreadPoolTaskRunner(max_workers=3),
        persist_result=True,
        result_storage=r2_storage,
        result_serializer=json_ser,
    )
    def my_flow_function():
        [...]
    Another thing to note is that this code works for existing deployments; the error is only thrown when creating a new deployment. Any idea what's going on?
  • t

    Tom Collom

    10/31/2025, 8:32 AM
    Hello @Marvin, I am getting started with self-hosting and have deployed the worker and server to Docker, and everything works! I can deploy simple scripts with no dependencies using the code below; however, I cannot find anywhere how to extend this to add my project dependencies. For example, how do I tell Prefect to install my requirements.txt, or simply specify what the dependencies are? I notice there is a YAML version of the deployment that might enable this, but it seems long-winded, and I have already set everything up below, so ideally I would just extend this:
    if __name__ == "__main__":
        flow.from_source(
            source=f"https://{access_token}@github.com/tom/{repo}.git",
            entrypoint=entrypoint,
        ).deploy(
            name=pipeline_name,
            work_pool_name="local-pool",
        )
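One option, assuming `local-pool` is a Docker work pool whose flow-run containers use the official `prefecthq/prefect` image (the post does not say which worker type it is), is the image's `EXTRA_PIP_PACKAGES` environment variable, whose packages the image's entrypoint installs when the container starts. `.deploy()` accepts `job_variables`, so the variable can be set per deployment. The helper below, whose name is hypothetical, builds that value from a requirements.txt:

```python
import re
from pathlib import Path


def extra_pip_packages(requirements_path: str) -> str:
    """Turn a requirements.txt into a space-separated EXTRA_PIP_PACKAGES value."""
    packages = []
    for raw in Path(requirements_path).read_text().splitlines():
        # Drop comments (a '#' at line start or after whitespace, as pip treats them).
        line = re.sub(r"(^|\s)#.*$", "", raw).strip()
        if not line or line.startswith("-"):
            # Skip blank lines and pip options such as -r, -e or --index-url.
            continue
        # Remove inner spaces ("requests == 2.31.0" -> "requests==2.31.0") so each
        # requirement stays a single word when the shell splits the variable.
        packages.append("".join(line.split()))
    return " ".join(packages)
```

It could then be passed as `.deploy(name=pipeline_name, work_pool_name="local-pool", job_variables={"env": {"EXTRA_PIP_PACKAGES": extra_pip_packages("requirements.txt")}})`. Installing at every container start adds startup time, so for heavier dependencies a custom image with the packages baked in is the usual alternative.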
  • m

    Miguel Moncada

    10/31/2025, 9:23 AM
    @Marvin I have a flow deployed to a k8s work pool, the flow is crashing and from the pod logs I can see this error:
    Unable to authenticate to the event stream. Please ensure the provided api_key or auth_token you are using is valid for this environment. Reason: Actor is not authenticated
    Do you know what could be going on?
  • p

    Pierre L

    10/31/2025, 10:57 AM
    @Marvin, I am encountering an error similar to the one described here when my flow is deployed on Prefect server OSS (latest version) installed with Helm on a managed Kubernetes cluster. The only difference from that question is that the parameter that doesn't pass for me is a `datetime.date` object. I get the same error when using a `datetime.datetime`. Here is my exact error:
    Task run failed with exception: PrefectHTTPStatusError('Client error \'409 Conflict\' for url \'http://prefect-server.prefectoss.svc.cluster.local:4200/api/deployments/d6150396-f839-4084-8091-401f5a6c53ca/create_flow_run\'\nResponse:
     {\'detail\': "Error creating flow run: Validation failed for field \'time_start\'. Failure reason: 1704067200.0 is not of type \'string\'"}\n
    The bug doesn't occur when using Prefect Cloud. Why?
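The failure reason shows `time_start` reaching the server as a Unix timestamp (1704067200.0 decodes to 2024-01-01 00:00:00 UTC) while the deployment's parameter schema expects a string. Until the cause of the Cloud/OSS difference is known, one possible workaround is to convert `date` and `datetime` values to ISO 8601 strings before they are sent as run parameters, so the payload already matches the schema. Because Prefect validates flow parameters against their type hints by default, a flow annotated with `date` should still receive a `date`. A minimal sketch; `iso_params` is a hypothetical helper:

```python
from datetime import date, datetime, timezone

# 1704067200.0 from the error message decodes to midnight UTC on 2024-01-01.
DECODED_TIME_START = datetime.fromtimestamp(1704067200.0, tz=timezone.utc)


def iso_params(params: dict) -> dict:
    """Return a copy of params with date/datetime values replaced by ISO 8601 strings."""
    return {
        # datetime is a subclass of date, so this check covers both types.
        key: value.isoformat() if isinstance(value, date) else value
        for key, value in params.items()
    }
```

For example, `iso_params({"time_start": date(2024, 1, 1)})` gives `{"time_start": "2024-01-01"}`, which could be passed as the `parameters` of the run being created (for instance via `run_deployment`).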
  • a

    Ahmad Bilal Khalid

    11/03/2025, 12:01 PM
    @Marvin Prefect 3.4.21 broke deployment parameters for me. I created an issue on GitHub as well, but I'm posting it here because I suspect I might be doing something wrong, since no one has pointed it out yet. etl.py:
    from datetime import date
    
    from workflows.flows.etl import extraction_and_transformation
    
    if __name__ == "__main__":
        extraction_and_transformation.serve(
            name="Extraction Transformation Loading",
            parameters={
                "source_name": "ABC",
                "database_export_date": date(2025, 8, 27),
                "bucket_name": "data-migration",
            },
        )
    workflows/flows/etl.py:
    from datetime import date
    
    import prefect
    from prefect import flow
    
    
    @flow(name="Data Extraction and Transformation")
    def extraction_and_transformation(
        source_name: str, database_export_date: date, bucket_name: str
    ):
        from prefect.runtime import flow_run, deployment, task_run
    
        print("prefect version", prefect.__version__)
        print("Deployment Parameters", deployment.parameters)
        print("Flow Run Parameters", flow_run.parameters)
        print("Task Run Parameters", task_run.parameters)
        print("Flow Run Parent Deployment ID", flow_run.parent_deployment_id)
    
    prefect version 3.4.20
    Deployment Parameters {'bucket_name': 'data-migration', 'source_name': 'ABC', 'database_export_date': '2025-08-27'}
    Flow Run Parameters {'source_name': 'ABC', 'database_export_date': datetime.date(2025, 8, 27), 'bucket_name': 'data-migration'}
    Task Run Parameters {}
    
    prefect version 3.4.21
    Deployment Parameters {}
    Flow Run Parameters {'source_name': 'ABC', 'database_export_date': datetime.date(2025, 8, 27), 'bucket_name': 'data-migration'}
    Task Run Parameters {}
  • m

    Mattia Sappa

    11/04/2025, 10:22 AM
    Hello all! Have you seen the error `ERROR | prefect.server.events.services.event_persister - Error trimming events and resources`?
  • j

    José Agretti

    11/04/2025, 3:20 PM
    Hey team, am I the only one who finds the https://github.com/PrefectHQ/prometheus-prefect-exporter metrics hard to use? They seem buggy, and I'm not able to build useful dashboards. It's worth mentioning that I'm using GCP-managed Prometheus, but it should still be pretty much the same.
  • r

    raj

    11/05/2025, 11:58 AM
    @Marvin I am getting this error: `httpx.LocalProtocolError: Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED`. Can anyone help me with it?
  • r

    raj

    11/05/2025, 12:00 PM
    @Marvin ❗ Issue Summary: The outage was caused by an HTTP/2 PING frame (`RECV_PING`) arriving on a connection that was already closed, an event the client's HTTP/2 connection state machine rejects as invalid. At the core, the `httpx` client, used internally by Prefect's Cloud API client, received a PING frame on an HTTP/2 connection that had already transitioned to the `CLOSED` state. This caused the underlying `httpcore` transport layer to raise a `LocalProtocolError` because the event did not match the expected connection state.
    🧾 Root Cause Analysis (RCA):
    • The system attempted to reuse an HTTP/2 connection that had already been closed, leading to an invalid protocol state.
    • This is most likely due to a stale connection being kept in the async connection pool, or a bug in connection lifecycle management in `httpcore`.
    • Prefect's Cloud client attempted communication over this dead connection, resulting in failure.
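If the failures are occasional (a stale pooled connection rather than a persistent network problem), two mitigations are worth trying. Prefect's client has a `PREFECT_API_ENABLE_HTTP2` setting; setting it to `false` should keep the client on HTTP/1.1. Code that runs inside Prefect tasks can also use the `retries=` and `retry_delay_seconds=` options of `@task`. For calls outside a task, a generic retry wrapper is a small stdlib sketch; the exception types are a parameter (here one might pass `httpx.LocalProtocolError`, which this example does not import), and `retry` is a hypothetical helper name:

```python
import functools
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(exc_types: Tuple[Type[BaseException], ...], attempts: int = 3, base_delay: float = 0.5):
    """Retry the wrapped call on the given exception types with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exc_types:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    # With the default base_delay, waits 0.5s, 1s, 2s, ... between attempts.
                    time.sleep(base_delay * 2 ** (attempt - 1))
            raise AssertionError("unreachable")

        return wrapper

    return decorator
```

Exceptions not listed in `exc_types` propagate immediately. This version wraps synchronous functions only; an async variant would follow the same shape with `await` and `asyncio.sleep`.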
  • ø

    Øyvind Monsen

    11/05/2025, 3:34 PM
    @Marvin we are struggling a lot with errors renewing the concurrency lease for longer-running flows. It typically occurs after a somewhat heavy task involving both CPU-bound work and lots of async IO. We are using the default task runner and mostly just awaiting an `asyncio.gather` over several tasks. We set PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30, and I can confirm that even during the longer task it is sending frequent heartbeats. However, after the long task, we consistently get an error renewing the lease when it switches to a new task. For context, we are now on Prefect 3.4.25 (we have tested several versions), and we have a global concurrency limit on the deployment (to avoid hitting resource limits on the number of containers). We run deployments on Azure Container Instances, each with 1 CPU available.
  • a

    Arthur Ren

    11/05/2025, 5:23 PM
    @Marvin can you give me an example of using the .deploy() method to deploy a flow targeting a Prefect Kubernetes worker, with customizations to the Kubernetes job template?
  • s

    Slackbot

    11/06/2025, 11:34 AM
    This message was deleted.
  • s

    Sergio Luceno

    11/06/2025, 11:38 AM
    @Marvin The problem: We are running Prefect server with the official Kubernetes Helm charts. The problem we are trying to solve is the following:
    • Our flows are simple: they start, perform an HTTP request (which can take a long time to respond), and stop.
    • We are running Kubernetes workers, so we create a new pod for every execution. Every pod consumes around 300MB of RAM.
    • In other words, we are currently using Prefect to run background jobs that we could run with Celery.
    • It's quite expensive for us if every job takes 300MB of RAM.
    What we have looked into: We've looked at alternatives, like using Prefect background tasks: https://docs.prefect.io/v3/advanced/background-tasks But we are missing primitives for controlling concurrency and so on for background tasks.
    Our new proposal: Instead of using a Kubernetes worker, use a process worker for this workload. This worker will run flows as subprocesses, and we can scale the worker by CPU/RAM if that becomes a problem. We still benefit from deployment definitions, concurrency, and everything else. I have the impression this is a better solution for our specific case. In the future, if we have a different workload where we want to run each flow in its own Kubernetes pod, we can create a new Kubernetes worker for it.
    Is this solution an antipattern? Are we doing something wrong? It seems process worker pools are meant for development or for trying things out locally.
  • v

    Vít Brunner

    11/06/2025, 1:34 PM
    Hi, I'm a newb, using Prefect 3.4.24, self-hosting the server. While task caching works, for a task whose result has been taken from the cache, I can't quite say where the result is coming from: in the UI, the "Cache Key" in the "task-run" details is always empty (for both "Completed" tasks that have created a cache entry and "Cached" tasks which were taken from the cache). Halp please? (@Marvin?)
  • a

    Arthur Ren

    11/07/2025, 2:34 AM
    I'm trying to set up a basic hello-world Prefect Docker worker, but when the worker spins up the container, it throws a 401 error inside the container as it attempts to call /api/flow_runs/{flow_run_uuid}. I confirmed the worker process has sufficient credentials to call the Prefect server, so why can't the container? (@Marvin)
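One way to narrow this down is to log, at the very start of the flow, which Prefect settings actually reached the container, masking anything secret. Which variable the server expects depends on how authentication is set up (for example `PREFECT_API_KEY` for Prefect Cloud, or `PREFECT_API_AUTH_STRING` for a self-hosted server using basic auth). A minimal sketch; `prefect_env_summary` and the masking hints are hypothetical:

```python
import os
from typing import Dict, Mapping, Optional

# Substrings that suggest a setting holds a credential.
SECRET_HINTS = ("KEY", "AUTH", "TOKEN", "SECRET", "PASSWORD")


def prefect_env_summary(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Map each PREFECT_* variable to its value, masking likely secrets."""
    env = os.environ if environ is None else environ
    summary = {}
    for name in sorted(env):
        if not name.startswith("PREFECT_"):
            continue
        value = env[name]
        if any(hint in name for hint in SECRET_HINTS):
            summary[name] = f"<set, {len(value)} chars>"  # never log the secret itself
        else:
            summary[name] = value
    return summary
```

Comparing `prefect_env_summary()` inside the flow with the same call in the worker's environment shows whether a credential set on the worker is missing from the flow-run container; if it is, it can be passed through the work pool's `env` job variable.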
  • k

    Kiran

    11/07/2025, 5:49 AM
    @Marvin how far back does the prometheus-prefect-exporter keep data in its snapshot?
  • p

    Pierre L

    11/07/2025, 9:41 AM
    Hi @Marvin, I am considering using the Dragonfly k8s operator as a replacement for bitnamilegacy/redis in Prefect server OSS. Has it ever been tested? Will it work?
  • g

    Giacomo Chiarella

    11/07/2025, 2:26 PM
    Hi everyone! I get this deprecation warning: "The state property of PrefectFuture is deprecated and will be removed in a future release. If you are subclassing PrefectFuture, please implement the state property in your subclass or subclass PrefectTaskRunFuture instead." What I am doing is calling `my_task = task.submit()`, waiting for it to finish, and then reading `my_task.state`. How should I change the last statement to resolve the warning? Same question for `my_task.task_run_id`.
  • a

    Amrit Amar

    11/07/2025, 4:56 PM
    Hi everyone! My team and I are looking into using Prefect to manage our workflows on AWS. I saw the `prefect-aws` library and I'm excited to start using it; however, most of the patterns for running flows seem to revolve around using another EC2 instance. I was wondering if it is possible to have a Prefect server as an orchestrator (on Fargate/EC2) that runs defined flows and deploys a 'task' to a Lambda (I would have already configured the Lambda; I just need to send a task to it and then retrieve the results to pass on to the next task, which would be another Lambda call). I see that a Lambda invoke can happen 'within' a task, but is it possible to deploy the entire task as a Lambda itself rather than invoking it from the main Prefect instance? Also, if the workers invoke a Lambda from the main instance, will they actually wait the full 15 minutes for a response from the Lambda, or do I need to poll the status of the Lambda job with another task? And does this scale to one Prefect server instance with 4 defined flows running 10s-100s of concurrent flow runs? My main use case is an orchestrator that, upon running a flow with defined tasks, sends info to a Lambda and then gets the result from it to pass on to the next task until the end of the flow. Thank you! (@Marvin)
  • q

    Quinton Nickum

    11/07/2025, 9:46 PM
    Hi all, wondering how assets scale to large numbers of files. Simplifying a ton here, but my team has multiple separate files in S3 (about 10 or so files, some of them images) that represent different components of a single piece of data at different stages (identified by a single UUID, e.g. uuid-a/file1, uuid-a/file2, ...). Has anyone implemented something like this before? Are there limits on the number of assets Prefect is able to track? Are there standard ways to handle cases like this? UI limitations in Prefect Cloud? Scaling concerns with the events being emitted?
  • a

    Aaron

    11/08/2025, 11:56 AM
    Does this dev/prod setup make sense? (I have no experience setting something like this up and limited Prefect experience.) We're using Prefect Server hosted on a VM.
    1. Two VMs in GCP, each running its own Prefect server instance (one for Prod, one for Dev).
    2. On each VM's Prefect instance, the variable "Environment" is set: "DEV" on the dev instance and "PROD" on the prod instance.
    3. Each pipeline reads this variable at the start, and conditional logic decides which target tables, databases, etc. to use based on whether it's DEV or PROD. When developing on our local machines, we'll read from the dev Prefect instance by setting the PREFECT_API_URL environment variable to the dev instance's API endpoint.
    4. Developers work on feature branches, test locally, and then merge into a development branch in GitHub. A GitHub Actions job runs our deployment.py script (where all our deployments are defined) to deploy to Prefect server, setting the PREFECT_API_URL environment variable appropriately before running the script.
    5. When the development branch is merged into main, a second GitHub Actions workflow does the same as the first, except that it points PREFECT_API_URL at the prod Prefect instance, so that when the deployment script runs, everything gets deployed there.
    EDIT: Actually, I may only need one branch in GitHub. Once a feature branch merges into main, the GitHub Actions workflow deploys to both dev and prod.
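The conditional logic in step 3 can live in one place as a lookup table keyed by the environment name, failing loudly on an unexpected value instead of silently falling back to production. A minimal sketch; `Targets`, `TARGETS`, `resolve_targets` and the database and schema names are hypothetical, and inside a flow the environment name would come from the Prefect variable (for example via `Variable.get(...)` from `prefect.variables`), which this stdlib-only example leaves out:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Targets:
    database: str  # hypothetical target database
    schema: str    # hypothetical target schema


# One entry per environment; names are placeholders.
TARGETS = {
    "DEV": Targets(database="analytics_dev", schema="staging"),
    "PROD": Targets(database="analytics", schema="public"),
}


def resolve_targets(environment: str) -> Targets:
    """Look up targets for DEV/PROD, refusing to guess on anything else."""
    try:
        return TARGETS[environment.strip().upper()]
    except KeyError:
        raise ValueError(
            f"Unknown environment {environment!r}; expected one of {sorted(TARGETS)}"
        ) from None
```

Keeping every environment-specific name in `TARGETS` means adding a third environment later is a one-line change rather than a hunt through each pipeline's conditionals.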
  • r

    Riya Sinha

    11/08/2025, 7:33 PM
    Sorry, is anyone else having trouble launching subflows when running on Prefect managed infrastructure / Prefect Cloud? I have an hourly flow that launches some subflows, and that used to work, but for the past 12 hours the Prefect Python API has been giving an HTTP read error when trying to launch the subflows, e.g.:
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 1079, in create_flow_run
        return await client.create_flow_run(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 568, in create_flow_run
        response = await self.request("POST", "/flow_runs/", json=flow_run_create_json)
    Full trace in the thread to avoid a large message; I just wanted to ask whether this is an issue others are currently facing.