# ask-community

    Theom

    03/21/2025, 3:24 PM
    Hi there, Does anyone know how I can directly read persisted results from storage? I tried the following:
    Copy code
    from prefect import flow
    from prefect.results import get_result_store
    
    
    @flow(persist_result=True)
    def pipeline():
        store = get_result_store()
        result = store.read("my-key")  # Where I know that a file corresponding to this key exists on disk
        return result
    
    
    if __name__ == "__main__":
        pipeline()
    But I get the following error.
    (screenshot attachment: “Sans titre”)
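    For reference, one pattern worth trying (a sketch, not a confirmed fix: it assumes the key was written by the same result-store configuration, and that the store exposes the explicit async aread variant):
    Copy code
    from prefect import flow
    from prefect.results import get_result_store


    @flow(persist_result=True)
    async def pipeline():
        store = get_result_store()
        # In an async context, sync-compatible methods can hand back a
        # coroutine, so the explicit async read is used and awaited here.
        record = await store.aread("my-key")
        return record.result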

    Damian Birchler

    03/21/2025, 6:33 PM
    How would you go about ensuring flows are submitted for execution fairly among a group of users, i.e. in a round-robin fashion? Would a work queue per user, all with the same priority, work?
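    A per-user queue with equal priorities is one plausible setup. A sketch under that assumption (the pool name "shared-pool" is illustrative; equal-priority queues are drained evenly, though strict round-robin ordering is not guaranteed):
    Copy code
    import asyncio

    from prefect.client.orchestration import get_client


    async def create_user_queues(users: list[str]) -> None:
        async with get_client() as client:
            for user in users:
                # One queue per user in a shared pool, all at the same priority.
                await client.create_work_queue(
                    name=f"user-{user}",
                    work_pool_name="shared-pool",
                    priority=1,
                )


    asyncio.run(create_user_queues(["alice", "bob"]))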

    Tinendra kumar

    03/22/2025, 7:22 PM
    Hi, I'm currently working on a POC where I have a simple ETL job. I have created a flow with three subflows inside it and added retry logic on the main flow. Please consider the example below.
    Copy code
    from prefect import flow


    @flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180)
    def main_flow():
        data_extract_subflow()    # Extract
        data_transform_subflow()  # Transform
        data_load_subflow()       # Load


    if __name__ == "__main__":
        main_flow()
    QQ: Suppose my second subflow (data_transform_subflow()) fails for some reason during execution. Is there a parameter or setting we can configure so that the next retry starts from where it failed last time? In my case it should resume from the second subflow (data_transform_subflow) instead of running from the start.
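    One common pattern for this (a sketch, not the only option): flow-level retries re-enter the flow from the top, so "resume from the failed step" is usually achieved by caching completed work. Here the steps are written as cached tasks; the task bodies are placeholders:
    Copy code
    from datetime import timedelta

    from prefect import flow, task
    from prefect.tasks import task_input_hash


    # Cached tasks return their earlier result on a retry instead of
    # re-running, so a flow-level retry resumes at the first uncached step.
    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
    def data_extract():
        return [1, 2, 3]


    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
    def data_transform(data):
        return [x * 2 for x in data]


    @task
    def data_load(data):
        print(data)


    @flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180)
    def main_flow():
        data_load(data_transform(data_extract()))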

    Kiana Alessandra Villaera

    03/23/2025, 2:54 PM
    Hi guys, I am currently following this guide on deployments using ECS and I am struggling. I am using a self-managed instance on EC2. When I first started my worker, it was fine and able to communicate with my server and all the other components, but then I had to go and debug other things. Now that that's sorted a few days later, I can't seem to start another worker. My work queue is in "Not Ready" status, presumably because there is a problem with the worker. I get this error message:
    Copy code
    (prefect-2) ubuntu@ip-172-31-13-253:~/Docker/eti-data-pipeline-worflows$ prefect worker start --pool my-ecs-pool
    Discovered type 'ecs' for work pool 'my-ecs-pool'.
    Worker 'ECSWorker 05116438-a5b0-42d3-a699-1a23bed3ba35' started!
    
    Failed the last 3 attempts. Please check your environment and configuration.
    Examples of recent errors:
    
    Traceback (most recent call last):
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/utilities/services.py", line 64, in critical_service_loop
        await workload()
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/workers/base.py", line 760, in get_and_submit_flow_runs
        runs_response = await self._get_scheduled_flow_runs()
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/workers/base.py", line 917, in _get_scheduled_flow_runs
        await self.client.get_scheduled_flow_runs_for_work_pool(
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/orchestration/_work_pools/client.py", line 586, in get_scheduled_flow_runs_for_work_pool
        response = await self.request(
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/orchestration/base.py", line 53, in request
        return await self._client.send(request)
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/base.py", line 354, in send
        response.raise_for_status()
      File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<http://127.0.0.1:4200/api/work_pools/my-ecs-pool/get_scheduled_flow_runs>'
    Response: {'exception_message': 'Internal Server Error'}
    For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500>
    I presume this has something to do with my worker not being able to access the API, but my task-definition.json already has my Prefect API key configured. I'm at a loss as to what step I've missed. Perhaps my PREFECT_API_URL is incorrect?
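    Worth noting: a 500 response means the request did reach a server, so the server logs are worth checking too. To verify the client-side setting suspected above, a quick check (the example address is illustrative):
    Copy code
    from prefect.settings import PREFECT_API_URL

    # The API URL this environment's worker will actually use; for a
    # self-managed server it should point at the /api endpoint,
    # e.g. http://127.0.0.1:4200/api
    print(PREFECT_API_URL.value())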

    Ulysse Petit

    03/23/2025, 4:37 PM
    Hi, my flow run is executing twice instead of once as expected; the run count shows 2/1 in the Prefect Cloud UI. I tried setting the concurrency of my work pool to 1 (it was empty before), but it still happens. My work pool is set up with GCP, and my worker was created as a Cloud Run service using the tutorial provided in the Prefect docs. I don't understand why. Thanks for the help 🙏 (Note: I've experienced this issue when I run my deployment using "Quick Run".)

    zizhengwang

    03/24/2025, 7:26 AM
    Hi, I am using DeploymentEventTrigger. How can I get some of the arguments from the upstream deployment's event and pass them on as parameters to the triggered deployment?
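    One pattern that may apply (a sketch; the event name "upstream.finished", the payload field, and the deployment name are all illustrative): deployment trigger parameters accept Jinja templates that are rendered against the triggering event.
    Copy code
    from prefect import flow
    from prefect.events import DeploymentEventTrigger


    @flow
    def downstream(customer_id: str):
        print(customer_id)


    if __name__ == "__main__":
        downstream.serve(
            name="downstream-deployment",
            triggers=[
                DeploymentEventTrigger(
                    expect={"upstream.finished"},
                    # Rendered against the triggering event, so upstream
                    # payload fields can be forwarded as parameters.
                    parameters={"customer_id": "{{ event.payload.customer_id }}"},
                )
            ],
        )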

    Giacomo Chiarella

    03/24/2025, 8:36 AM
    Hi everyone! I’m cancelling a flow using the following code
    Copy code
    flow_runs = await client.read_flow_runs(flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), limit=1)
    state_updates = {}
    state_updates.setdefault("name", "Cancelled")
    state_updates.setdefault("type", StateType.CANCELLED)
    state = flow_runs[0].state.copy(update=state_updates)
    await client.set_flow_run_state(flow_run_id=flow_run_id, state=state, force=True)
    I’ve noticed that although the flow run state is set correctly, the task runs are not cancelled; they are actually still executed. I’ve read in the docs that cancelling the flow run like this forces the state to be set in the Prefect database but does not stop the tasks. Here is where I got that info. The agent type is Process. I’m on Prefect 3.2.7. How do I cancel a flow run's execution and all of its task runs? Is it something I should manage myself? Is there an out-of-the-box Prefect way, or should I retrieve all task runs of the flow run and loop over the list, cancelling them one by one?
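    For what it's worth, a sketch of an alternative (under the assumption that Prefect 3's cancellation flow applies here): setting the run to Cancelling instead of forcing Cancelled lets the runner or worker observe the cancellation and tear down the running process, which is what actually stops in-flight tasks.
    Copy code
    from prefect import get_client
    from prefect.states import Cancelling


    async def cancel_run(flow_run_id: str):
        async with get_client() as client:
            # Cancelling (rather than a forced Cancelled) is picked up by the
            # runner/worker, which then terminates the flow's infrastructure.
            await client.set_flow_run_state(
                flow_run_id=flow_run_id,
                state=Cancelling(),
            )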

    Kiran

    03/24/2025, 10:12 AM
    Hi @Marvin, I am running some flows using run_deployment. All flows use the same work pool. I have created 3 queues for this work pool and divided my flows among them: high, medium, and low, with priorities 1, 2, and 3 respectively. As shown in the code below, I am using the 'work_queue_name' parameter to distribute my flows into these work queues. I see that the order of execution of my flows does not follow the priority of the queues. The documentation also says that the order of submission is prioritised, not the order of execution. But I want the flows in the 'high' priority queue to be executed first, then medium, then low, with all the flows scheduled at the same time, either with or without concurrency. How do I achieve this? This is a snippet from my code:
    Copy code
    import asyncio

    from prefect import flow
    from prefect.deployments import run_deployment


    @flow(log_prints=True, name="highpriority-triggering-flow", flow_run_name="highpriority-triggering-flow-run")
    async def highpriority_first_triggeringflow():
        with open("calc_file.txt", "r") as file:
            expressions = [line.strip() for line in file.readlines()]
        flow_names=["first-flow","second-low","third-flow","fourth-flow","fifth-flow","sixth-flow","seventh-flow","eigth-flow","ninth-flow","tenth-flow"]
        dictionary=dict(zip(expressions, flow_names))
        print(f"the created  dictionary is {dictionary}")
        
    
        flow_tasks=[]
        for num,name in dictionary.items():
            print(f"the number is {num}")
            print(f"the name is {name}")
            print(f"triggering flow run {name} successfully")
            
            if int(num) in range(1,4):
                print(f"entered first if condition, number is {num}")
            
                flow_tasks.append(run_deployment(
                    name="main-flow/highpriority-maindeployment",  # Deployment Name
                    parameters={"num": num},
                    flow_run_name=name,
                    work_queue_name="high"
                   #scheduled_time=datetime(2025,03,21,4,30,00)
                ))
    
            elif int(num) in range(4,7):
                print(f"entered second if condition, number is {num}")
                flow_tasks.append(run_deployment(
                    name="main-flow/highpriority-maindeployment",  # Deployment Name
                    parameters={"num": num},
                    flow_run_name=name,
                    work_queue_name="medium"
                   
                ))
            elif int(num) in range(7,11):
                print(f"entered third if condition, number is {num}")
                flow_tasks.append(run_deployment(
                    name="main-flow/highpriority-maindeployment",  # Deployment Name
                    parameters={"num": num},
                    flow_run_name=name,
                    work_queue_name="low"
                ))
    
        print(f"flow tasks are {flow_tasks}")
       
        await asyncio.gather(*flow_tasks)
        
        print("All flows triggered successfully!")
       
    
    
    if __name__=="__main__":
      asyncio.run(highpriority_first_triggeringflow())
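    Queue priority only orders which scheduled runs a worker submits first; once submitted, runs execute concurrently, so it cannot guarantee execution order. If high must finish before medium starts, one option (a sketch reusing the deployment name above; run_deployment waits for the run to complete by default) is to await each tier as a separate wave:
    Copy code
    import asyncio

    from prefect.deployments import run_deployment


    async def run_in_priority_waves(tiers: dict[str, list[str]]) -> None:
        # tiers maps a queue name to the "num" parameters to run in it; each
        # wave is awaited to completion before the next one starts.
        for queue_name in ("high", "medium", "low"):
            wave = [
                run_deployment(
                    name="main-flow/highpriority-maindeployment",
                    parameters={"num": num},
                    work_queue_name=queue_name,
                )
                for num in tiers.get(queue_name, [])
            ]
            await asyncio.gather(*wave)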

    Melek Alan

    03/24/2025, 10:49 AM
    Hi everyone, how do I set an annotation on the Kubernetes job pods that the prefect-worker runs? I deployed Prefect via the Helm chart (2024.12.11173517) on an AWS EKS cluster. I am using selfHostedServer.

    Fabian Gruben

    03/24/2025, 11:04 AM
    Hello, is the limit of five deployed workflows (= deployments) a recent restriction for the Prefect Cloud free tier? Does this also apply to free-tier accounts that were created before the restriction, or is there something like a grace period? Best regards, Fabian

    Marko T

    03/24/2025, 1:00 PM
    Hello, is the repo PrefectHQ/prefect-hightouch still being maintained by Prefect? It doesn't seem to support Prefect 3 because of a Pydantic version problem: https://github.com/PrefectHQ/prefect-hightouch/issues/39. I'm wondering if we really have to downgrade to Prefect 2 to use it in our pipelines.

    Theo Bouwman

    03/24/2025, 1:24 PM
    Hi, we have successfully deployed Prefect on k8s with additional worker pods. Works amazing! Unfortunately, when I took a look at the dashboard again today, it looks like all settings have been removed: the default namespace for the worker pool is gone, no previous runs are visible, deployments have been removed, etc. It looks like a clean install again. We deployed by following https://docs.prefect.io/v3/deploy/infrastructure-examples/kubernetes. Is anyone else having this issue as well?

    Stefan

    03/24/2025, 1:45 PM
    Hi! I'm not getting docs or any kind of recognition for the flow decorator; it only resolves as flow: FlowDecorator. Task and every other import works fine. Latest version.

    Jeremy Karn

    03/24/2025, 4:31 PM
    Hi Everyone! I was wondering if anyone knew of a way to get a parameter from a Flow Run to display in the Prefect Cloud "Runs" UI list. It has a bunch of generic information (time, deployment, work pool, etc.) but each of our flow runs has one specific parameter that differs and it would be really nice to see it in the top level list but I couldn't figure out a way to do that. (Edit: I know I can click the parameters link to see the dictionary of parameters, but I'd like a way to just have it display without clicking each job).
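    One workaround that may help (a sketch; the parameter name customer_id is illustrative): the flow run name can be templated from flow parameters, and the run name does appear in the Runs list.
    Copy code
    from prefect import flow


    # Each run in the "Runs" list is then labeled with the distinguishing
    # parameter, without clicking into the parameters of each run.
    @flow(flow_run_name="process-{customer_id}")
    def process(customer_id: str):
        print(f"processing {customer_id}")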

    Ben Epstein

    03/24/2025, 4:45 PM
    I'm trying to set up a programmatic notification on Slack for when my flows fail. I'm looking at the docs for SlackWebhook, and they say to use it like so:
    Copy code
    from prefect.blocks.notifications import SlackWebhook
    
    slack_webhook_block = SlackWebhook.load("BLOCK_NAME")
    slack_webhook_block.notify("Hello from Prefect!")
    This works for me when run directly. I wrote a function to do that as per this thread and added on_failure=[notify_slack], but I'm getting the following error (and mypy shows the same):
    Copy code
    packages/flows/notify.py", line 7, in notify_slack
        slack_webhook_block.notify(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
    AttributeError: 'coroutine' object has no attribute 'notify'
    I'm curious why this works when I just run the code directly, but fails within Prefect flow infrastructure. Pylance shows the variable as slack_webhook_block: SlackWebhook | Coroutine[Any, Any, SlackWebhook], so perhaps, because my flow is async, this notify_slack function needs to be async as well?
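    That reading looks consistent with the error: in an async context, sync-compatible methods return coroutines. A sketch of an async hook under that assumption (the block name is a placeholder):
    Copy code
    from prefect.blocks.notifications import SlackWebhook


    async def notify_slack(flow, flow_run, state):
        # Both the load and the notify call can come back as coroutines in
        # an async flow, so both are awaited here.
        block = await SlackWebhook.load("BLOCK_NAME")
        await block.notify(f"Flow run {flow_run.name} entered state {state.name}")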

    Abyaya Lamsal

    03/24/2025, 5:41 PM
    Hi team, I am currently migrating from the Prefect agent to a worker. At the same time, I am upgrading from 2.14.16 to 2.20.16. I have started seeing intermittent issues with a subset of flows. This seems to happen randomly; hence, not every flow run has this issue. For reference, I am using a custom image. Logs attached below:
    Copy code
    13:29:55.058 | INFO    | prefect.flow_runs.worker - Worker 'KubernetesWorker a5d26a51-ff36-4697-8daf-f8aa3a0fea54' submitting flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
    13:29:55.236 | INFO    | prefect.flow_runs.worker - Creating Kubernetes job...
    13:29:55.314 | INFO    | prefect.flow_runs.worker - Completed submission of flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
    13:29:55.349 | INFO    | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod has status 'Pending'.
    13:30:55.327 | ERROR   | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod never started.
    13:30:55.570 | INFO    | prefect.flow_runs.worker - Pod event 'Scheduled' at 2025-03-18 13:29:55+00:00: Successfully assigned [OUR_NAMESPACE]/adept-hog-hwcjq-pqhbc to <INTERNAL_NODE>
    13:30:55.571 | INFO    | prefect.flow_runs.worker - Job event 'SuccessfulCreate' at 2025-03-18 13:29:55+00:00: Created pod: adept-hog-hwcjq-pqhbc
    13:30:55.572 | INFO    | prefect.flow_runs.worker - Pod event 'Pulling' at 2025-03-18 13:29:56+00:00: Pulling image "<CUSTOM_IMAGE>"
    13:30:55.572 | INFO    | prefect.flow_runs.worker - Pod event 'Pulled' at 2025-03-18 13:30:33+00:00: Successfully pulled image "<CUSTOM_IMAGE>" in 37.16s (37.16s including waiting). Image size: <SIZE> bytes.
    13:30:55.716 | INFO    | prefect.flow_runs.worker - Reported flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8' as crashed: Flow run infrastructure exited with non-zero status code -1.
    <NORMAL EXECUTION>
    ...
    The job eventually runs. The issue is that if I subscribe to any failure notification, I get randomly bombarded with crash notifications, which is not very helpful. Would appreciate any pointers here. Here is a sample of the job logs:
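    Judging by the timestamps (the error fires 60 seconds after submission, while the image pull alone took ~37 seconds), this could be the worker's pod watch timeout expiring before the pod starts. A sketch of raising it through job variables (the deployment, pool, and image names are illustrative):
    Copy code
    from prefect import flow


    @flow
    def etl():
        ...


    if __name__ == "__main__":
        etl.deploy(
            name="etl-k8s",
            work_pool_name="k8s-pool",
            image="registry.example.com/etl:latest",
            build=False,  # image is assumed to be prebuilt
            # Give slow image pulls more headroom before the worker declares
            # "Pod never started" and reports the run as crashed.
            job_variables={"pod_watch_timeout_seconds": 300},
        )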

    Stefan

    03/24/2025, 6:54 PM
    Hi! How are you handling monorepos in GitHub? I have several flows for which a repo each would be maintenance overkill, so I'd like to keep them together in one repo. Today we commit to git and the worker pulls from git, but we would like to use Docker moving forward. How can we achieve:
    • having multiple projects in one repo, each with its own build instructions and dependencies (preferably its own isolated uv project);
    • only deploying and building projects that have changed, to avoid building all of them every time;
    • …preferably without writing a .yaml file for each project? (One rough option is sketched below.)
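    A rough sketch of a yaml-free approach. All names here are illustrative: it assumes one top-level folder per project, each exposing a main flow in main.py, plus a hypothetical repo URL, Docker work pool, and registry.
    Copy code
    import subprocess

    from prefect import flow

    # Only projects touched by the latest commit get rebuilt and redeployed.
    changed = subprocess.check_output(
        ["git", "diff", "--name-only", "HEAD~1", "HEAD"], text=True
    ).splitlines()
    projects = {path.split("/")[0] for path in changed if "/" in path}

    for project in sorted(projects):
        flow.from_source(
            source="https://github.com/acme/monorepo",
            entrypoint=f"{project}/main.py:main",
        ).deploy(
            name=project,
            work_pool_name="docker-pool",
            image=f"registry.example.com/{project}:latest",
        )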

    Tim Galvin

    03/25/2025, 8:02 AM
    @Marvin - I am trying to run a pytest unit test that runs a very simple flow. I am doing this inside a Docker container. My test fails in the container with the following error:
    Copy code
    FAILED tests/test_prefect_helpers.py::test_enable_loguru_support - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: "Debug Print Notification" - should this be a string literal in single-quotes?
    @Marvin - what is going wrong and causing the error?

    Claudiu

    03/25/2025, 11:02 AM
    Hi everyone, my team and I are considering using Prefect to orchestrate tasks for the cognition system of a general-purpose humanoid robot. We have a POC that works, but we've run into some big bottlenecks and we're wondering if what we're doing makes sense.
    Issue no. 1: we can't schedule/pause/stop flows (one work-pool-free option is sketched below):
    • From the documentation it's very clear that we need to create a work pool in order to schedule flows, but since our deployment will be tied to specific hardware (Jetson), we have to support an emergency use case without any cloud or work pool access. Our orchestration and infrastructure layers are on the same hardware, so we don't need to separate them. What's the solution for us to be able to schedule flows without work pools?
    • Is there a way to pause/resume flows without work pools? Right now it seems we NEED to create a work pool to have this functionality.
    • prefect.deploy and prefect.serve seem like good tools for remote deployment, but that just isn't our use case.
    • Do work pools make sense for our specific use case, or is there an entity we can use instead? Currently we have a YAML file that provides the scheduling details for a flow, but it's a very convoluted process; having the ability to directly schedule a task when needed would simplify our process (more details in the thread).
    Issue no. 2: serialization issues:
    • We have some entities that can't be easily serialized, and custom serialization logic would require additional parts of the system that aren't implemented in the scope of the POC. We know you have some serializers, but they don't work for our entities.
    • We also have some singleton classes that act as a "syncing" element in our system. Is there a better alternative for managing state in a single-machine, all-in-one deployment?
    • We're currently using the default task runner; is there any benefit to using another one (like DaskTaskRunner), given that we don't need distributed cognition for the POC?
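    On issue no. 1, a sketch of a work-pool-free option (the flow body and the 60-second interval are placeholders): .serve() runs a lightweight scheduler inside the local process, and the resulting deployment can be paused, resumed, and triggered on demand with no worker, work pool, or cloud access.
    Copy code
    from prefect import flow


    @flow
    def cognition_cycle():
        print("perceive -> plan -> act")


    if __name__ == "__main__":
        # Blocks and schedules runs in-process; pause/resume and ad-hoc runs
        # are available through the API/UI or the prefect CLI.
        cognition_cycle.serve(name="local-cognition", interval=60)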

    Martin Tomovič

    03/25/2025, 11:39 AM
    Hi, I use Prefect for a flow recurring at an interval using deployments. But now I have a list of thousands of specific times, e.g. 2025/3/25 at 112312:303, and I need to schedule a flow to run at those exact times. Is this somehow possible? Thank you for your advice.
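    One way this may be done (a sketch; the deployment name and times are illustrative): create an ad-hoc scheduled run per timestamp with run_deployment, using timeout=0 so each call returns without waiting.
    Copy code
    from datetime import datetime, timezone

    from prefect.deployments import run_deployment

    times = [
        datetime(2025, 3, 25, 10, 30, tzinfo=timezone.utc),
        datetime(2025, 3, 25, 11, 15, tzinfo=timezone.utc),
    ]

    for t in times:
        # Creates a run scheduled for the exact time and returns immediately.
        run_deployment(
            name="my-flow/my-deployment",
            scheduled_time=t,
            timeout=0,
        )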

    Chris

    03/25/2025, 12:35 PM
    Hi all, I’m running a flow on ECS and have encountered a strange issue. After setting my deployment configuration (mainly network configuration) in prefect.yaml, the job variables appear in my deployment configuration but are not passed through to my ECS task. A task definition is created with the name of my work pool and a UUID (e.g., prefect_ecs-dev-pool_79227824-ac2e-48de-8639-58bc6fae1f6c), but it does not include the configured variables. As a result, network settings and IAM roles are missing (I managed to bypass this by allowing my worker to pass roles to my task… for now). I’ve already discussed this with @Bianca Hoch but am still unsure why this is happening. Hoping the community can help us troubleshoot! Thanks in advance.
    Secondary ask: I’m using webhooks in Prefect Cloud and would like to validate incoming webhooks. To do this, I need access to the raw, unprocessed request body to generate a signature and match it against the signature in the headers. However, it seems that Prefect modifies the raw body in some way, making it impossible to validate my signature. While I appreciate the ability to extract webhook parameters directly into my flow, it would be helpful to have an option to pass through the full request payload so I can handle it myself, without manipulation by Prefect. I’ve tried several approaches to pass the raw body to my function, but I either get validation errors in my automation or signatures that don’t match. Has anyone else encountered this, or found a workaround? Would love any insights. Thanks!

    antonk

    03/25/2025, 2:58 PM
    Hi all, For some reason my schedules deactivate whenever I do a deploy. I tried explicitly setting the schedule to active, but it keeps happening. See images below. Any ideas?

    Emon Datta

    03/25/2025, 5:13 PM
    Hmm, I created a Prefect deployment and it's not picking up the correct parameters. It only shows "args" and "kwargs" in the parameters section instead of the actual expected argument names. When I try to run it, I get the following error because it doesn't pick up the expected parameter names:
    Response: {'detail': "Error creating flow run: Validation failed. Failure reason: 'args' is a required property"}
    Any advice on how to fix this?
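    A common cause of this symptom (an assumption here, not a diagnosis): the flow function is wrapped by a plain decorator, so Prefect introspects the wrapper's (*args, **kwargs) signature instead of the real one. functools.wraps preserves the signature; a sketch with an illustrative decorator and parameter:
    Copy code
    import functools

    from prefect import flow


    def my_decorator(fn):
        # Without functools.wraps, inspect.signature sees (*args, **kwargs),
        # which is what surfaces as "args"/"kwargs" in the deployment.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)
        return wrapper


    @flow
    @my_decorator
    def my_flow(customer_id: str):
        print(customer_id)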

    sadath

    03/25/2025, 5:38 PM
    Hi, how can I list failed/successful flow runs and rerun them?
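    A sketch of one way to do the listing and rerunning from the client (the deployment name is illustrative, and it assumes the failed runs came from a deployment):
    Copy code
    import asyncio

    from prefect.client.orchestration import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.client.schemas.objects import StateType
    from prefect.deployments import run_deployment


    async def rerun_failed() -> None:
        async with get_client() as client:
            failed = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateType(any_=[StateType.FAILED])
                    )
                )
            )
        for run in failed:
            print(f"re-running {run.name} ({run.id})")
            await run_deployment(
                name="my-flow/my-deployment",
                parameters=run.parameters,
            )


    asyncio.run(rerun_failed())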

    Luis Henrique Dallelaste Bernartt

    03/25/2025, 5:46 PM
    Guys, I am currently facing an issue using AWS + Prefect, which boils down to a problematic query. I downloaded the Prefect source code and have been trying for some time to find this query so I can modify it for better performance. Below is the query in question:
    Copy code
    UPDATE deployment
    SET last_polled=$1::TIMESTAMP WITH TIME ZONE,
        status=$2::deployment_status,
        updated=now()
    WHERE deployment.id IN (NULL) AND (1 != 1)
       OR deployment.work_queue_id IN ($3::UUID)
    If you could help me find and fix this query, I would be grateful! The goal would be to remove the following part: "(NULL) AND (1 != 1) OR". Thank you in advance!

    sadath

    03/26/2025, 5:25 AM
    Hi, I need some help comparing https://github.com/tensorlakeai/indexify vs Prefect. Not feature-wise, but in terms of core engine design.

    Vlad

    03/26/2025, 9:31 AM
    Hello there 👋, could someone please suggest a contact from the Prefect team (ideally UK-based) whom I could approach about a technical partnership? We are interested in adopting the Prefect solution.

    Sam Greenwood

    03/26/2025, 11:46 AM
    @Bianca Hoch Just wanted to drop in to say I've been trying to troubleshoot a prefect deploy issue which was reliably producing badly built images for ECR that EKS then refused to run (EKS reported a bad hash on image pull, imagePullErr). I had had this issue for over a week and tried everything: installing/re-installing Docker, different package-manager venvs, different project layouts for prefect.yaml, etc. In the end, the solution was to disable the containerd image store in Docker Desktop. With that checkbox unticked, prefect deploy worked first time; with it checked, it would reliably not work. This is Prefect 3.2 with Python 3.12.