# ask-community
  • a

    Arthur Ren

    07/14/2025, 4:40 PM
    Morning team! I have a few questions regarding automations and DeploymentTrigger. I have a self-hosted Prefect 3.2.11 and I set up the trigger like this:
    Copy code
    deployment_trigger = DeploymentEventTrigger(
        expect={"someevent.completed"},
        parameters={"payload_dict": "{{event.payload}}"}
    )
    But when I check it in the Prefect UI it looks OK when I click show params, but once I click edit it shows up empty inside.
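    For reference, a minimal sketch of wiring such a trigger up at deploy time (untested; the flow name, entrypoint, and work pool name below are placeholders, and this assumes Prefect 3's Flow.deploy accepting a triggers argument):
    Copy code
    from pathlib import Path
    
    from prefect import flow
    from prefect.events import DeploymentEventTrigger
    
    @flow
    def my_flow(payload_dict: dict):
        # the automation fills payload_dict from the matched event's payload
        print(payload_dict)
    
    deployment_trigger = DeploymentEventTrigger(
        expect={"someevent.completed"},
        parameters={"payload_dict": "{{event.payload}}"},
    )
    
    if __name__ == "__main__":
        my_flow.from_source(
            source=str(Path(__file__).parent),   # assumes the flow file sits next to this script
            entrypoint="my_flows.py:my_flow",    # hypothetical entrypoint
        ).deploy(
            name="triggered-deployment",         # hypothetical names
            work_pool_name="my-pool",
            triggers=[deployment_trigger],
        )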
  • p

    Patrick ODonnell

    07/14/2025, 6:14 PM
    Anyone know approx. what time the new pricing update takes place?
  • p

    Paige Gulley

    07/14/2025, 7:35 PM
    Can anyone confirm whether or not the git_clone step within a deployment is able to specify a specific tag or commit hash to pull from? We're at the last step of instrumenting a CI/CD solution.
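    One hedged data point: git clone --branch accepts tag names as well as branches, so pinning to a tag should work; I'm less sure about bare commit SHAs. A sketch of the Python deploy path with GitRepository (repo URL, entrypoint, and pool name are placeholders):
    Copy code
    from prefect import flow
    from prefect.runner.storage import GitRepository
    
    # Pin the deployment's source to a tag; the branch field is passed to git clone --branch.
    source = GitRepository(
        url="https://github.com/my-org/my-repo.git",  # hypothetical repo
        branch="v1.2.3",                              # a tag name also works here
    )
    
    if __name__ == "__main__":
        flow.from_source(source=source, entrypoint="flows/my_flow.py:my_flow").deploy(
            name="pinned-deployment",
            work_pool_name="my-pool",
        )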
    n
    • 2
    • 7
  • x

    xavier

    07/14/2025, 8:41 PM
    @Marvin the code below returns the variable "result" as a list. Is there a way to get the output as JSON or a dict? I want to extract certain details from the deployment details.
    Copy code
    from prefect import task, flow
    from prefect import runtime
    from pathlib import Path
    from datetime import timedelta
    from prefect_shell import ShellOperation
    import json
    
    @task(name='Get deployment details')
    def task_1():
        try:
            deployment_name = "main/deployment-inspect-details"
            result = ShellOperation(
                commands=[
                    f'prefect deployment inspect "{deployment_name}"'
                ]
            ).run()
            return result
        except Exception as e:
            print(f"ERROR: {e}")
    
    @flow()
    def main():
        result = task_1()
        print("Result type: ",type(result))
        print("Result: ", result)
    
        
    if __name__=='__main__':
        main.from_source(
            source=str(Path(__file__).parent),
            entrypoint="deployment_details.py:main"
        ).deploy(
            name="deployment-inspect-details",
            work_pool_name="deployment-details-workpool",
            interval=timedelta(seconds=100)
        )
    Sample of the result from the CLI:
    Copy code
    Result type:  <class 'list'>
    Result:  ['{', "    'id': '6f2f5a5b-21fa-4716-b009-c0345abba1dd',", "    'created': '2025-07-14T20:20:58.571579Z',", "    'updated':", "'2025-07-14T20:29:35.703000Z',", "    'name': 'deployment-inspect-details',", "    'version': '5ccab6aa120aaea524059f5785473ad3',", "    'version_id': None,", "    'version_info': None,", "    'branch': None,", "    'base': None,", "    'root': None,", "    'description': None,", "    'flow_id': '0acc3aa0-41f8-491c-824a-9bd68adee7e3',", "    'concurrency_limit': None,", "    'global_concurrency_limit': None,", "    'concurrency_options': None,", "    'paused': False,", "    'schedules': [", '        {', "            'id': 'fec8e13a-22ae-4f13-b660-e3faf62fba79',", "            'created': '2025-07-14T20:26:06.107156Z',", "            'updated': '2025-07-14T20:29:27.280000Z',", "            'deployment_id': '6f2f5a5b-21fa-4716-b009-c0345abba1dd',", "            'schedule': {", "                'interval': 100.0,", "                'anchor_date': '2025-07-14T04:26:03.565000Z',", "                'timezone': 'UTC'", '            },', "            'active': False,", "            'max_scheduled_runs': None,", "            'parameters': {},", "            'slug': None", '        }', '    ],', "    'job_variables': {},", "    'parameters': {},", "    'pull_steps': [", '        {', "            'prefect.deployments.steps.set_working_directory': {", "                'directory': ", "'C:\\\\Users\\\\USER\\\\Desktop\\\\pipeline_proj\\\\prefect\\\\proj_name\\\\trial_depl", "oyment'", '            }', '        }', '    ],', "    'tags': [],", "    'labels': {'prefect.flow.id': '0acc3aa0-41f8-491c-824a-9bd68adee7e3'},", "    'work_queue_name': 'default',", "    'last_polled': '2025-07-14T20:29:35.701652Z',", "    'parameter_openapi_schema': {", "        'title': 'Parameters',", "        'type': 'object',", "        'properties': {}", '    },', "    'path': None,", "    'entrypoint': 'deployment_details.py:main',", "    'storage_document_id': None,", "    'infrastructure_document_id': None,", "    'created_by': None,", "    'updated_by': None,", "    'work_queue_id': None,", "    'enforce_parameter_schema': True,", "    'work_pool_name': 'deployment-details-workpool',", '', "    'status': 'READY',", "    'automations': []", '}']
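    If the goal is a dict rather than shelling out and parsing lines of text, here is a small sketch using the Prefect 3 Python client (untested; assumes read_deployment_by_name and a pydantic model_dump on the response):
    Copy code
    import asyncio
    
    from prefect.client.orchestration import get_client
    
    async def get_deployment_details(name: str) -> dict:
        # name is "<flow name>/<deployment name>"
        async with get_client() as client:
            deployment = await client.read_deployment_by_name(name)
            return deployment.model_dump(mode="json")
    
    if __name__ == "__main__":
        details = asyncio.run(get_deployment_details("main/deployment-inspect-details"))
        print(type(details))               # <class 'dict'>
        print(details["work_pool_name"])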
    m
    • 2
    • 5
  • n

    Nick Torba

    07/14/2025, 9:18 PM
    Hello friends. I have found a bug I am really struggling with. I have a task named set_row_count that I use in many different flows. I have one particular flow where, when I call this task, it just hangs indefinitely and never actually runs:
    Copy code
    17:13:48.835 | INFO    | Flow run 'sparkling-mosquito' - Created task run 'set_row_count-0' for task 'set_row_count'
    17:13:48.836 | INFO    | Flow run 'sparkling-mosquito' - Executing 'set_row_count-0' immediately...
    It just sits here for a long time. I am not able to reproduce it anywhere except in my remote development environment; the same task run from the same flow works locally. It even works when I run it on an ECS cluster that is configured with my "local" dev env (local in quotes because my Postgres DB runs remotely, but the rest of the app runs locally so I can test jobs in ECS). My question is: has anyone else run into the problem of a task hanging indefinitely from certain places?
    m
    • 2
    • 17
  • a

    Arthur Ren

    07/14/2025, 10:57 PM
    @Marvin Could you give me a Python example of how to use custom events with an automation to trigger a deployment run? I want the full payload dictionary to be passed into the deployment as a single dict argument.
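    A rough sketch of the emitting side, assuming Prefect 3's emit_event helper (the resource id and payload below are made up); pairing this with a DeploymentEventTrigger like the one earlier in this channel forwards the whole payload into the flow as a dict:
    Copy code
    from prefect.events import emit_event
    
    # Emit a custom event; an automation templating "{{event.payload}}" can pass
    # this entire dict to the deployment's payload_dict parameter.
    emit_event(
        event="someevent.completed",
        resource={"prefect.resource.id": "my-app.some-job"},  # hypothetical resource id
        payload={"foo": "bar", "count": 3},
    )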
    m
    • 2
    • 5
  • j

    Jezreel Zamora

    07/15/2025, 2:22 AM
    Hello, we set up an alert using an automation for when a flow has been running for more than a day. Recently, this alert has been triggering randomly even though no flow has been running for more than a day. Is anybody running into the same issue?
  • s

    Srinivas Kandukuri

    07/15/2025, 6:48 AM
    @Marvin, I'm currently using work pools to run my flows. Whenever I run a flow I pass a server host name as one of the parameters, ads_address, to tell it which server it should connect to. Based on this parameter I decide which server to connect to from the flow and run my logic. I have 8 ads servers to connect to and multiple flows (20,000+) to trigger daily, and I need to share the load across all the ads servers. One more thing: I can't run more than 300 flows on a single ads server; each ads server can only run 300 jobs concurrently. How can I segregate this load distribution among the ads servers via the work pool concept? Give me a clever solution for this. My Prefect is hosted in ECS as a service, and I'm using the AWS ECS type with my pool to run the flows.
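    One hedged pattern for the 300-per-server cap: a named global concurrency limit per ads server, occupied inside the flow (untested sketch; assumes Prefect 3's global concurrency limits and the prefect.concurrency.sync.concurrency context manager; the limit names are made up):
    Copy code
    from prefect import flow
    from prefect.concurrency.sync import concurrency
    
    @flow
    def my_ads_flow(ads_address: str):
        # Hold one slot of a per-server limit, e.g. a limit named "ads-<host>"
        # created beforehand with a cap of 300 concurrent slots.
        with concurrency(f"ads-{ads_address}", occupy=1):
            ...  # connect to ads_address and run the logic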
    m
    • 2
    • 7
  • g

    Giacomo Chiarella

    07/15/2025, 8:49 AM
    @Marvin can I call a callback when a task run is cancelled using the endpoint api/task_runs/{id}/set_state?
    m
    • 2
    • 11
  • p

    Philip MacMenamin

    07/15/2025, 2:38 PM
    Please let me know if people think this indicates a misunderstanding of something, but I'm interested in whether this concept makes sense. I'd like a (large) ECS task to run a flow with (containerized) subflows on that same large ECS task. As in, I'd like the ECS worker to create a big ECS task which runs all of the workflow / subflows internal to that task (and does not spawn new ECS tasks for subflows). The reasoning is that pulling the image and starting these containers is relatively inefficient compared to running them local to the parent task. Is this wrong-headed? Is there a pattern for doing this?
    a
    • 2
    • 7
  • g

    gee

    07/15/2025, 7:19 PM
    @Marvin I am testing around with GitHub deployment on a private repo. I tried to point to a specific git commit and got an error (see logs; it tries to use the git SHA as a branch name). And I spotted a GitHub token in plain text in the logs. Is this the expected behaviour? Not sure it's very safe 😨
    m
    a
    • 3
    • 4
  • d

    David Martin Calalang

    07/15/2025, 8:45 PM
    Hi everyone! I'm having some trouble creating and using a temporary cluster for the Dask task runner. My setup is as follows...
    Copy code
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner
    
    task_runner = DaskTaskRunner(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": image  # placeholder for your Docker image
        },
        adapt_kwargs={
            "minimum": 1,
            "maximum": 8
        }
    )
    
    @task(name="foo")
    def square(x):
        return (x * x)
    
    @flow()
    def process_numbers():
        numbers = list(range(10))
        
        futures = [square.submit(n) for n in numbers]
        results = [future.result() for future in futures]
        print(f"Squared numbers: {results}")
        
        return results
    
    if __name__ == "__main__":
        process_numbers.with_options(task_runner=task_runner)()
    
        input("END:")
    My AWS credentials and region are set with environment variables. This implementation correctly creates a new cluster on AWS ECS which hosts the scheduler and also spawns workers, and brings up the Dask dashboard that shows the workers. The Dask dashboard also shows the square tasks (10 of them) being moved to a "processing" state within the workers. The issue is that they appear to stay stuck in that state. Moreover, the Prefect UI shows no progress, with the message "This flow run has not yet generated any task or subflow runs". I understand that there is obviously overhead with creating and provisioning resources on ECS for the scheduler and workers, but on my latest run I left it open for 15 minutes, yet it still made no progress. Am I missing something in my task runner configuration? Note that this implementation works correctly and runs quickly (< 2 seconds) when simply passing in DaskTaskRunner without a cluster_class (i.e. creating a local cluster).
  • t

    Tim Olshansky

    07/15/2025, 10:00 PM
    Hi everyone - For context, we're running Prefect with an AWS ECS-based push work pool and we're finding some gaps in logs/tags that make it hard to debug issues when things go wrong. I'm curious to know if there is a way to accomplish any of the following: 1. Expose PREFECT__FLOW_RUN_NAME alongside the PREFECT__FLOW_RUN_ID environment variable in an AWS ECS push-based work pool? Our logs contain the flow run name, but the ECS task has the flow run id and there isn't a way to search by run id in the Prefect Cloud dashboard. 2. Similarly, log the flow run id in the logger for Prefect flows (i.e. from
    21:44:01.401 | INFO | Flow run 'shrewd-turkey' - Downloading flow code from storage at '.'
    to
    21:44:01.401 | INFO | Flow run 'shrewd-turkey' (06876ca3-9c3d-7520-8000-09cbe7a46f8c) - Downloading flow code from storage at '.'
    or equivalent. 3. Manage the tags for the flow run's AWS ECS task using Prefect-specific values (i.e. can we manually add the values we're interested in)? Any one of these things would make our debugging workflows much easier 🙂 Does anyone have any ideas on how we could expose this info?
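    On point 2, a heavily hedged idea (untested): Prefect's default logging config has a flow_run_fmt for the standard formatter, and logging settings can be overridden with PREFECT_LOGGING_* environment variables, so something like the following might add the id, assuming flow_run_id is available on those log records:
    Copy code
    PREFECT_LOGGING_FORMATTERS_STANDARD_FLOW_RUN_FMT="%(asctime)s.%(msecs)03d | %(levelname)-7s | Flow run %(flow_run_name)r (%(flow_run_id)s) - %(message)s"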
    m
    • 2
    • 11
  • a

    Ali

    07/15/2025, 11:56 PM
    Hi all, we are trying to set up a self-hosted Prefect for us to use. However, I was able to get everything running except for the last part. • I'm using docker compose to set up my Prefect config ◦ I have server, Postgres, worker, and cli. ▪︎ I have a custom Dockerfile that the worker and cli use because I need external libraries, etc. • Pulling code from Bitbucket with Python files inside my Docker container to deploy. The issue is that when I deploy it's not using the worker as defined in my compose file. I'm able to push code to the UI with deploy and work-pool, but when I try manually running it, it fails. I tried creating a Docker work pool, but I was able to set it up properly (I think). I just need help configuring my worker agent to work with the flows. Thanks!
  • s

    Srinivas Kandukuri

    07/16/2025, 3:43 AM
    @Marvin Hi, I have hosted my Prefect service in ECS as a FARGATE task, and I'm using the ECS integration with task definitions; the task def contains the CPU and memory configuration. Now I am using run_deployment to call my deployment, which is associated with a work pool that contains the task def with the CPU and memory configuration. But my requirement is to change the CPU and memory dynamically while calling the deployment using run_deployment. I want to call the deployment with my own memory and CPU configuration, with a single work pool, overriding the task definition's CPU and memory configuration. Give me a solution for this.
    m
    • 2
    • 5
  • s

    Srinivas Kandukuri

    07/16/2025, 4:02 AM
    @Marvin Hi, I have hosted my Prefect service in ECS as a FARGATE task, and I'm using the ECS integration with task definitions; the task def contains the CPU and memory configuration. Now I am using run_deployment to call my deployment, which is associated with a work pool that contains the task def with the CPU and memory configuration. But my requirement is to change the CPU and memory dynamically while calling the deployment using run_deployment. I want to call the deployment with my own memory and CPU configuration, with a single work pool, overriding the task definition's CPU and memory configuration. Give me a solution for this. Please note that here I'm using the run_deployment() method. Please let me know how I can apply this override to a single task/flow run alone.
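    A minimal sketch of a per-run override, assuming Prefect 3's run_deployment accepts job_variables and that the ECS work pool exposes cpu and memory as job variables (the names and values below are illustrative):
    Copy code
    from prefect.deployments import run_deployment
    
    # Override the ECS task's CPU/memory for this one flow run only;
    # other runs keep the work pool / deployment defaults.
    flow_run = run_deployment(
        name="my-flow/my-deployment",               # hypothetical deployment
        parameters={"some_param": "value"},
        job_variables={"cpu": 2048, "memory": 4096},
    )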
    m
    • 2
    • 2
  • r

    Rina

    07/16/2025, 8:28 AM
    Hi! Curious to know if Prefect has considered sending a request to have a prebuilt CDKTF package available? The company where I work uses cdktf to define resources, and I was thinking of using Terraform to define some Prefect resources.
    m
    • 2
    • 2
  • r

    Russell Brooks

    07/16/2025, 9:27 AM
    On a 3.4.6 self-hosted Prefect server in Kubernetes, the Settings page in the UI does not display in a pretty-printed manner. It starts with ‘home: /prefect/.prefect’ and then heaps of key-value pairs with the values being a dict. Is this the normal current behaviour? I remember Settings being pretty-printed before…
  • s

    Slackbot

    07/16/2025, 10:16 AM
    This message was deleted.
    m
    • 2
    • 2
  • a

    Anish S

    07/16/2025, 3:56 PM
    Hi there! How would I mount an EFS volume into a @flow that runs as an ECS task? The Marvin bot wasn't helpful. My job variables look something like the following at the moment. I can see in the codebase they use task_definition and container_definition, but I couldn't find the appropriate place where these variables would go.
    Copy code
    flow.deploy(
      ...
      , job_variables={
                "cpu": 1024*8,
                "memory": 2048*8,
                "execution_role_arn": arn
                "vpc_id": vpc,
                "cluster": "prefect-worker",  # FIXME
                "configure_cloudwatch_logs": True,
                "cloudwatch_logs_options": {
                    "awslogs-create-group": "true",
                    "awslogs-group": "prefect-worker", 
                },
                "cloudwatch_logs_prefix": "mesh-log",
                "network_configuration": {...}
      }
    )
    ✅ 1
    m
    j
    • 3
    • 11
  • f

    Fernando Müller

    07/16/2025, 8:27 PM
    Hi there! I have a flow that gets run by an ECS push work pool. Each time the flow is run (scheduled), it creates a new task definition in my ECS cluster. Is there a way to avoid this? I have configured the match_latest_revision_in_family job variable to True in an attempt to avoid the creation of new task definitions, but had no luck with that. Am I missing something here? Is this happening because the flow runs with different parameters (I have 4 schedules for the same deployment, each with different parameters)? This "problem" is causing some flow runs to fail with the following error message:
    Copy code
    Flow run could not be submitted to infrastructure: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
    @Marvin
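    One hedged workaround, assuming the ECS work pool exposes a task_definition_arn job variable: register the task definition yourself once and point the deployment at it, so the worker should not need to call RegisterTaskDefinition on every run (the ARN below is a placeholder, and this is a fragment to add to however the deployment is currently created):
    Copy code
    # added to the existing deploy call, e.g.:
    my_flow.deploy(
        name="my-deployment",                # hypothetical names
        work_pool_name="my-ecs-push-pool",
        job_variables={
            # pre-registered task definition revision; placeholder ARN
            "task_definition_arn": "arn:aws:ecs:us-east-1:123456789012:task-definition/my-flow:3",
        },
    )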
    m
    j
    • 3
    • 3
  • d

    David Martin Calalang

    07/16/2025, 9:11 PM
    Just wanted to follow up on this message. I've still been facing issues. I tried testing with the Dask library directly (a simple script creating a Fargate cluster and using the same square() number task). This worked as expected, so I'm assuming that this is an issue with my implementation of Prefect? However, I'm not quite sure where to look, as I am confident that my AWS credentials are all good.
    n
    • 2
    • 3
  • r

    Ritesh

    07/17/2025, 5:29 AM
    Hi. I have a Python script with a synchronous Prefect client; this script is used to deploy and run 2 other flows present in a git repo sequentially, only once (deploying the second after completion of the first). When I run this script using python, things execute in the correct order and it works fine, but when I run this script using Prefect it keeps deploying the first flow multiple times and does not deploy the second flow at all. I have set retries=0, and there are online workers present in the work pool assigned to this first flow, but these deployments never get picked up and stay in a pending state. If anyone has any idea about why this is happening, please let me know.
  • l

    li li

    07/17/2025, 7:00 AM
    The Settings page is not working.
    • 1
    • 2
  • z

    Zhongyang Xia

    07/17/2025, 3:34 PM
    Hi, we are being rate limited by the API. Can someone please help us? Thanks
  • a

    Alexis Pumel

    07/17/2025, 4:34 PM
    Hello all, I recently received a message stating that Prefect 1.0 is being sunsetted this month, and that our account may be deactivated at the end of the month. We rely on Prefect for a critical job, but we had not received any prior communication before this notice. I've followed up in the thread a few times but haven’t received a response yet. Is there someone who could assist us? I want to make sure we have the support needed to plan appropriately and avoid disruption. Thank you in advance for your help!
    b
    • 2
    • 1
  • z

    Zhongyang Xia

    07/17/2025, 5:40 PM
    Hi all, today we encountered an incident; the logs say
    17:25:29.594 | INFO    | prefect.flow_runs.worker - Aborted submission of flow run '06858fc2-7ef1-7fc9-8000-c05e12028631'. Server sent an abort signal: Cannot transition a flow run with in-process retries to PENDING state.
    Marvin says it's normal and not a logic bomb. I'm not sure that's an accurate statement, given that logs like this are persistent, very likely caused our rate-limiting threshold to be hit, and eventually stopped all our data flows.
  • s

    Solomon Negusse

    07/17/2025, 10:15 PM
    Hi, I'm wondering what the best practice is for using a common function across multiple flows:
    utils.py
    Copy code
    def common(a):
        # imagine a function that makes calls over the network
        pass
    Option 1: create a common task that’s then invoked from the flows like so:
    prefect_common.py
    Copy code
    from prefect import task
    
    from utils import common
    
    @task
    def common_task(a):
        return common(a)
    flow_a.py
    Copy code
    from prefect import flow, task
    
    from prefect_common import common_task
    
    @task
    def task_one():
        pass
    
    @flow
    def flow_a():
        common_task("foo")
        task_one()
    flow_b.py
    Copy code
    from prefect import flow, task
    
    from prefect_common import common_task
    
    @task
    def task_two():
        pass
    
    @flow
    def flow_b():
        common_task("bar")
        task_two()
    Option 2: create a separate task in each flow that uses the common function
    flow_a.py
    Copy code
    from prefect import flow, task
    
    from utils import common
    
    @task
    def task_one():
        pass
    
    @task
    def common_task(a):
        return common(a)
    
    @flow
    def flow_a():
        common_task("foo")
        task_one()
    flow_b.py
    Copy code
    from prefect import flow, task
    
    from utils import common
    
    @task
    def task_two():
        pass
    
    @task
    def common_task(a):
        return common(a)
    
    @flow
    def flow_b():
        common_task(5)
        task_two()
    Option 1 seems like the Pythonic choice (DRY etc.) and Marvin suggested that, but my concern is losing the ability to configure the task specifically for each flow (different retry and timeout options, name, etc.).
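    For what it's worth, Option 1 doesn't necessarily lose per-flow configurability: a shared task can be re-tuned at the call site with with_options. A small sketch reusing the prefect_common module from Option 1:
    Copy code
    from prefect import flow
    
    from prefect_common import common_task
    
    @flow
    def flow_a():
        # same underlying task, but with flow-specific name, retries, and timeout
        common_task.with_options(
            name="common-for-flow-a",
            retries=3,
            retry_delay_seconds=10,
            timeout_seconds=60,
        )("foo")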
    ✅ 1
    j
    • 2
    • 2
  • a

    Ateeb Rizwan

    07/18/2025, 12:27 PM
    Need guidance: my Prefect is deployed on ECS, and the error I am getting is that the work queue does not have any actively polling workers to execute work. I am able to start the worker locally but not on my ECS instance.
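    A hedged checklist for the ECS side: the worker container usually needs the API connection settings as environment variables plus a start command along these lines (pool name and IDs are placeholders; for a self-hosted server, point PREFECT_API_URL at your server's /api instead):
    Copy code
    PREFECT_API_URL=https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>
    PREFECT_API_KEY=<api-key>
    prefect worker start --pool <your-work-pool>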
    n
    • 2
    • 1
  • z

    Zhongyang Xia

    07/18/2025, 9:10 PM
    Hi community, we just discovered that jobs in a stuck state can cause death spirals of API requests coming from the worker, as well as lock-release issues related to work-queue-level concurrency. Hope this helps anyone facing similar issues.