# ask-community
  • m

    Melek Alan

    08/28/2025, 12:41 PM
    Hi everyone! After upgrading our Prefect Helm chart from 2024.12.11173517 to 2025.8.21160848, we noticed that when `replicaCount=2`, the same flow run is picked up by multiple workers. This never happened before the upgrade. Is this a change in worker behavior, or a bug? What’s the recommended way to prevent multiple workers from claiming the same run? Thanks!
  • v

    Victor

    08/28/2025, 2:25 PM
    My deployment is not using an additional pool that is available to it, and I am unable to unset the default pool on the deployment. If I deploy again using YAML, it automatically adds one of the pools as the default pool.
  • v

    Victor

    08/28/2025, 2:32 PM
    Is there no way to deploy a flow without attaching it to a worker pool? If a deployment were not associated with a particular pool, I could keep adding worker pools to provide more concurrent executions easily. Right now I have added another worker pool, but tasks are still stuck waiting in the first pool. Any help is appreciated!
  • v

    Victor

    08/28/2025, 6:07 PM
    I added a temporary worker to the pool due to high demand. How can I check whether this new worker is actually helping, or whether any tasks are going to it? The two workers are on two different systems.
  • s

    Samuel Hinton

    08/28/2025, 11:35 PM
    Hi everyone! I'm having a bit of an issue using `run_deployment`, which I worry might be a bug and not user error. The summary is that I'm scheduling flow runs on NERSC via Slurm, which means the flow is often pending for many minutes before Slurm allocates resources. The `timeout` parameter is left as None, but it seems that if the poll happens before the job is scheduled, it errors. I.e., in my parent flow logs I can see:
    PrefectHTTPStatusError("Server error '500 Internal Server Error' for url 'http://prefect.prefect-pipelines.production.svc.spin.nersc.org/api/flow_runs/9418ee41-b55c-40ee-a7cb-879df77a766a'\nResponse: {'exception_message': 'Internal Server Error'}\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500")
    Which comes from
    File "/usr/local/lib/python3.13/site-packages/prefect/deployments/flow_runs.py", line 203, in run_deployment
    and
    flow_run = await client.read_flow_run(flow_run_id)
    When I wait a minute for the job to be scheduled, I can hit this endpoint perfectly:
    {
      "id": "9418ee41-b55c-40ee-a7cb-879df77a766a",
      "created": "2025-08-28T23:20:39.035659Z",
      "updated": "2025-08-28T23:20:43.784032Z",
      "name": "preprocess_/data/level=raw/runs/run_id=25_056_084/science_red.fits",
      "flow_id": "344eea82-0256-46b2-bdc2-615a21be48e6",
      "state_id": "0198f2fb-c55b-7ab9-aab6-971eb870d3ff",
      "deployment_id": "c29d351e-ee20-4267-93ee-d157b91f1d6b",
      "deployment_version": "104f2d07",
      "work_queue_id": "b0ac5ee7-ce2f-47ed-8f6d-7ddce9c0a1b0",
      "work_queue_name": "default",
    ...
    }
    So I feel like `run_deployment` shouldn't be raising an error in this instance. Any devs around to share their thoughts? Specifically, the code in `run_deployment`:
    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)
    This uses `timeout` to wait for flow completion, but it does seem to assume relatively immediate flow registration.
  • m

    Matt Alhonte

    08/29/2025, 12:00 AM
    Looking at the new pricing model - does Workflows mean 1 Flow (ie, the code) or 1 Deployment (Code + Resources)? So like if I have 1 Flow that has 5 different versions depending on how big an AWS Instance I want, is that 1 Workflow or 5 Workflows for the purposes of the Billing Tiers?
  • t

    Tom Han

    08/29/2025, 4:09 AM
    Hi! Has anyone else used the SMB block with Prefect 3.x with success? I'm trying to implement it but am facing issues similar to this GitHub issue (https://github.com/PrefectHQ/prefect/issues/10694), where I get an fsspec/smbprotocol error like this:
    File "...\.venv\lib\site-packages\smbprotocol\transport.py", line 68, in connect
        raise ValueError(f"Failed to connect to '{self.server}:{self.port}': {err}") from err
    ValueError: Failed to connect to 'my.smb.serber.edusmb::445': [Errno 11001] getaddrinfo failed.
    When I PDB'd into the stack trace, I found the same `_as_unc_path` double-appending of the server as observed in the issue. In `prefect.filesystems.SMB`:
    # SMB.write_path calls RemoteFileSystem.write_path
    # Around line 520
    path = self._resolve_path(path)
    dirpath = path[: path.rindex("/")]
    # dirpath is smb://my.smb.server.edu/path/to/my/folder
    self.filesystem.makedirs(dirpath, exist_ok=True)
    But in `fsspec`:
    # fsspec/implementations/smb.py
    class SMBFileSystem()......
        def makedirs(self, path, exist_ok=False):
            # dirpath prefect passes in already contains `smb://my.smb.server.edu`
            if _share_has_path(path):
                wpath = _as_unc_path(self.host, path)  # <=== This line double appends it
                # now this line would fail because
                # wpath = \\my.smb.server.edusmb:\\my.smb.server.edu\path\to\my\folder
                smbclient.makedirs(wpath, exist_ok=exist_ok, port=self._port)
    Have there been any workarounds? Thanks!
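The double-appended host described above can be reproduced in plain Python. This is only a sketch: `as_unc_path` is a simplified stand-in written for illustration (an assumption about what the conversion does, based on the `wpath` value quoted in the message), and `strip_smb_prefix` is a hypothetical helper, not a Prefect or fsspec function.

```python
from urllib.parse import urlsplit


def as_unc_path(host: str, path: str) -> str:
    """Simplified stand-in for the UNC conversion (assumption, for illustration)."""
    return f"\\\\{host}" + path.replace("/", "\\")


def strip_smb_prefix(url_or_path: str) -> str:
    """Hypothetical helper: reduce 'smb://host/share/dir' to '/share/dir'."""
    parts = urlsplit(url_or_path)
    return parts.path if parts.scheme == "smb" else url_or_path


host = "my.smb.server.edu"
full_url = "smb://my.smb.server.edu/path/to/my/folder"

# Passing the full URL prepends the host to a string that already contains it.
doubled = as_unc_path(host, full_url)
# Passing only the path component yields a single host prefix.
fixed = as_unc_path(host, strip_smb_prefix(full_url))

print(doubled)
print(fixed)
```

Whether stripping the scheme and host before calling `makedirs` is a safe workaround for the SMB block itself is not confirmed in this thread; the sketch only shows where the malformed name comes from.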
  • s

    Samuel Hinton

    08/29/2025, 6:02 AM
    Hey @Marvin, what would be the best way to upload a `json` artifact to Prefect?
  • i

    Idriss Bellil

    08/29/2025, 10:57 AM
    Hello! I have two questions: 1. Is it possible to set custom retry logic per deployment? I am deploying with Python to Prefect Cloud, using AWS ECS and ECR. Marvin suggested `my_flow.with_options(retries=100).deploy(...)`. I tested it and it didn't work, and then Marvin retracted that. 2. (More of a suggestion) It would be nice to see the full name of the deployment rather than a truncated name with a dots suffix like `project-1_flow-x_...` (the name has valuable info for us), or at least to have an option to display the full deployment name. Another piece of information missing from the UI, from what I could see, is the flow/deployment options (retry logic, for instance: I could only see it in the deployment run, so I have to run it and then cancel it just to see the configured retry logic).
  • m

    Michael Schwartz

    08/31/2025, 12:52 AM
    Hi all, I have the following automation in Prefect:
    {
      "type": "event",
      "match": {},
      "match_related": {
        "prefect.resource.role": "flow-run"
      },
      "after": [],
      "expect": [
        "cdf.publish_audited_tables"
      ],
      "for_each": [],
      "posture": "Reactive",
      "threshold": 1,
      "within": 0
    }
    The problem I'm having is that it consistently triggers the flow it runs 3 times, each within ~30 minutes of the others. What might be causing this? The event is only emitted once. @Marvin
  • a

    ASHIK

    09/01/2025, 5:27 AM
    Hello all, I would like to request that the Prefect team release a feature for metadata retention. For example, if I configure 7, only the last 7 days of data should be stored in the Prefect DB. For our use case, the metadata grows to a few hundred GB in a month.
  • v

    Vic

    09/01/2025, 8:08 AM
    Hi Prefect Team, we are currently evaluating open-source workflow orchestration tools; Prefect and Windmill are on our shortlist. For the time being, we wish to use the OSS version, as neither our size nor our use case would call for the supported enterprise version. One big drawback of Prefect, in our view, is the missing webhook feature in OSS. In November 2024, @Nate wrote here that "we do not yet support webhooks in OSS, though we'd like to add this soon!" (https://linen.prefect.io/t/23443245/hello-all-is-creating-a-custom-webhook-supported-on-the-loca#f61b408e-be28-401d-808a-83e673626f8e). Nine months have passed since then, so I wanted to inquire whether this is still something you're planning to do. As far as I was able to see, it was not made part of version 3 which was released only very recently, so the plan may have changed.
  • m

    Morten Hauge

    09/01/2025, 11:27 AM
    Does anyone have any recommendations on how to do Dependency Injection in Prefect? Ideally I'd like to be able to swap out underlying data interfaces i.e. with in-memory versions without having to resort to import path-level patching. Is there anything similar to FastAPI's dependency injection system available?
  • m

    Michael Schwartz

    09/01/2025, 11:29 AM
    Hi all, I have the following automation in Prefect:
    {
      "type": "event",
      "match": {},
      "match_related": {
        "prefect.resource.role": "flow-run"
      },
      "after": [],
      "expect": [
        "cdf.publish_audited_tables"
      ],
      "for_each": [],
      "posture": "Reactive",
      "threshold": 1,
      "within": 0
    }
    The problem I'm having is that it consistently triggers the flow it runs 3 times, each within ~30 minutes of the others. What might be causing this? The event is only emitted once.
  • k

    Kiran

    09/02/2025, 10:43 AM
    Hey @Marvin, suppose I am running a flow run with some set of parameters, and one of the parameters given is 'today'. I start the run at 11:30 PM. If I rerun it at 12 AM, which is the next day, I want the manual retry to still run for yesterday's date. How do I do that?
  • s

    Stefan

    09/02/2025, 12:46 PM
    Hi 🙂 Is there an update planned for the OSS UI that would bring over the flows page that can be seen in Cloud?
  • a

    Ahmad Bilal Khalid

    09/02/2025, 2:03 PM
    I have two flows, A and B. A is the parent flow and B is the child flow. Both use RayTaskRunner, i.e.:
    RayTaskRunner(
        init_kwargs={
            "num_cpus": 5,  # No. of Performance cores in M3 Pro 11 Cores
            "runtime_env": {
                "working_dir": ".",
                "excludes": ["*.pyc", "*.pyo", ".ruff_cache", ".venv", ".git", "__pycache__"],
                "worker_process_setup_hook": "backend.migrations.flows.utils.prepare_django_for_prefect",
            },
            "object_store_memory": 75 * 1024 * 1024,  # 75 MB of object store memory, we don't use it
        }
    )
    B returns `ids: set[int]`. A keeps calling B in a while loop until it returns an empty set. The problem is that I get the following error the 3rd time B is called (previously, it occurred the 2nd time):
    AssertionError: The env var, __RAY_WORKER_PROCESS_SETUP_HOOK_ENV_VAR, is not permitted because it is reserved for the internal use.
    What could be the issue here?
  • d

    Denys Y

    09/02/2025, 3:27 PM
    Hi, when we try to enable Redis on prefect-server to run 2 parallel replicas, we start getting the following error for worker pools:
  • d

    Denys Y

    09/02/2025, 3:27 PM
    prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server.prefect.svc.cluster.local:4200/api/work_pools/'
    Response: {'exception_message': 'Internal Server Error'}
    For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
    An exception occurred.
  • j

    Joe

    09/03/2025, 9:23 AM
    Hi, I'm struggling to log into Prefect Cloud. I keep getting "Code has expired or is invalid." even if I click it straight away. Is anyone else seeing a similar issue?
  • k

    Kiran

    09/03/2025, 12:11 PM
    Hey @Marvin, does this code work?
    from prefect import flow, get_run_context
    from datetime import date, timedelta, datetime
    
    def resolve_date(date_param: str, run_time: datetime) -> str:
        if date_param == "today":
            return str(run_time.date())
        elif date_param == "yesterday":
            return str((run_time - timedelta(days=1)).date())
        else:
            # assume it's already YYYY-MM-DD
            return date_param
    
    @flow
    def actual_flow(run_date: str):
        ctx = get_run_context()
    
        # resolve only once, based on expected_start_time (not retry time!)
        resolved_date = resolve_date(run_date, ctx.flow_run.expected_start_time)
    
        print(f"Running for resolved date: {resolved_date}")
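The date-resolution helper in the snippet above can be exercised on its own, without Prefect, to see why the choice of reference time matters. A small sketch with made-up timestamps; whether `expected_start_time` stays unchanged across a manual retry is not confirmed in this thread and is treated as an assumption here.

```python
from datetime import datetime, timedelta


def resolve_date(date_param: str, run_time: datetime) -> str:
    """Same logic as the helper above: map 'today'/'yesterday' to YYYY-MM-DD."""
    if date_param == "today":
        return str(run_time.date())
    elif date_param == "yesterday":
        return str((run_time - timedelta(days=1)).date())
    else:
        # assume it's already YYYY-MM-DD
        return date_param


# Hypothetical timestamps: originally scheduled at 23:30, retried after midnight.
expected_start = datetime(2025, 9, 2, 23, 30)
retry_wall_clock = datetime(2025, 9, 3, 0, 5)

# Resolving against the original start time keeps the original day.
print(resolve_date("today", expected_start))    # 2025-09-02
# Resolving against the clock at retry time shifts to the next day.
print(resolve_date("today", retry_wall_clock))  # 2025-09-03
```

Passing an explicit `YYYY-MM-DD` string as the parameter sidesteps the question entirely, since that branch returns the value unchanged.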
  • m

    Mitch

    09/03/2025, 2:24 PM
    We are seeing increased latency when starting multiple prefect jobs when submitting on the sub minute level (or running every minute). In Prefect 1 we never experienced this issue, but now are seeing more instances of conflicting runs starting anywhere from 1 to 3 minutes late. Has anyone had success in improving this? We have unlimited concurrency on our work pools
  • s

    Sid Bendre

    09/03/2025, 4:23 PM
    Hey, I run a self-hosted Prefect server and I’m seeing this issue:
    Task run failed with exception: ConcurrencySlotAcquisitionError("Unable to acquire concurrency limits ['im-c0Ij4IOaaRYYCwnuAXqUmf', 'audio']") - Retries are exhausted
    ....
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://48.211.249.12/api/concurrency_limits/increment'
    Response: {'exception_message': 'Internal Server Error'}
    For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
    We get a lot of spiky concurrent traffic, and we never set any concurrency limits. What’s causing this? We are also seeing issues when decrementing:
    Task run failed with exception: "PrefectHTTPStatusError(\"Server error '500 Internal Server Error' for url 'http://48.211.249.12/api/concurrency_limits/decrement'\\nResponse: {'exception_message': 'Internal Server Error'}\\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500\")" - Retries are exhausted
  • a

    aadi i

    09/03/2025, 7:17 PM
    How can I dynamically pass parameters to a Prefect flow running as a Kubernetes Job (Pod) or Docker container? I’m currently using the `flow.from_source()` method, which downloads the flow code from an S3 bucket, builds the image dynamically, and then runs the flow. However, I’d like to avoid building the Docker image at runtime. Is there a way to use a prebuilt Docker image (pulled from a registry) and still pass parameters dynamically, preferably through a Pythonic method or REST API, without relying on deployment templates or environment variables? I understand that `job_variables` can be set dynamically and overridden during a flow run, but I’m looking for an alternative that allows passing parameters (like `flow_data`) more flexibly, ideally at runtime, in a way that automatically maps values from `flow_data` into `job_variables` and passes them as arguments to the flow entrypoint, while using a pre-built Docker image.
    flow_from_source = await flow.from_source(
        source=s3_bucket_block,
        entrypoint="flows/bill_flow.py:bill_assessment_flow"
    )
    
    flow_dependencies = get_flow_dependencies()
    
    deployment = await flow_from_source.deploy(
        name=PREFECT_DEPLOYMENT_NAME,
        tags=["billing"],
        work_pool_name="kubernetes-pool",
        schedule=None,
        push=False,  # Skip pushing image
        job_variables={
            "finished_job_ttl": 100,
            # "image": "mat/prefect-k8s-worker:15",  # Uncomment to use a custom prebuilt image
            "namespace": "prefect",
            "env": {
                "PREFECT_API_URL": "http://prefect-server:4200/api",
                "EXTRA_PIP_PACKAGES": flow_dependencies,
                "PYTHONPATH": "/opt/prefect/"
            }
        }
    )
    
    app.state.deployment_id = deployment
    
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id=request.app.state.deployment_id,
        tags=run_tags,
        parameters={
            "flow_data": {
                "source_provider": source_provider,
                "target_provider": target_provider,
                "company_id": company_id,
                "company_name": company_name,
                "assessment_task_id": assessment_task_id
            }
        }
    )
    
    logger.info(f"Created flow run with ID: {flow_run.id}")
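For the REST route mentioned in the question, here is a sketch of how a per-run request could be assembled, with parameters and an image override sent together. It only builds the URL and JSON body and sends nothing. The function name is hypothetical, and the endpoint path and the `job_variables` field reflect my understanding of the Prefect REST API, so check them against your server's `/docs` page before relying on them.

```python
import json
from typing import Any, Dict, Optional, Tuple


def build_create_flow_run_request(
    api_url: str,
    deployment_id: str,
    flow_data: Dict[str, Any],
    image: Optional[str] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Return (url, json_body) for creating a flow run from a deployment.

    Hypothetical helper: assumes POST {api_url}/deployments/{id}/create_flow_run
    accepts "parameters" and "job_variables" in its body.
    """
    body: Dict[str, Any] = {"parameters": {"flow_data": flow_data}}
    if image is not None:
        # Per-run override of the work pool's "image" job variable (assumption).
        body["job_variables"] = {"image": image}
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    return url, body


url, body = build_create_flow_run_request(
    api_url="http://prefect-server:4200/api",
    deployment_id="00000000-0000-0000-0000-000000000000",  # placeholder ID
    flow_data={"company_id": 42, "source_provider": "a", "target_provider": "b"},
    image="mat/prefect-k8s-worker:15",
)
print(url)
print(json.dumps(body, indent=2))
```

The flow parameters travel in `parameters` and reach the flow entrypoint as arguments, while infrastructure settings such as the image travel in `job_variables`; the two are separate channels rather than one being mapped into the other.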
  • t

    Trey Gilliland

    09/04/2025, 12:11 AM
    I'm running into an issue where around 30% of the runs for one of my deployments fail at the git_clone and set_working_directory step because the directory the code should be cloned into does not exist. This is especially weird because it works most of the time, but I guess the clone fails 30% of the time? I tried adding a shell script step to sleep between the clone and set_working_directory in case it was a race condition from the cloning, to no avail. When I add a debug step to run `ls /`, on successful runs the code is cloned to the right place under `/code-main`, and on failed runs that directory is not there. There are no logs from the git clone step to suggest that the clone fails. The GitHub token is valid and it does work most of the time. Any ideas? This is on Prefect Cloud using a Modal work pool.
  • k

    Kiran

    09/04/2025, 7:26 AM
    Hey @Marvin, if I am using `retries` but not the `retry_delay_seconds` parameter with the flow decorator, do retries work?
  • s

    Shareef Jalloq

    09/04/2025, 8:17 PM
    Hi all, new Prefect user here. I've managed to set up a simple deployment and can get it running as a dev instance using SQLite. Now I'm trying to run it under supervisord and connect to a PostgreSQL server. No matter what I do, I can't get the server to start successfully under supervisord. I have a simple bash script that sets up the env and runs the server. I have an `apps` user that is used to run all apps on this server.
    web-server-01:/var/log/apps# cat /opt/apps/fpga-automation/start_prefect.sh
    #!/bin/bash
    source /opt/apps/fpga-automation/venv/bin/activate
    export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect_user:prefectdb@10.10.8.114:5432/prefect_automation"
    export PREFECT_HOME="/opt/apps/prefect"
    export HOME="/home/apps"
    export PREFECT_LOGGING_LEVEL=DEBUG
    exec prefect server start --host 127.0.0.1 --port 4200
    And the error looks like a DNS failure?
    ___ ___ ___ ___ ___ ___ _____
    | _ \ _ \ __| __| __/ __|_   _|
    |  _/   / _|| _|| _| (__  | |
    |_| |_|_\___|_| |___\___| |_|
    
    Configure Prefect to communicate with the server with:
    
        prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

    View the API reference documentation at http://127.0.0.1:4200/docs

    Check out the dashboard at http://127.0.0.1:4200
    
    
    
    20:13:29.728 | ERROR   | prefect.server.utilities.postgres_listener - Failed to establish raw asyncpg connection for LISTEN/NOTIFY: [Errno -2] Name does not resolve
    Traceback (most recent call last):
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/prefect/server/utilities/postgres_listener.py", line 71, in get_pg_notify_connection
        conn = await asyncpg.connect(**connect_args)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connection.py", line 2421, in connect
        return await connect_utils._connect(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 1075, in _connect
        raise last_error or exceptions.TargetServerAttributeNotMatched(
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 1049, in _connect
        conn = await _connect_addr(
               ^^^^^^^^^^^^^^^^^^^^
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 886, in _connect_addr
        return await __connect_addr(params, True, *args)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 931, in __connect_addr
        tr, pr = await connector
                 ^^^^^^^^^^^^^^^
      File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 802, in _create_ssl_connection
        tr, pr = await loop.create_connection(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "uvloop/loop.pyx", line 1982, in create_connection
    socket.gaierror: [Errno -2] Name does not resolve
    If I run the script as root or the apps user it works fine. What am I missing?
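One way to narrow this down is to run a small resolution check under the same supervisord program entry, so it sees the same environment as the server. The sketch below is a diagnostic aid, not a fix: `db_host_port` and `can_resolve` are hypothetical helpers, and the URL uses placeholder credentials with the host and port from the script above.

```python
import socket
from typing import Optional, Tuple
from urllib.parse import urlsplit


def db_host_port(connection_url: str) -> Tuple[Optional[str], int]:
    """Extract the host and port the database driver would connect to."""
    parts = urlsplit(connection_url)
    return parts.hostname, parts.port or 5432


def can_resolve(host: str, port: int) -> bool:
    """Return True if the resolver accepts the host, False on a gaierror."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False


# Placeholder credentials; host and port as in the start script.
url = "postgresql+asyncpg://user:password@10.10.8.114:5432/prefect_automation"
host, port = db_host_port(url)
print(host, port)
if host is not None:
    print(can_resolve(host, port))
```

If the host printed under supervisord differs from the one printed in an interactive shell, the environment or the URL value reaching the process is the likely difference.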
  • n

    Nate

    09/04/2025, 9:37 PM
    Hi folks! We've identified and rolled out a fix for the failure to release deployment concurrency slots in Prefect Cloud. The fix should now be in place, but if you're still encountering anything unexpected, please let us know!
  • o

    Owen Boyd

    09/04/2025, 10:25 PM
    I'm seeing this task fail and then not retry - any idea what might be going on or how I can troubleshoot? The failure is likely caused by a downstream resource being overloaded. I'm confused why we are not seeing any retry attempts half an hour later. I suspected that the prefect run might have crashed, but it's still going according to the cloud UI. Nothing else that I'm aware of would prevent the retry from happening - I checked task concurrency limits etc, we don't have any internal concurrency limits that would prevent retry here in our code.
    Task run failed with exception: TaskRunTimeoutError('Scope timed out after 60.0 second(s).') - Retry 1/3 will start 10 second(s) from now 02:29:37 PM
    Finished in state Completed() 02:29:22 PM