# ask-community
  • d

    Dom

    10/20/2025, 6:43 AM
    Hello community! I'm new to Prefect. Can I use only Prefect's flow, pipeline and task logic, without Prefect Cloud or a locally self-hosted Prefect server? I just want the logic behind those functions. I have several Python functions and private packages that I want to use as tasks, then build flows and pipelines from them and implement them with a REST API. Is that a good practice, and would it work smoothly? Would I run into any problems using Prefect this way?
  • i

    Idriss Bellil

    10/20/2025, 7:17 AM
    Is Prefect Cloud down or something? The status page shows it's up, but flows in progress are showing: `Unable to check status of flow run: An error occurred (ServerException) when calling the DescribeTasks operation (reached max retries: 4): Service Unavailable. Please try again later.` and others are crashing without a reason.
    r
    • 2
    • 1
  • i

    Idriss Bellil

    10/20/2025, 7:22 AM
    The status page now shows an active incident
  • q

    Quinn

    10/20/2025, 1:40 PM
    Hey there, I have two work pools, one hosted by Prefect and one self-hosted. The one hosted by Prefect just started failing all jobs, while the self-hosted one (GCP) is still up and running. Anyone have any idea what's going on?
    r
    • 2
    • 2
  • n

    Nathan Low

    10/20/2025, 1:56 PM
    Hi all, I used to be able to run .map() across multiple tasks in Prefect 2. In Prefect 3 this seems to break and asks me to start using .wait(). Unfortunately this slows down the job significantly. Is there an easy way to do this that isn't nested mapped tasks?
    print(accounts_date_df.to_string())
    geneva_data_dfs: ASPrefectGenevaUtil_dataclasses.GenevaExtractData = (
        geneva_extract_tasks.extract_geneva_data_with_update_data_and_file.map(
            update_row=accounts_date_df.itertuples(),
            geneva_tax_lot_args=unmapped(geneva_args),
            type_of_pull=type_of_pull,
        )
    )
    geneva_data_tranform: pd.DataFrame = (
        geneva_transform_tasks.transform_geneva_extract_data.map(
            geneva_data_dfs, type_of_pull=type_of_pull
        )
    )
    geneva_load_tasks.load_geneva_extract_data.map(
        geneva_data_tranform, type_of_pull=type_of_pull
    )
    n
    • 2
    • 13
  • t

    Tom Han

    10/20/2025, 9:51 PM
    Has anyone experienced memory-leak-like behavior in long-running process workers? I had a few workers running for 10 days and the resource usage graph seems very suspicious:
  • l

    Lee Trout

    10/20/2025, 10:59 PM
    All of the jobs in our Prefect Cloud hybrid work pool are marked as late and are not processing, and have been all day. Our worker is up, and the ECS infra that runs the jobs initiated by the worker is up, but nothing is moving. I understood there being problems during the AWS outage, but since Prefect gave the all clear I expected things to start functioning again, and they have not. Anyone else seeing this?
    a
    t
    • 3
    • 6
  • j

    Jessica Smith

    10/21/2025, 1:29 PM
    Can anyone confirm what the `--limit` parameter does when provided to a `worker`? The docs say "The maximum number of flow runs to start simultaneously." but it seems like it may actually mean something more like "how many flow runs can be running simultaneously", which is the exact same thing as the existing concurrency limit on the work pool definition.
  • v

    Vincent D. Warmerdam

    10/21/2025, 1:53 PM
    Just a heads up @Alexander Azzam I have just found a sweet sweet integration pattern between prefect and marimo notebooks. Video will go live next week, but I am really looking forward to having notebooks be my way forward to schedule my Python jobs with marimo under the hood
    n
    • 2
    • 2
  • n

    Nathan Low

    10/21/2025, 4:44 PM
    Anyone still using soda for data checks? Been using it where I work and looks like I’ve got the integration working with prefect 3, just need some information on how to set up the blocks correctly.
  • a

    Amith M

    10/21/2025, 4:54 PM
    Hey guys, quick question: how do I set `job_body` in a Cloud Run work pool to use GCP secrets as env vars?
    ✅ 1
  • n

    Nathan Low

    10/22/2025, 12:02 PM
    Are community-created blocks allowed in Prefect 3? I'm not sure if the cookiecutter project from 2 is the one I should be using; I couldn't find anything in the integrations section of the docs.
    a
    • 2
    • 2
  • m

    Matt Youngberg

    10/22/2025, 1:00 PM
    We put a deployment in Prefect Cloud with its own unique environment variable, but the presence of that single variable apparently erased the environment variables provided by the work pool (Google Cloud Run). Is this expected behavior? This relevant issue got worked on, it seems, in order to avoid such behavior, but it may have regressed? https://github.com/PrefectHQ/prefect/issues/11041
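The difference between the behavior reported here (the deployment's variable replacing the pool's) and the behavior expected (the two sets being merged) can be illustrated with plain dictionaries. This is only a sketch of the two semantics, not Prefect's actual implementation; the function names and variable values are hypothetical.

```python
def replace_env(pool_env: dict, deployment_env: dict) -> dict:
    """Replacement semantics: any deployment-level env discards the pool's env."""
    return dict(deployment_env) if deployment_env else dict(pool_env)


def merge_env(pool_env: dict, deployment_env: dict) -> dict:
    """Merge semantics: pool values are kept, deployment values win on conflict."""
    return {**pool_env, **deployment_env}


# Hypothetical values, for illustration only
pool_env = {"GCP_PROJECT": "example-project", "LOG_LEVEL": "INFO"}
deployment_env = {"FEATURE_FLAG": "on"}

replaced = replace_env(pool_env, deployment_env)  # pool variables are lost
merged = merge_env(pool_env, deployment_env)      # pool variables survive
```

Comparing `replaced` and `merged` shows which keys go missing under replacement, which is a quick way to describe the symptom in a bug report.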
    upvote 1
    a
    • 2
    • 2
  • r

    Rick Fanta

    10/22/2025, 1:33 PM
    @Adam et al... why aren't there also "on_started" hooks for tasks and flows? are there good, simple work-arounds for that?
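One generic workaround is to wrap the function so that a callback fires just before its body runs. The sketch below is plain Python with hypothetical names (`with_on_started`, `record_start`, `extract`); it is not a Prefect API, and how such a wrapper should be combined with Prefect's own decorators is left open here.

```python
import functools
from typing import Any, Callable


def with_on_started(hook: Callable[[str], None]) -> Callable:
    """Build a decorator that calls `hook(function_name)` before each call."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            hook(fn.__name__)  # fires before the wrapped body starts
            return fn(*args, **kwargs)

        return wrapper

    return decorator


started: list[str] = []


def record_start(name: str) -> None:
    """Hypothetical hook: remember which functions have started."""
    started.append(name)


@with_on_started(record_start)
def extract(n: int) -> int:
    return n + 1
```

Calling `extract(1)` appends `"extract"` to `started` before the body executes, so the hook also fires for calls that later fail.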
    a
    • 2
    • 3
  • r

    Rick Fanta

    10/22/2025, 2:10 PM
    @Nate [any thoughts?]
  • j

    Jagi Natarajan

    10/22/2025, 9:32 PM
    is the prefect mcp server working for anyone -- https://docs.prefect.io/mcp
  • j

    Jagi Natarajan

    10/22/2025, 9:33 PM
    image.png
    n
    • 2
    • 12
  • t

    thiago

    10/23/2025, 1:25 PM
    Hello folks 👋 does anyone have suggestions on how to run async functions in Prefect v3 sync flows? We're using a library that has only async functions, and our flows are built on sync calls.
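One common way to call an async-only library from synchronous code is to drive each coroutine to completion with `asyncio.run`. The sketch below is plain Python rather than anything Prefect-specific; `fetch_value` is a hypothetical stand-in for the library's async function, and the approach assumes no event loop is already running in the calling thread (otherwise `asyncio.run` raises `RuntimeError`).

```python
import asyncio


async def fetch_value(x: int) -> int:
    """Hypothetical stand-in for an async-only library call."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return x * 2


def fetch_value_sync(x: int) -> int:
    """Synchronous wrapper: runs the coroutine on a fresh event loop."""
    return asyncio.run(fetch_value(x))


def sync_pipeline(values: list[int]) -> list[int]:
    """Synchronous code (for example, the body of a sync flow) using the wrapper."""
    return [fetch_value_sync(v) for v in values]
```

Each call to `asyncio.run` creates and closes its own loop, which is simple but adds overhead; batching several coroutines behind a single `asyncio.run(asyncio.gather(...))` call is a possible refinement.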
    n
    • 2
    • 4
  • s

    Simon Ouellette

    10/23/2025, 4:13 PM
    Hi, I'm running Prefect on a RunPod instance. When I'm running a task that takes a bit of time (say, 20+ minutes), sometimes I get a worker pool crash:
    + Exception Group Traceback (most recent call last):
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/cli/_utilities.py", line 44, in wrapper
    |     return fn(*args, **kwargs)
    |            ^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/cli/_types.py", line 156, in sync_fn
    |     return asyncio.run(async_fn(*args, **kwargs))
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
    |     return runner.run(main)
    |            ^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
    |     return self._loop.run_until_complete(task)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    |     return future.result()
    |            ^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/cli/worker.py", line 168, in start
    |     await worker.start(
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/process.py", line 158, in start
    |     async with self as worker:
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/process.py", line 320, in __aexit__
    |     await super().__aexit__(*exc_info)
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/base.py", line 1600, in __aexit__
    |     raise exceptions[0] from None
    |   File "/usr/lib/python3.11/contextlib.py", line 728, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |                   ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.11/dist-packages/prefect/runner/runner.py", line 1610, in __aexit__
    |     await self._exit_stack.__aexit__(*exc_info)
    |   File "/usr/lib/python3.11/contextlib.py", line 745, in __aexit__
    |     raise exc_details[1]
    |   File "/usr/lib/python3.11/contextlib.py", line 728, in __aexit__
    |     cb_suppress = await cb(*exc_details)
    |                   ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.11/dist-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
    |     raise BaseExceptionGroup(
    | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
    +-+---------------- 1 ----------------
      | Exception Group Traceback (most recent call last):
      |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/process.py", line 162, in start
      |     async with anyio.create_task_group() as loops_task_group:
      |   File "/usr/local/lib/python3.11/dist-packages/anyio/_backends/_asyncio.py", line 736, in __aexit__
      |     raise BaseExceptionGroup(
      | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
      +-+---------------- 1 ----------------
        | Traceback (most recent call last):
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/utilities/services.py", line 64, in critical_service_loop
        |     await workload()
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/base.py", line 1107, in sync_with_backend
        |     await self._update_local_work_pool_info()
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/workers/base.py", line 1004, in _update_local_work_pool_info
        |     work_pool = await self._client.create_work_pool(work_pool=wp)
        |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/orchestration/_work_pools/client.py", line 478, in create_work_pool
        |     response = await self.request(
        |                ^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/orchestration/base.py", line 53, in request
        |     return await self._client.send(request)
        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/base.py", line 379, in send
        |     response.raise_for_status()
        |   File "/usr/local/lib/python3.11/dist-packages/prefect/client/base.py", line 163, in raise_for_status
        |     raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
        | prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://rdzs18c4k9ub52-4200.proxy.runpod.net/api/work_pools/'
        | For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/404
    What could be causing this?
  • b

    Ben

    10/23/2025, 4:20 PM
    Hey community and @Marvin - I have a question about running tasks concurrently. I have a flow that I want to execute tasks concurrently, but I also need to access the data returned from those downstream tasks. My current solution is a synchronous `for loop`. E.g.
    for obj in objects:  # `obj` avoids shadowing the built-in `object`
        data = tasks.operation1(data=obj)
        tasks.operation2(data=data)
    I'm reading the docs, which mention using either `.submit()` or `.map()` to run tasks concurrently; however, I'm not sure how to access the data returned and pass it to a secondary task (which can also be run concurrently if possible).
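The shape of this problem, running a first operation concurrently and feeding each result into a second one, can be sketched with the standard library's `concurrent.futures`. This is an analogy rather than Prefect code: `operation1` and `operation2` are hypothetical stand-ins for the tasks above, and whether Prefect's `.submit()` futures can be handled the same way should be checked against the Prefect docs.

```python
from concurrent.futures import ThreadPoolExecutor


def operation1(data: int) -> int:
    """Hypothetical first step."""
    return data + 1


def operation2(data: int) -> int:
    """Hypothetical second step, consuming the first step's result."""
    return data * 10


def run_all(objects: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Start every first-step call without waiting for any of them.
        first = [pool.submit(operation1, obj) for obj in objects]
        # .result() blocks until that upstream call has finished, then the
        # second step is submitted with the actual value.
        second = [pool.submit(operation2, f.result()) for f in first]
        # Collect final values in the same order as the inputs.
        return [f.result() for f in second]
```

The returned list keeps input order because results are read from the futures in the order they were created, not in completion order.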
    m
    • 2
    • 28
  • s

    skrawczyk

    10/23/2025, 5:24 PM
    Does anyone know of a way to subscribe to Prefect version changelog updates?
    n
    • 2
    • 1
  • j

    Janet Carson

    10/24/2025, 5:45 PM
    Does anyone know if the prefect slack integrations can be used with gov-slack?
    n
    • 2
    • 2
  • m

    Marcelo de Souza

    10/24/2025, 8:07 PM
    Can someone please share a contact of a Prefect SalesRep?
    n
    • 2
    • 1
  • m

    Mitch Zink

    10/24/2025, 10:52 PM
    Hi, I have prefect enterprise and need to talk to an account manager about our contract. Can someone connect me?
    a
    • 2
    • 1
  • t

    Tushar Mahajan

    10/25/2025, 8:15 PM
    Hi, I am using self-hosted Prefect server/workers via Helm charts. I am using a `kubernetes` type work pool and trying to pass env variables using the method described here (via prefect.yaml) -> https://github.com/zzstoatzz/prefect-pack/blob/4cfc53ae94c6bd65ff0eefcb1c5ce72b48446179/prefect.yaml#L106 But I always get an "env variable not found" error. Anything I might be missing here?
    n
    • 2
    • 2
  • j

    Jeff Rose

    10/28/2025, 5:58 PM
    Hey all, wondering if anyone else is experiencing issues with duplicate flow runs intermittently showing up? Is prefect working on a fix for this? https://github.com/PrefectHQ/prefect/issues/18523 https://github.com/PrefectHQ/prefect/issues/18894
  • b

    Ben

    10/30/2025, 9:23 PM
    Hey team, I am trying to add external storage to store the results of my tasks and flows. Although it seems to be working locally, when I try to deploy a flow using `prefect deploy`, I get this error:
    prefect.exceptions.ScriptError: Script at 'src/providers/Tapology/flows/data_ingestion.py' encountered an exception: TypeError('Result storage configuration must be persisted server-side. Please call `.save()` on your block before passing it in.')
    An exception occurred.
    The thing is that I have created the storage block directly in the Prefect web UI, and I'm loading it in my code, something like this:
    from prefect import flow
    from prefect.filesystems import RemoteFileSystem
    from prefect.serializers import JSONSerializer
    from prefect.futures import as_completed
    from prefect.task_runners import ThreadPoolTaskRunner
    
    # Load the storage block you saved previously
    r2_storage = RemoteFileSystem.load("r2-persistent-storage")
    
    # Choose serializer for your payloads
    json_ser = JSONSerializer()
    
    @flow(
        log_prints=True,
        task_runner=ThreadPoolTaskRunner(max_workers=3),
        persist_result=True,
        result_storage=r2_storage,
        result_serializer=json_ser,
    )
    def my_flow_function():
        [...]
    Another thing to note is that this code works for already existing deployments; the error is only thrown when trying to create a new deployment. Any idea what's going on?
    m
    • 2
    • 6
  • t

    Tom Collom

    10/31/2025, 8:32 AM
    Hello @Marvin I am getting started with self-hosted and have deployed a worker and server to Docker, and it all works! I can deploy simple scripts with no dependencies using the code below; however, I cannot find anywhere how to extend this to add my project dependencies. E.g. how do I tell Prefect to install my requirements.txt, or simply specify what the dependencies are? I notice there is a YAML version of deployment that might enable this, but it seems long-winded, and since I have set everything up as below I would ideally just extend it:
    if __name__ == "__main__":
        flow.from_source(
            source=f"https://{access_token}@github.com/tom/{repo}.git",
            entrypoint=entrypoint,
        ).deploy(
            name=pipeline_name,
            work_pool_name="local-pool",
        )
    b
    m
    • 3
    • 17
  • m

    Miguel Moncada

    10/31/2025, 9:23 AM
    @Marvin I have a flow deployed to a k8s work pool, the flow is crashing and from the pod logs I can see this error:
    Unable to authenticate to the event stream. Please ensure the provided api_key or auth_token you are using is valid for this environment. Reason: Actor is not authenticated
    Do you know what could be going on?
    m
    • 2
    • 2
  • p

    Pierre L

    10/31/2025, 10:57 AM
    @Marvin, I am encountering an error similar to the one described here when my flow is deployed on Prefect server OSS (latest version) installed with Helm on a managed Kubernetes cluster. The only difference from that question is that the parameter that fails to pass for me is a `datetime.date` object. I get the same error when using a `datetime.datetime`. Here is my exact error:
    Task run failed with exception: PrefectHTTPStatusError('Client error \'409 Conflict\' for url \'<http://prefect-server.prefectoss.svc.cluster.local:4200/api/deployments/d6150396-f839-4084-8091-401f5a6c53ca/create_flow_run>\'\nResponse:
     {\'detail\': "Error creating flow run: Validation failed for field \'time_start\'. Failure reason: 1704067200.0 is not of type \'string\'"}\n
    The bug doesn't occur when using Prefect Cloud. Why?
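The error shows the date parameter arriving as a Unix timestamp (`1704067200.0`) where the server-side schema expects a string. One possible workaround, sketched under the assumption that the flow parameter can be declared as a plain string, is to pass ISO 8601 text and parse it back inside the flow. The helper names are hypothetical and the code is plain Python, not a confirmed fix for the server behavior.

```python
from datetime import date, datetime, timezone


def to_param(value: date) -> str:
    """Serialize a date to an ISO 8601 string before passing it as a parameter."""
    return value.isoformat()


def from_param(text: str) -> date:
    """Parse the ISO 8601 string back into a date inside the flow."""
    return date.fromisoformat(text)


# The timestamp from the error message, decoded as UTC for reference.
decoded = datetime.fromtimestamp(1704067200.0, tz=timezone.utc)

time_start = to_param(decoded.date())  # string form of the same day
round_trip = from_param(time_start)    # back to a date object
```

Decoding the timestamp this way also helps confirm which value was serialized as a float when comparing behavior between the OSS server and Prefect Cloud.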
    m
    n
    • 3
    • 7