# ask-community
  • k

    Kiran

    10/09/2025, 6:43 AM
    Hi @Nate, I am calling multiple deployments sequentially from my main deployment. Can I show the dependency between these sub-deployments in the UI?
  • m

    Michal Barer

    10/09/2025, 8:21 AM
    Hey! I’m trying to add a new member to our account via the UI (we use SSO login). He gets the email invite, but when he clicks the link and tries to join the workspace, he sees a message saying the link has expired. Any idea how to fix this? @Marvin
  • m

    Momo Chimes

    10/09/2025, 12:23 PM
    Hey, is it correct to think that Prefect is not made for thousands of tasks in the queue? I keep getting PoolTimeout after 30-60 minutes on my agent. Latest Prefect version.
  • m

    Momo Chimes

    10/09/2025, 12:24 PM
    I wanted to use this solution to parse data that I get from sensors, so the number of tasks is on that scale. And for some reason, even when no tasks are scheduled and there are only pending ones in the past, they do not get picked up by the agent. Or rather, the server is not providing tasks to the agent, according to the debug info.
  • m

    Momo Chimes

    10/09/2025, 12:26 PM
    exact error on the agent
  • j

    Jared

    10/09/2025, 4:04 PM
    Hi all, I’m trying to set up a Prefect log subscriber to demo real-time logging from my Prefect server from another client. This is the simple implementation I did for the client:
    """Stream ALL Prefect logs - no filtering"""
    import asyncio
    import os

    from prefect.logging.clients import get_logs_subscriber


    async def stream_all_logs():
        os.environ["PREFECT_API_URL"] = "http://10.0.0.81/prefect/server/api"
        os.environ["PREFECT_API_AUTH_STRING"] = "admin:your-secure-password"
        print("Connecting to Prefect logs...")
        print("Streaming ALL logs (no filter)...\n")

        # Pass NO filter at all - just get everything
        async with get_logs_subscriber() as subscriber:
            print("Connected! Waiting for logs...\n")
            count = 0
            async for log in subscriber:
                count += 1
                # Simple output
                ts = log.timestamp.strftime("%H:%M:%S.%f")[:-3]
                level = (log.level_name or "INFO")[:5]
                msg = log.message[:100]
                print(f"[{ts}] {level:5} | {msg}")
                if count % 20 == 0:
                    print(f"\n {count} logs...\n")


    if __name__ == "__main__":
        try:
            asyncio.run(stream_all_logs())
        except KeyboardInterrupt:
            print("\n Stopped")
    And I keep getting this error:
    Connecting to Prefect logs...
    Streaming ALL logs (no filter)...

    Connected! Waiting for logs...

    Traceback (most recent call last):
      File "/code/constellation/test_prefect_streaming.py", line 35, in <module>
        asyncio.run(stream_all_logs())
      File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
        return future.result()
      File "/code/constellation/test_prefect_streaming.py", line 19, in stream_all_logs
        async for log in subscriber:
      File "/usr/local/lib/python3.10/site-packages/prefect/logging/clients.py", line 288, in __anext__
        message = orjson.loads(await self._websocket.recv())
      File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/connection.py", line 322, in recv
        raise self.protocol.close_exc from self.recv_exc
    websockets.exceptions.ConnectionClosedError: no close frame received or sent
    Any ideas what I’m doing wrong?
  • m

    Martin Janiczek

    10/10/2025, 8:50 AM
    I'm asking Marvin at the same time in the other channel, but has anybody experienced weird pauses between concurrency-limited tasks? This flow normally takes only 20-30 minutes, but after updating to the latest Prefect library it is taking 4+ hours.
  • m

    Marvin

    10/10/2025, 1:16 PM
    ✅ thought for 749.2 seconds
  • m

    Marvin

    10/10/2025, 1:28 PM
    You're asking about the new Prefect update, so here is a quick summary of what landed in 3.4.23 (released on 09 Oct 2025):
    Main highlights
    - Support for custom SSL contexts in the Prefect client. PR #19106
    - (Announced) New CLI command prefect api for accessing the API directly. PR #19131
    - Fixes:
      - Support for the day_or parameter in cron schedules. PR #19121
      - Other fixes for async cancellation and bundle serialization.
    - Docs: updated examples and integration links (including an ATProto dashboard example with Prefect Assets). PR #18984
    Full changelog: 3.4.22 → 3.4.23
    How to use the new features
    1) Client with custom SSL
    In Prefect 3.x you can pass an ssl.SSLContext via httpx_settings to the client:
    import asyncio
    import ssl
    from prefect.client.orchestration import PrefectClient, SyncPrefectClient
    
    # Create/adjust your SSLContext (e.g. a custom CA)
    ctx = ssl.create_default_context(cafile="/path/to/ca-bundle.pem")
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    
    # Asynchronous
    async def main():
        async with PrefectClient(
            api="https://api.prefect.cloud",
            api_key="YOUR_API_KEY",
            httpx_settings={"verify": ctx},
        ) as client:
            ...  # make calls via client.*
    
    asyncio.run(main())
    
    # Synchronous
    with SyncPrefectClient(
        api="https://api.prefect.cloud",
        api_key="YOUR_API_KEY",
        httpx_settings={"verify": ctx},
    ) as client:
        ...  # make calls via client.*
    Notes:
    - httpx_settings["verify"] accepts True/False, a path to a CA bundle, or an ssl.SSLContext (recommended for fine-grained control).
    - The PrefectClient signature confirms the httpx_settings parameter.
    2) Cron schedules with day_or
    We confirmed that prefect.schedules.Cron exposes day_or: bool=True. Use it like this:
    from prefect.schedules import Cron
    
    # OR (default): fires if day-of-month OR day-of-week matches
    sched_or = Cron("0 9 * * MON,FRI", timezone="UTC", day_or=True)
    
    # AND: fires only if both match (day-of-month AND day-of-week)
    sched_and = Cron("0 9 1 * MON", timezone="UTC", day_or=False)
    Tip: if you need day_or=False, create the Schedule explicitly and pass it at deploy time:
    from prefect import flow
    from prefect.schedules import Cron
    
    @flow
    def my_flow():
        ...
    
    schedule = Cron("0 9 1 * MON", day_or=False, timezone="UTC")
    
    my_flow.deploy(
        name="my-deployment",
        schedule=schedule,
    )
    General schedules docs: Schedules
    3) About the new prefect api command
    - The release notes mention a new prefect api command. However, when checking the local CLI help we saw “No such command 'api'”.
    - This may indicate that:
      - your local installation is not yet on 3.4.23, or
      - the command is being gradually rolled out/adjusted.
    - Please confirm your version with:
    prefect version
    prefect --help
    - If you need to upgrade:
    pip install -U prefect
    In the meantime, you can interact with the API directly via Python:
    import ssl, httpx
    
    api_url = "https://api.prefect.cloud/api"  # adjust as needed
    api_key = "YOUR_API_KEY"
    
    ctx = ssl.create_default_context(cafile="/path/to/ca-bundle.pem")
    headers = {"Authorization": f"Bearer {api_key}"}
    
    r = httpx.get(f"{api_url}/flows", headers=headers, verify=ctx)
    r.raise_for_status()
    print(r.json())
    Would you like me to go into more detail, in Portuguese, on any of these points (CLI, schedules, or security/SSL)? I can also help you apply this to your current project.
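To make the day_or distinction in the message above concrete, here is a minimal stdlib-only sketch. It does not call Prefect or any cron library; it assumes the semantics Marvin describes (with day_or=True a day matches if the day-of-month OR the day-of-week field matches, with day_or=False both must match). The helper name matching_days is hypothetical.

```python
from datetime import date, timedelta


def matching_days(year, month, day_of_month, weekday, day_or):
    """Days of a month on which a cron like '0 9 <dom> * <dow>' would fire.

    weekday follows Python's convention (Monday == 0).
    """
    days = []
    current = date(year, month, 1)
    while current.month == month:
        dom_hit = current.day == day_of_month
        dow_hit = current.weekday() == weekday
        # OR semantics vs. AND semantics, as described for day_or
        hit = (dom_hit or dow_hit) if day_or else (dom_hit and dow_hit)
        if hit:
            days.append(current.day)
        current += timedelta(days=1)
    return days


# "0 9 1 * MON" in October 2025, where the 1st is a Wednesday
oct_or = matching_days(2025, 10, 1, 0, day_or=True)
oct_and = matching_days(2025, 10, 1, 0, day_or=False)
# Same expression in September 2025, where the 1st is a Monday
sep_and = matching_days(2025, 9, 1, 0, day_or=False)
print(oct_or, oct_and, sep_and)
```

Under these assumptions the OR form fires on the 1st and on every Monday, while the AND form fires only in months whose 1st falls on a Monday.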
  • l

    Lee Trout

    10/10/2025, 2:38 PM
    Has anyone seen issues with SQS event replication in AWS workers? We have seen:
    • Workers losing the websocket connection to Prefect and logging that at the warning level, after which the queue grows until the worker is restarted
    • The queue growing without any obvious errors or log messages, and a failure to dequeue anything on worker restart
  • v

    Vignesh GJ

    10/12/2025, 3:53 PM
    I am trying a Prefect automation to clean up zombie flow runs when a worker crashes, following this doc: https://docs.prefect.io/v3/advanced/detect-zombie-flows Prefect version: 3.3.5. When a flow is executed by a process worker and I kill the worker, the flow run stays in the Running state. The automation is not marking it as Crashed after 3 missed heartbeats. I am emitting a heartbeat every 31 seconds, and I can see the heartbeats in the UI. On the event feed page I can even see that 2 heartbeats came in, at 9:37:03 PM and 9:37:34 PM; after that there is no event in the feed for more than 5 minutes, yet the automation is still not triggered. The next event is at 9:47:43 PM, Flow run canceling, which I did via the UI. How can we debug why it's not marked as crashed?
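As a sanity check on the timings reported above, here is a small sketch of the arithmetic. The timestamps are taken from the message, read as 9:37:03 PM, 9:37:34 PM and 9:47:43 PM; the date is arbitrary. This only checks the numbers and says nothing about why the automation did not fire.

```python
from datetime import datetime, timedelta

HEARTBEAT_SECONDS = 31  # heartbeat interval reported in the message
MISSED_HEARTBEATS = 3   # misses after which the run should be marked crashed

day = "2025-10-12"  # arbitrary date; only the time differences matter
fmt = "%Y-%m-%d %I:%M:%S %p"
first_heartbeat = datetime.strptime(f"{day} 09:37:03 PM", fmt)
last_heartbeat = datetime.strptime(f"{day} 09:37:34 PM", fmt)
manual_cancel = datetime.strptime(f"{day} 09:47:43 PM", fmt)

observed_interval = last_heartbeat - first_heartbeat
expected_window = timedelta(seconds=HEARTBEAT_SECONDS * MISSED_HEARTBEATS)
silence = manual_cancel - last_heartbeat

print(f"observed heartbeat interval: {observed_interval}")
print(f"window for 3 missed heartbeats: {expected_window}")
print(f"silence before manual cancel: {silence}")
print(f"silence exceeds window: {silence > expected_window}")
```

The silence before the manual cancel is several times longer than three heartbeat intervals, so the trigger window itself had plenty of time to elapse.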
  • m

    Max

    10/13/2025, 4:45 PM
    name: orchestrator
    prefect-version: 3.4.22
    build: null
    push: null
    pull:
    - prefect.deployments.steps.set_working_directory:
        directory: ./repositories
    - prefect.deployments.steps.git_clone:
        repository: ssh://git@bitbucket.zzzxxxyyy/xxx/application-name.git
        branch: feature/prefect_xxx
    
    deployments:
    - name: application-name
      version: 1.0.0
      tags: [xxx]
      description: xxx
      schedule: {}
      flow_name: null
      entrypoint: application-name/xxx/script.py:foo
      parameters: {}
      work_pool:
        name: application-pool
        work_queue_name: null
        job_variables: {}
    Hello everyone, I'm trying to understand what the issue is with my .yaml. The error I face is:
    FileNotFoundError: [Errno 2] No such file or directory <entrypoint>
    It looks like the pull step is not even executed. I have my SSH configured properly and the repo pulls to my local machine with no issues. I assume the solution is very simple and the question is very stupid and I am missing something 😄 @Marvin
  • b

    Brandon Robertson

    10/13/2025, 6:38 PM
    @Marvin I'm trying to load a PagerDutyWebhook block in my flow code that I created in the Prefect Cloud UI:
    from prefect.blocks.notifications import PagerDutyWebhook

    # Load the block by the name it was saved under in the UI
    pagerduty_webhook_block = PagerDutyWebhook.load("my-pager-duty-block")
    I'm getting this error:
    File ".venv/lib/python3.13/site-packages/prefect/blocks/notifications.py", line 41, in __init__
        NOTIFY_TYPES += (PREFECT_NOTIFY_TYPE_DEFAULT, ) # pyright: ignore[reportUnknownVariableType]
    TypeError: unsupported operand type(s) for +=: 'frozenset' and 'tuple'
    I'm using Prefect version 3.4.0. Any suggestions?
  • s

    Steven Snowball

    10/13/2025, 9:27 PM
    Hi, I'm new to Linux. I've tried to install Prefect using "uv pip install -U prefect". It looks like it installed, but when I run "prefect version" I just get "prefect: command not found". Do I need to do any other steps to get it to work? I was just following https://docs.prefect.io/v3/get-started/install
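A common cause of "command not found" after a successful install is that the package went into an environment the shell is not using, so its bin directory is not on PATH. That is an assumption about this setup, not a confirmed diagnosis. The stdlib-only sketch below reports whether a prefect executable is visible on PATH and which Python environment is active; the function name find_cli is hypothetical.

```python
import shutil
import sys


def find_cli(command="prefect"):
    """Report where a CLI resolves on PATH and which Python environment is active."""
    return {
        "command_path": shutil.which(command),  # None if not found on PATH
        "python_executable": sys.executable,
        # True when running inside a virtual environment
        "in_virtualenv": sys.prefix != sys.base_prefix,
    }


if __name__ == "__main__":
    for key, value in find_cli().items():
        print(f"{key}: {value}")
```

If command_path comes back as None, activating the virtual environment that uv installed into, or running the command through uv run, may be worth trying.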
  • s

    Serhiy

    10/14/2025, 6:55 PM
    Using Azure Container Registry: I'm deploying to K8s in Azure and using the following helm command to create the worker:
    helm install prefect-worker prefect/prefect-worker --namespace prefect -f ./prefect-worker.yaml
    How can I specify the Docker image for flow execution? The following is not working:
    prefect-worker.yaml
    ---------------------------------------------------------------------------------
    namespaceOverride: prefect
    worker:
      apiConfig: selfHostedServer
      config:
        workPool: ingest-pool
        workQueues: [default]
        name: ingest
        baseJobTemplate:
          configuration:
            image: abcdef.azurecr.io/prefect
            imagePullPolicy: IfNotPresent
    ---------------------------------------------------------------------------------
    - at '/worker/config/baseJobTemplate/configuration': got object, want null or string
  • m

    Mehrdad

    10/14/2025, 7:54 PM
    Hi, Has anyone run into issues with deployment names or any known limitations in Prefect v3.4 (self-hosted)? I’m trying to deploy my flow:
    flow.from_source(
        source=source_code_storage,
        entrypoint=pricing_subflow_entrypoint
    ).deploy(
        name=f'pricing-subflow-v2.0.1',
        version=version,
        work_pool_name=work_pool_name,
        tags=["pricing-core"],
        image=DockerImage(name=image, platform="linux/amd64"),
        push=False,
        build=False,
        job_variables=job_variables
    )
    But the deployment name ends up being only "pricing-subflow-v2", and the rest of the name (.0.1) gets ignored. This is the result of deployment:
  • k

    Kiran

    10/15/2025, 5:34 AM
    @Marvin, I am running around 748 runs with 10 workers, yet the workers are still slow to pick up the runs. Why?
  • k

    Kiran

    10/15/2025, 8:03 AM
    @Marvin how do I start a worker and associate it with a work queue?
  • t

    Tomáš Řehák (realrehi)

    10/15/2025, 12:08 PM
    Hi guys. We have Prefect Server 3.4.6 running in our Kubernetes cluster and it has its own Postgres + Worker. Using helm chart from here. Now there is one issue. We have two Deployments. Both of them work when started manually. We are trying to set up a trigger. We want Deployment B to start when any Flow Run from Deployment A reaches "Completed" state and also allow for multiple triggers. We created a custom composite trigger like this:
    {
      "type": "compound",
      "triggers": [
        {
          "type": "event",
          "match": {},
          "match_related": {
            "prefect.resource.id": "prefect.deployment.<deployment A's id>"
          },
          "after": [],
          "expect": [
            "prefect.flow-run.Completed"
          ],
          "for_each": [],
          "posture": "Reactive",
          "threshold": 1,
          "within": 0
        }
      ],
      "require": "any",
      "within": 0
    }
    And then in "Action" we picked "Run a Deployment" and selected Deployment B, also providing parameters. However, when a Flow Run from Deployment A is Completed, nothing happens. Is there something wrong with our trigger condition? We're not sure what we are doing wrong. The ID provided is 100% correct. Is it possible that the automation state is cached for a while when enabled / disabled? It was working before, so maybe someone switched it off / on and it is still cached? Thanks for any help.
  • h

    Harshith Gogineni

    10/15/2025, 12:51 PM
    Hey everyone! 👋 Having a concurrency nightmare with Prefect 3.4.11 and need help.
    Problem: 70-80 flows running when deployment limits should cap at 20 total (two deployments, one with limit 13 and one with limit 7, both running on the same work pool).
    My setup:
    • Prefect Server 3.4.11 in Docker on EC2
    • PostgreSQL on RDS
    • ECS work pool (push work pool, spawns Fargate tasks per flow)
    • FastAPI app triggering flows programmatically using create_flow_run_from_deployment()
    What's happening:
    • Deployment YAML has concurrency_limit: 13 and concurrency_limit: 7 configured
    • ECS keeps spawning Fargate tasks without respecting limits
    I have tried adding a concurrency limit on the work pool instead of the deployments, but that causes the runs to never reach a Pending/Running state; they are stuck in Late. So ideally I need a solution where the concurrency limits are respected.
  • i

    Igor Kuzenkov

    10/15/2025, 4:08 PM
    Hello everyone! First of all, excuse me, I am not an expert in Prefect, hence the following question.
    What I have
    I have a Flow A. Flow A has two deployments: Deployment 1 and Deployment 2.
    My question
    Is there a way to trigger Deployment 2 after a Deployment 1 run reaches either the Completed or Failed state?
    What I tried
    There is a template allowing me to create a trigger like "When any flow run of flow Flow A enters Failed, Completed", then do an action like "Run a deployment Flow A -> Deployment 2". But I don't need "any" here. I need Flow A -> Deployment 1 instead of any.
    To summarize what I need
    Trigger: When a deployment run of Flow A -> Deployment 1 enters Failed, Completed
    Do action: Run a deployment Flow A -> Deployment 2
    Is my question applicable to the Prefect universe? If so, how do I do this? Thanks
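One way to express the requested condition is an event trigger scoped to a single deployment. The sketch below only builds the trigger JSON in Python. Its shape mirrors the custom trigger another user posted in this channel (match_related on prefect.deployment.<id>, expect on flow-run state events), and whether it fires as intended on a given server is not verified here. The deployment ID is a placeholder and build_trigger is a hypothetical helper.

```python
import json


def build_trigger(deployment_id, states=("Completed", "Failed")):
    """Build an event trigger that reacts to flow runs of one deployment."""
    return {
        "type": "event",
        "match": {},
        # Restrict to flow runs related to this specific deployment
        "match_related": {
            "prefect.resource.id": f"prefect.deployment.{deployment_id}"
        },
        "after": [],
        # React when a run enters any of the listed states
        "expect": [f"prefect.flow-run.{state}" for state in states],
        "for_each": [],
        "posture": "Reactive",
        "threshold": 1,
        "within": 0,
    }


# Placeholder ID for Deployment 1; replace with the real deployment ID
trigger = build_trigger("<deployment-1-id>")
print(json.dumps(trigger, indent=2))
```

The resulting JSON could be pasted into a custom trigger, with "Run a deployment" pointing at Deployment 2 as the action.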
  • s

    Sergio Luceno

    10/15/2025, 6:40 PM
    Hello everyone! I am starting to play with Prefect and I think I might be doing something wrong. I have a self-hosted Prefect in a k8s cluster in AWS EKS. I am testing with a simple flow like this one.
    import time

    from prefect import flow

    @flow
    def hello():
        time.sleep(300)
        print("hello world")
    I am packaging my flow into a python:3.11-slim image, with just this code and Prefect pip-installed, trying to keep things as minimal as possible. We have a deployment that is set to use our Docker image.
    When executing this in our cluster, every time we run a deployment we get a new pod executing the flow. Every pod takes 150-200 MB of RAM (not counting the prefect-server and prefect-worker pods). If I need to run, for example, 10K concurrent jobs, I will have 10K pods x 200 MB of RAM each = 2,000,000 MB of RAM (about 2 TB). That is something we cannot afford. Are we doing something wrong? Can we run our planned workload in another way that uses fewer resources? We are starting to think Prefect is not for our use case: we just want to run a bunch of small jobs and benefit from Prefect's concurrency management, but we cannot afford every single task having a starting memory footprint of 150-200 MB.
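For reference, the back-of-the-envelope estimate above works out as follows. This sketch only redoes the arithmetic with the figures from the message (10,000 concurrent pods at 150-200 MB each); it uses decimal units (1 TB = 1,000,000 MB), and the function name is hypothetical.

```python
def total_memory_mb(pods, mb_per_pod):
    """Total memory in MB if every pod uses the same amount."""
    return pods * mb_per_pod


PODS = 10_000
low_mb = total_memory_mb(PODS, 150)
high_mb = total_memory_mb(PODS, 200)

print(f"low estimate:  {low_mb:,} MB = {low_mb / 1_000_000:.1f} TB")
print(f"high estimate: {high_mb:,} MB = {high_mb / 1_000_000:.1f} TB")
```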
  • n

    Nicholas Gaffney

    10/16/2025, 3:58 AM
    For people running on Prefect Managed infrastructure: does anyone know why jobs run inconsistently late, and whether there is a minimum compute time for each flow? Just trying to understand our work pool usage.
  • a

    ASHIK

    10/16/2025, 11:56 AM
    Hello Prefect team, I would like to connect to discuss enterprise support. I went to your website and left a query but haven't seen any response. Could you let me know whom to contact and when we can schedule a call?
  • b

    Ben

    10/16/2025, 2:45 PM
    Hello community! All of my deployment runs are starting very late and I'm not sure why. I am running Prefect self-hosted with Docker Compose, with a worker service listening for deployment runs on a GCP Cloud Run work pool. My runs can be up to 4 hours late in some cases. My worker service is started with this command in my Docker Compose file:
    entrypoint:
          - /opt/prefect/entrypoint.sh
          - prefect
          - worker
          - start
          - '--pool=my-work-pool'
          - '--with-healthcheck'
          - '--name=${DEFAULT_WORKER_NAME}'
          - '--limit=${DEFAULT_POOL_LIMIT}'
  • s

    Solmaz Bagherpour

    10/16/2025, 4:09 PM
    Hello community! We are on Prefect 2 and have a use case where we need to trigger a flow run from outside the Prefect UI, for example through a Slack command. Are there any examples or resources for this, if it's possible?
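One approach that does not depend on the UI is to call the Prefect REST API from whatever handles the Slack command. The sketch below builds (but does not send) a POST to the create_flow_run endpoint of a deployment, using only the standard library. The server URL, deployment ID and parameters are placeholders, build_flow_run_request is a hypothetical helper, and the exact path should be checked against the API reference for the Prefect version in use.

```python
import json
import urllib.request


def build_flow_run_request(api_url, deployment_id, parameters=None, api_key=None):
    """Build a POST request asking Prefect to create a flow run from a deployment."""
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters or {}}).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_key:  # only needed when the API requires a bearer token
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# Placeholder values; replace with your own server URL and deployment ID
request = build_flow_run_request(
    "http://localhost:4200/api",
    "00000000-0000-0000-0000-000000000000",
    parameters={"source": "slack"},
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Keeping the request construction separate from the send makes it easy to log or unit-test what the Slack handler would submit before wiring it to a live server.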
  • i

    Ihor Ramskyi

    10/16/2025, 4:21 PM
    Hi team. Is anyone else using Microsoft Teams webhooks? Microsoft said that they are changing the URLs of their webhooks, which I use for alerts when a flow fails. The old ones are going to be deprecated on November 30. I tried to update the URLs in the blocks, but it doesn't work; nothing is sent. Only the old URLs work. I tried sending notifications with the Teams API myself and it works, so the problem is with Prefect. Has anybody found a way to make the new URLs work? Using Prefect 3.4.23.
  • m

    Matthew Bell

    10/16/2025, 8:40 PM
    Is it possible to set a default number of items per page (so it doesn't reset to 10 on page reload)?
  • m

    Matt Alhonte

    10/17/2025, 2:20 AM
    Anyone know where in the new GUI you can grab the input Params for a given Flow Run?
    🙌 1
  • o

    Onishi Hideo

    10/17/2025, 2:25 PM
    I am an experienced engineer specializing in workflow automation, LLM integration, RAG, AI detection, and image and voice AI, with a strong track record of real-world implementations and blockchain development. I have built automated pipelines and task orchestration systems using Dspy, OpenAI APIs, and custom agents. One example is a support automation system that connects Slack, Notion, and internal APIs to an LLM, reducing response times by 60%. I’ve also designed and deployed advanced RAG pipelines, combining vector databases, hybrid search, and custom retrieval logic to deliver accurate, context-aware responses in production environments. For AI content detection, I developed tools for a moderation platform using stylometric analysis, embedding similarity, and fine-tuned transformers to identify GPT-generated text with high precision. In image AI, I created a tagging and moderation pipeline with CLIP and YOLOv8 on AWS Lambda and S3, classifying and filtering thousands of images daily for an e-commerce platform. In voice AI, I built a voice cloning and transcription service using Whisper and Tacotron2, enabling personalized voice assistants through ASR, TTS, and CRM integration. In addition to my AI work, I have deep expertise in blockchain technology, including smart contract development (Solidity and Rust), decentralized application architecture, and secure on-chain/off-chain integrations. I focus on delivering scalable, production-ready AI and blockchain systems, covering the entire lifecycle from model selection and fine-tuning to backend integration and deployment. If you’re interested in working together or need support, feel free to reach out anytime. Thanks.