# ask-community

    Fugo Takefusa

    10/01/2025, 10:45 PM
    Hi team, I am encountering an issue where task runs are missing from the Prefect run logs. The flow executes as expected, but the logs for the task runs are not recorded. The same issue was raised in https://github.com/PrefectHQ/prefect/issues/15153 and I followed the debug steps there, but no luck. I verified that the running container can talk to the Postgres database, and no task runs were recorded in the db when I checked with the Prefect client. Does anyone know the fix? Config: • Self-hosted Prefect API server + Prefect worker + Postgres db on ECS. • Prefect version "3.7.20" (derived using
    curl https://<prefect-server-url>/api/admin/version
    )
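    For reference, a minimal sketch (not from the original post; the script and names below are illustrative) of that kind of client-side check: listing the most recent task runs the API knows about, which should come back empty if nothing was written to the database.
    Copy code
    import asyncio
    
    from prefect import get_client
    
    
    async def list_recent_task_runs() -> None:
        # Queries the configured Prefect API (and therefore the Postgres database
        # behind it); an empty result matches the symptom described above.
        async with get_client() as client:
            task_runs = await client.read_task_runs(limit=10)
            print(f"task runs returned: {len(task_runs)}")
            for tr in task_runs:
                print(tr.id, tr.name, tr.state_type)
    
    
    if __name__ == "__main__":
        asyncio.run(list_recent_task_runs())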

    Nick Ackerman

    10/03/2025, 5:17 PM
    Hey, folks! When I run this code:
    Copy code
    from prefect import flow, task
    
    @task
    async def async_task():
        raise ValueError("aah!")
    
    @task
    def not_async_task():
        async_task.submit().result()
    
    @flow
    def my_flow():
        not_async_task()
    
    my_flow()
    I get the following error:
    Copy code
    File "/Users/nickackerman/code/python/practice/.venv/lib/python3.10/site-packages/prefect/transactions.py", line 624, in __aexit__
        await self.reset()
      File "/Users/nickackerman/code/python/practice/.venv/lib/python3.10/site-packages/prefect/transactions.py", line 492, in reset
        await parent.rollback()
    TypeError: object bool can't be used in 'await' expression
    Is this way of using Prefect futures together with async tasks not allowed for some reason? More details in 🧵

    Kurt Sys (Vintecc)

    10/04/2025, 11:56 AM
    Hey all, anyone had issues just running the most basic test with prefect?
    Copy code
    from prefect import flow
    from prefect.testing.utilities import prefect_test_harness
    
    
    @flow
    def my_favorite_flow():
        return 42
    
    
    def test_my_favorite_flow():
        with prefect_test_harness(server_startup_timeout=120):
            assert my_favorite_flow() == 42
    I seem to be unable to make it work, I always get a 'timeout':
    Copy code
    pytest tests/test_dummy.py 
    =============================================================================================================================== test session starts ===============================================================================================================================
    platform linux -- Python 3.10.12, pytest-7.4.4, pluggy-1.6.0
    rootdir: /workspace
    configfile: pytest.ini
    plugins: devtools-0.12.2, docker-3.2.3, anyio-4.11.0, asyncio-0.23.8, xdist-3.8.0, postgresql-7.0.2, dash-3.2.0, hydra-core-1.3.2, Faker-37.8.0, cov-6.2.1
    asyncio: mode=strict
    collected 1 item                                                                                                                                                                                                                                                                  
    
    tests/test_dummy.py F                                                                                         [100%]
    
    == FAILURES ==
    __ test_my_favorite_flow __
    
        def test_my_favorite_flow():
    >       with prefect_test_harness(server_startup_timeout=120):
    
    tests/test_dummy.py:11: 
     _ _
    /usr/lib/python3.10/contextlib.py:135: in __enter__
        return next(self.gen)
    ...py310/lib/python3.10/site-packages/prefect/testing/utilities.py:168: in prefect_test_harness
        test_server.start(
     _ _
    
    self = <prefect.server.api.server.SubprocessASGIServer object at 0x77ee36386200>, timeout = 120
    
        def start(self, timeout: Optional[int] = None) -> None:
            ...
    >                       raise RuntimeError(error_message)
    E                       RuntimeError: Timed out while attempting to connect to ephemeral Prefect API server.
    
    ...py310/lib/python3.10/site-packages/prefect/server/api/server.py:921: RuntimeError
    -- Captured stderr call --
    13:49:16.879 | INFO    | prefect - Starting temporary server on <http://127.0.0.1:8787>
    See <https://docs.prefect.io/v3/concepts/server#how-to-guides> for more information on running a dedicated Prefect server.
    -- Captured log call --
    INFO     prefect:server.py:881 Starting temporary server on <http://127.0.0.1:8787>
    See <https://docs.prefect.io/v3/concepts/server#how-to-guides> for more information on running a dedicated Prefect server.
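    For reference, the pattern the Prefect testing docs recommend wraps the harness in a session-scoped pytest fixture so the ephemeral server only starts once per test session (a sketch, assuming pytest is configured as above):
    Copy code
    import pytest
    
    from prefect.testing.utilities import prefect_test_harness
    
    
    @pytest.fixture(autouse=True, scope="session")
    def prefect_test_fixture():
        # Every test in the session runs against a temporary, ephemeral Prefect API
        with prefect_test_harness():
            yield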

    Kyle Mulka

    10/05/2025, 9:32 PM
    Just started using prefect. I created a single file hello.py with a hello world function. But when I run it, I’m running into an error:
    Copy code
    [Errno 2] No such file or directory: 'uvx'

    Arthur Ren

    10/05/2025, 10:03 PM
    @Marvin I’m trying to upgrade from Prefect 2 to Prefect 3; what’s the corresponding syntax for pushing deployment code to S3 storage?
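    For context, a rough Prefect 3 sketch of the S3 pattern (not Arthur's code; the bucket block name, folder, and entrypoint are placeholders, and prefect-aws is assumed to be installed): push the project into the bucket, then register a deployment that pulls flow code from it at run time. The prefect.yaml route would instead use the prefect-aws push_to_s3 / pull_from_s3 deployment steps.
    Copy code
    from prefect import flow
    from prefect_aws.s3 import S3Bucket
    
    # Placeholder block created beforehand (e.g. in the UI) pointing at the bucket/folder
    s3_bucket = S3Bucket.load("my-code-bucket")
    
    # "Push": upload the local project into the folder the block points at
    s3_bucket.upload_from_folder(".")
    
    # "Pull": the deployment fetches flow code from the same bucket at run time
    flow.from_source(
        source=s3_bucket,
        entrypoint="flows/etl.py:etl",
    ).deploy(
        name="etl-from-s3",
        work_pool_name="my-work-pool",
    )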

    Kartik

    10/06/2025, 8:25 AM
    Hi team, can someone explain this to me? Why does it show this error?

    Shubham

    10/07/2025, 3:05 PM
    Hi, is there a limit on the number of work pools I can create in Prefect Cloud on the Starter plan?

    Kartik

    10/08/2025, 5:39 AM
    I’m facing an issue after migrating my self-hosted Prefect server from SQLite to PostgreSQL (hosted on AWS RDS). Initially, I was using the default SQLite database, which stored all data locally on the EC2 instance. As the space consumed by events and resource metadata kept growing significantly, I decided to switch to PostgreSQL on RDS. However, after the migration, my flow runs have started taking much longer than before. Previously, an average flow run would complete in around 5 minutes, but now they’re taking 30 minutes or more. Additionally, the next scheduled run (set to trigger every 30 minutes) often starts and enters the RUNNING state before the previous flow has completed. PS: The schedule interval is 30 minutes. Can someone please help me understand what could be causing this slowdown?

    Jo Tryti

    10/08/2025, 6:12 AM
    Hi, we are running a self-hosted Prefect server. I'm now updating prefect-dbt to 0.7 and trying to get the PrefectDbtRunner to work. For some reason the runner takes forever starting the model subtasks. This happens both when running the flow locally from a terminal and in the Docker container worker. I've tried just using the dbtRunner and that works as expected. I haven't been able to find anyone else posting about the same problem. In the case below, dbt_run and dbt_prun do the same thing; the only difference is that each task calls a different invoke helper function:
    Copy code
    from typing import List
    
    # Assumed import paths: dbt-core >= 1.5 and prefect-dbt 0.7
    from dbt.cli.main import dbtRunner, dbtRunnerResult
    from prefect_dbt import PrefectDbtRunner
    
    def __invoke_dbt(command: List[str]) -> dbtRunnerResult:
        # Plain dbt-core runner: behaves as expected
        runner = dbtRunner()
        results = runner.invoke(command)
        return results
    
    def __invoke_prefect_dbt(command: List[str]) -> dbtRunnerResult:
        # prefect-dbt runner: model subtasks are slow to start
        runner = PrefectDbtRunner()
        results = runner.invoke(command)
        return results
    Anyone have any idea on what the cause could be?

    Tom Han

    10/08/2025, 10:19 PM
    Hi! Is
    MetricTrigger
    a Prefect Cloud-only thing? In other words, a self-hosted Prefect server can't use metric triggers for automations?

    chase albright

    10/08/2025, 10:40 PM
    Hi Prefect community. I am using open-source Prefect (prefect --version 3.4.6). I am trying to set up a custom webhook alert for different flow states, but it appears not to be working. Wondering if it is something I am doing wrong... I can trigger the webhook from the terminal, but if I set up an automation for any state, I do not receive any notifications.
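    For reference, a minimal sketch (the block name is a placeholder, not from the original post) of firing a custom webhook notification block directly from Python, which is roughly the equivalent of triggering it from the terminal and is separate from any automation:
    Copy code
    from prefect.blocks.notifications import CustomWebhookNotificationBlock
    
    # Load the block created in the UI and send a test notification through it
    webhook = CustomWebhookNotificationBlock.load("my-custom-webhook")
    webhook.notify(body="Test notification from Python", subject="Prefect test")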

    Kiran

    10/09/2025, 6:32 AM
    @Marvin I am calling multiple deployments in sequential order from my main deployment; can I show the dependency between these sub-deployments in the UI?

    Kiran

    10/09/2025, 6:43 AM
    hi @Nate I am calling multiple deployments in sequential order from my main deployment; can I show the dependency between these sub-deployments in the UI?
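    A minimal sketch of the usual pattern here (deployment names are illustrative, not Kiran's setup): calling run_deployment from inside the parent flow with the default as_subflow=True links each child run to the parent, so the sequence shows up on the parent flow run's page in the UI.
    Copy code
    from prefect import flow
    from prefect.deployments import run_deployment
    
    
    @flow
    def main_orchestrator():
        # Each call waits for the child run to finish, preserving the sequential order
        first = run_deployment(name="extract-flow/extract-deployment")
        second = run_deployment(name="transform-flow/transform-deployment")
        return first, second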

    Michal Barer

    10/09/2025, 8:21 AM
    Hey! I’m trying to add a new member to our account via the UI (we use SSO login). He gets the email invite, but when he clicks the link and tries to join the workspace, he sees a message saying the link has expired. Any idea how to fix this? @Marvin

    Momo Chimes

    10/09/2025, 12:23 PM
    Hey, is it correct to think that Prefect is not made for thousands of tasks in the queue? I keep getting PoolTimeout after 30-60 minutes for my agent. Latest Prefect version.

    Momo Chimes

    10/09/2025, 12:24 PM
    I wanted to use this solution to parse data that I get from sensors, so the number of tasks is something like this. For some reason, even when there are no scheduled tasks and only pending ones in the past, they do not get picked up by the agent. Or rather, the server is not providing tasks to the agent, according to the debug info.

    Momo Chimes

    10/09/2025, 12:26 PM
    exact error on the agent

    Jared

    10/09/2025, 4:04 PM
    Hi all, I’m trying to set up a Prefect log subscriber to demo real-time logging from my Prefect server from another client. This is the simple implementation I did for the client:
    Copy code
    """Stream ALL Prefect logs - no filtering"""
    import asyncio
    import os
    from prefect.logging.clients import get_logs_subscriber
    
    
    async def stream_all_logs():
        os.environ["PREFECT_API_URL"] = "http://10.0.0.81/prefect/server/api"
        os.environ["PREFECT_API_AUTH_STRING"] = "admin:your-secure-password"
        print("Connecting to Prefect logs...")
        print("Streaming ALL logs (no filter)...\n")
    
        # Pass NO filter at all - just get everything
        async with get_logs_subscriber() as subscriber:
            print("Connected! Waiting for logs...\n")
            count = 0
            async for log in subscriber:
                count += 1
                # Simple output
                ts = log.timestamp.strftime("%H:%M:%S.%f")[:-3]
                level = (log.level_name or "INFO")[:5]
                msg = log.message[:100]
                print(f"[{ts}] {level:5} | {msg}")
                if count % 20 == 0:
                    print(f"\n {count} logs...\n")
    
    
    if __name__ == "__main__":
        try:
            asyncio.run(stream_all_logs())
        except KeyboardInterrupt:
            print("\n Stopped")
    And I keep getting this error:
    Copy code
    Connecting to Prefect logs...
    Streaming ALL logs (no filter)...
    
    Connected! Waiting for logs...
    
    Traceback (most recent call last):
      File "/code/constellation/test_prefect_streaming.py", line 35, in <module>
        asyncio.run(stream_all_logs())
      File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
        return future.result()
      File "/code/constellation/test_prefect_streaming.py", line 19, in stream_all_logs
        async for log in subscriber:
      File "/usr/local/lib/python3.10/site-packages/prefect/logging/clients.py", line 288, in __anext__
        message = orjson.loads(await self._websocket.recv())
      File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/connection.py", line 322, in recv
        raise self.protocol.close_exc from self.recv_exc
    websockets.exceptions.ConnectionClosedError: no close frame received or sent
    Any ideas what I’m doing wrong?

    Martin Janiczek

    10/10/2025, 8:50 AM
    I'm asking Marvin at the same time in the other channel, but has anybody experienced weird pauses between concurrency-limited tasks? This flow normally only takes 20-30 min, but after updating to the latest Prefect library, the flow is taking 4h+.
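    For readers, a small illustrative sketch (not Martin's code; the tag name and limit are placeholders) of how tasks typically end up concurrency-limited: a tag on the task plus a tag concurrency limit created separately, e.g. with prefect concurrency-limit create database 5.
    Copy code
    from prefect import flow, task
    
    
    @task(tags=["database"])
    def query(i: int) -> int:
        # All runs of tasks tagged "database" share that tag's concurrency limit
        return i * 2
    
    
    @flow
    def limited_flow():
        futures = [query.submit(i) for i in range(20)]
        return [f.result() for f in futures]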

    Marvin

    10/10/2025, 1:16 PM
    ✅ thought for 749.2 seconds

    Marvin

    10/10/2025, 1:28 PM
    You're asking about the new Prefect update. Here's a quick summary of what landed in 3.4.23 (released Oct 9, 2025):
    Main highlights
    - Support for custom SSL contexts in the Prefect client. PR #19106
    - (Announced) A new prefect api CLI command for accessing the API directly. PR #19131
    - Fixes:
      - Support for the day_or parameter in cron schedules. PR #19121
      - Other fixes to async cancellation and bundle serialization.
    - Docs: updated integration examples and links (including an ATProto dashboard example with Prefect Assets). PR #18984
    Full changelog: 3.4.22 → 3.4.23
    How to use the new features
    1) Client with a custom SSL context
    In Prefect 3.x you can pass an ssl.SSLContext to the client via httpx_settings:
    Copy code
    import ssl
    from prefect.client.orchestration import PrefectClient, SyncPrefectClient
    
    # Create/adjust your SSLContext (e.g., a custom CA)
    ctx = ssl.create_default_context(cafile="/path/to/ca-bundle.pem")
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    
    # Async
    async with PrefectClient(
        api="https://api.prefect.cloud",
        api_key="YOUR_API_KEY",
        httpx_settings={"verify": ctx},
    ) as client:
        ...  # make calls via client.*
    
    # Sync
    with SyncPrefectClient(
        api="https://api.prefect.cloud",
        api_key="YOUR_API_KEY",
        httpx_settings={"verify": ctx},
    ) as client:
        ...  # make calls via client.*
    Notes:
    - httpx_settings["verify"] accepts True/False, a path to a CA bundle, or an ssl.SSLContext (recommended for fine-grained control).
    - The PrefectClient signature confirms the httpx_settings parameter.
    2) Cron schedules with day_or
    We confirmed that prefect.schedules.Cron exposes day_or: bool = True. Use it like this:
    Copy code
    from prefect.schedules import Cron
    
    # OR (default): fires if the day-of-month OR the day-of-week matches
    sched_or = Cron("0 9 * * MON,FRI", timezone="UTC", day_or=True)
    
    # AND: fires only if both match (day-of-month AND day-of-week)
    sched_and = Cron("0 9 1 * MON", timezone="UTC", day_or=False)
    Tip: if you need day_or=False, create the schedule explicitly and pass it to deploy:
    Copy code
    from prefect import flow
    from prefect.schedules import Cron
    
    @flow
    def my_flow():
        ...
    
    schedule = Cron("0 9 1 * MON", day_or=False, timezone="UTC")
    
    my_flow.deploy(
        name="my-deployment",
        schedule=schedule,
    )
    General schedule docs: Schedules
    3) About the new prefect api command
    - The release notes mention a new prefect api command. However, when checking the local CLI help we saw "No such command 'api'".
    - This may indicate that:
      - your local installation is not yet on 3.4.23, or
      - the command is being rolled out or adjusted gradually.
    - Please confirm your version with:
    Copy code
    prefect version
    prefect --help
    - If you need to upgrade:
    Copy code
    pip install -U prefect
    In the meantime, you can interact with the API directly from Python:
    Copy code
    import ssl, httpx
    
    api_url = "https://api.prefect.cloud/api"  # adjust as needed
    api_key = "YOUR_API_KEY"
    
    ctx = ssl.create_default_context(cafile="/path/to/ca-bundle.pem")
    headers = {"Authorization": f"Bearer {api_key}"}
    
    r = httpx.get(f"{api_url}/flows", headers=headers, verify=ctx)
    r.raise_for_status()
    print(r.json())
    Want me to go deeper into any of these points (CLI, schedules, or security/SSL)? I can also help apply this to your current project.

    Lee Trout

    10/10/2025, 2:38 PM
    Has anyone seen issues with SQS event replication in aws workers? We have seen: • Workers losing the websocket connection to prefect and logging that at the warning level after which the queue grows until the worker is restarted • The queue growing without any obvious errors / log messages and a failure to dequeue anything on worker restart

    Vignesh GJ

    10/12/2025, 3:53 PM
    I am trying a Prefect automation to clean up zombie flows when a worker crashes, following this doc: https://docs.prefect.io/v3/advanced/detect-zombie-flows Prefect version: 3.3.5. When a flow is executed by a process worker and I kill the worker, the flow is still in the RUNNING state. The automation is not marking it as crashed after 3 missed heartbeats. I am emitting a heartbeat every 31 sec, and I can see the heartbeats in the UI. I can even see in the event feed page that 2 heartbeats came, at 9:37:03 PM and 9:37:34 PM; after that there were no events for that feed for more than 5 min. Still the automation is not triggered. The next event is at 9:47:43 PM, "Flow run canceling" - this I did via the UI. How can we debug why it's not marked as crashed?
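    For reference, a rough sketch of the kind of automation that guide describes (based on my reading of the zombie-flows doc; exact import paths and field names may differ by version, so treat this as an assumption to verify): proactively expect a heartbeat event within a window of roughly three heartbeat intervals, and mark the flow run crashed if it never arrives.
    Copy code
    from datetime import timedelta
    
    from prefect.automations import Automation
    from prefect.client.schemas.objects import StateType
    from prefect.events.actions import ChangeFlowRunState
    from prefect.events.schemas.automations import EventTrigger, Posture
    
    Automation(
        name="Crash zombie flows",
        trigger=EventTrigger(
            after={"prefect.flow-run.heartbeat"},
            expect={
                "prefect.flow-run.heartbeat",
                "prefect.flow-run.Completed",
                "prefect.flow-run.Failed",
                "prefect.flow-run.Cancelled",
                "prefect.flow-run.Crashed",
            },
            match={"prefect.resource.id": ["prefect.flow-run.*"]},
            for_each={"prefect.resource.id"},
            posture=Posture.Proactive,
            threshold=1,
            within=timedelta(seconds=93),  # ~3 x a 31 s heartbeat interval
        ),
        actions=[
            ChangeFlowRunState(
                state=StateType.CRASHED,
                message="Flow run marked as crashed due to missing heartbeats.",
            )
        ],
    ).create()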

    Max

    10/13/2025, 4:45 PM
    Copy code
    name: orchestrator
    prefect-version: 3.4.22
    build: null
    push: null
    pull:
    - prefect.deployments.steps.set_working_directory:
        directory: ./repositories
    - prefect.deployments.steps.git_clone:
        repository: ssh://git@bitbucket.zzzxxxyyy/xxx/application-name.git
        branch: feature/prefect_xxx
    
    deployments:
    - name: application-name
      version: 1.0.0
      tags: [xxx]
      description: xxx
      schedule: {}
      flow_name: null
      entrypoint: application-name/xxx/script.py:foo
      parameters: {}
      work_pool:
        name: application-pool
        work_queue_name: null
        job_variables: {}
    Hello everyone, trying to understand what's the issue with my
    .yaml
    . The error I face is:
    FileNotFoundError: [Errno 2] No such file or directory <entrypoint>
    It looks like the pull step is not even executed. I have my SSH configured properly and the repo clones to my local machine with no issues. I assume the solution is very simple and the question is very stupid and I am missing something 😄 @Marvin

    Brandon Robertson

    10/13/2025, 6:38 PM
    @Marvin I'm trying to load a PagerDutyWebhook block, which I created in the Prefect Cloud UI, in my flow code:
    Copy code
    from prefect.blocks.notifications import PagerDutyWebhook
    pagerduty_webhook_block = PagerDutyWebhook.load("my-pager-duty-block")
    I'm getting this error:
    Copy code
    File ".venv/lib/python3.13/site-packages/prefect/blocks/notifications.py", line 41, in __init__ NOTIFY_TYPES += (PREFECT_NOTIFY_TYPE_DEFAULT, ) # pyright: ignore[reportUnknownVariableType]
    TypeError: unsupported operand type(s) for +=: 'frozenset' and 'tuple'
    I'm using Prefect version 3.4.0. Any suggestions?

    Steven Snowball

    10/13/2025, 9:27 PM
    Hi, I'm new to Linux. I've tried to install Prefect using "uv pip install -U prefect" and it looks like it installed, but when I run "prefect version" I just get "prefect: command not found". Do I need to do any other steps to get it to work? I was just following https://docs.prefect.io/v3/get-started/install

    Serhiy

    10/14/2025, 6:55 PM
    Using Azure Container Registry: I'm deploying to K8s in Azure and using the following helm command to create the worker:
    helm install prefect-worker prefect/prefect-worker --namespace prefect -f ./prefect-worker.yaml
    How can I specify the docker image for flow execution? The following is not working:
    prefect-worker.yaml
    ---------------------------------------------------------------------------------
    namespaceOverride: prefect
    worker:
      apiConfig: selfHostedServer
      config:
        workPool: ingest-pool
        workQueues: [default]
        name: ingest
        baseJobTemplate:
          configuration:
            image: abcdef.azurecr.io/prefect
            imagePullPolicy: IfNotPresent
    ---------------------------------------------------------------------------------
    - at '/worker/config/baseJobTemplate/configuration': got object, want null or string

    Mehrdad

    10/14/2025, 7:54 PM
    Hi, Has anyone run into issues with deployment names or any known limitations in Prefect v3.4 (self-hosted)? I’m trying to deploy my flow:
    Copy code
    flow.from_source(
        source=source_code_storage,
        entrypoint=pricing_subflow_entrypoint
    ).deploy(
        name=f'pricing-subflow-v2.0.1',
        version=version,
        work_pool_name=work_pool_name,
        tags=["pricing-core"],
        image=DockerImage(name=image, platform="linux/amd64"),
        push=False,
        build=False,
        job_variables=job_variables
    )
    But the deployment name ends up being only
    "pricing-subflow-v2"
    , and the rest of the name (
    .0.1
    ) gets ignored. This is the result of deployment:

    Kiran

    10/15/2025, 5:34 AM
    @Marvin I am running around 748 runs with 10 workers; even so, the workers are delayed in picking up the runs. Why?

    Kiran

    10/15/2025, 8:03 AM
    @Marvin how do I start a worker by associating it with a work queue?