Arthur Ren
07/14/2025, 4:40 PM
deployment_trigger = DeploymentEventTrigger(
    expect={"someevent.completed"},
    parameters={"payload_dict": "{{event.payload}}"}
)
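For reference, a minimal sketch (not from the thread; the flow, file, deployment, and work pool names below are placeholders) of how a trigger like this is typically attached at deploy time, with the flow taking the whole event payload as one dict parameter:

from pathlib import Path
from prefect import flow
from prefect.events import DeploymentEventTrigger

@flow
def my_flow(payload_dict: dict):
    # the entire event payload should arrive here as a single dict
    print(payload_dict)

if __name__ == "__main__":
    my_flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="my_flow.py:my_flow",  # placeholder file/flow name
    ).deploy(
        name="payload-receiver",
        work_pool_name="my-pool",
        triggers=[
            DeploymentEventTrigger(
                expect={"someevent.completed"},
                parameters={"payload_dict": "{{event.payload}}"},
            )
        ],
    )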
But when I check it in the Prefect UI, it looks OK when I click "show params"; once I click "edit", it shows up empty inside.

Patrick ODonnell
07/14/2025, 6:14 PM

Paige Gulley
07/14/2025, 7:35 PM

xavier
07/14/2025, 8:41 PM
from prefect import task, flow
from prefect import runtime
from pathlib import Path
from datetime import timedelta
from prefect_shell import ShellOperation
import json

@task(name='Get deployment details')
def task_1():
    try:
        deployment_name = "main/deployment-inspect-details"
        result = ShellOperation(
            commands=[
                f'prefect deployment inspect "{deployment_name}"'
            ]
        ).run()
        return result
    except Exception as e:
        print(f"ERROR: {e}")

@flow()
def main():
    result = task_1()
    print("Result type: ", type(result))
    print("Result: ", result)

if __name__ == '__main__':
    main.from_source(
        source=str(Path(__file__).parent),
        entrypoint="deployment_details.py:main"
    ).deploy(
        name="deployment-inspect-details",
        work_pool_name="deployment-details-workpool",
        interval=timedelta(seconds=100)
    )
Sample of the result from the CLI:
Result type: <class 'list'>
Result: ['{', " 'id': '6f2f5a5b-21fa-4716-b009-c0345abba1dd',", " 'created': '2025-07-14T20:20:58.571579Z',", " 'updated':", "'2025-07-14T20:29:35.703000Z',", " 'name': 'deployment-inspect-details',", " 'version': '5ccab6aa120aaea524059f5785473ad3',", " 'version_id': None,", " 'version_info': None,", " 'branch': None,", " 'base': None,", " 'root': None,", " 'description': None,", " 'flow_id': '0acc3aa0-41f8-491c-824a-9bd68adee7e3',", " 'concurrency_limit': None,", " 'global_concurrency_limit': None,", " 'concurrency_options': None,", " 'paused': False,", " 'schedules': [", ' {', " 'id': 'fec8e13a-22ae-4f13-b660-e3faf62fba79',", " 'created': '2025-07-14T20:26:06.107156Z',", " 'updated': '2025-07-14T20:29:27.280000Z',", " 'deployment_id': '6f2f5a5b-21fa-4716-b009-c0345abba1dd',", " 'schedule': {", " 'interval': 100.0,", " 'anchor_date': '2025-07-14T04:26:03.565000Z',", " 'timezone': 'UTC'", ' },', " 'active': False,", " 'max_scheduled_runs': None,", " 'parameters': {},", " 'slug': None", ' }', ' ],', " 'job_variables': {},", " 'parameters': {},", " 'pull_steps': [", ' {', " 'prefect.deployments.steps.set_working_directory': {", " 'directory': ", "'C:\\\\Users\\\\USER\\\\Desktop\\\\pipeline_proj\\\\prefect\\\\proj_name\\\\trial_depl", "oyment'", ' }', ' }', ' ],', " 'tags': [],", " 'labels': {'prefect.flow.id': '0acc3aa0-41f8-491c-824a-9bd68adee7e3'},", " 'work_queue_name': 'default',", " 'last_polled': '2025-07-14T20:29:35.701652Z',", " 'parameter_openapi_schema': {", " 'title': 'Parameters',", " 'type': 'object',", " 'properties': {}", ' },', " 'path': None,", " 'entrypoint': 'deployment_details.py:main',", " 'storage_document_id': None,", " 'infrastructure_document_id': None,", " 'created_by': None,", " 'updated_by': None,", " 'work_queue_id': None,", " 'enforce_parameter_schema': True,", " 'work_pool_name': 'deployment-details-workpool',", '', " 'status': 'READY',", " 'automations': []", '}']
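A hedged aside on the approach above: ShellOperation.run() hands back the CLI's pretty-printed output as a list of lines, which then has to be parsed by hand. A sketch like the following (assuming Prefect 3.x and the same deployment name) reads the same details through the Prefect client as structured data instead:

import asyncio
from prefect import get_client

async def get_deployment_details(name: str = "main/deployment-inspect-details"):
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name)
        # fields come back as real Python attributes rather than strings to parse
        return {
            "id": str(deployment.id),
            "work_pool_name": deployment.work_pool_name,
            "entrypoint": deployment.entrypoint,
            "parameters": deployment.parameters,
            "tags": deployment.tags,
        }

if __name__ == "__main__":
    print(asyncio.run(get_deployment_details()))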
Nick Torba
07/14/2025, 9:18 PM
I have a task set_row_count that I use in many different flows.
17:13:48.835 | INFO | Flow run 'sparkling-mosquito' - Created task run 'set_row_count-0' for task 'set_row_count'
17:13:48.836 | INFO | Flow run 'sparkling-mosquito' - Executing 'set_row_count-0' immediately...
It just sits here for a long time. I am not able to reproduce it anywhere except in my remote development environment. The same task run from the same flow works locally.
It even works when I run it on an ECS cluster that is configured with my "local" dev env (local in quotes because my postgres db runs remotely, but the rest of the app runs locally to be able to test jobs in ECS)
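A hedged debugging aside (a sketch, not a confirmed fix): giving the task an explicit timeout makes a hang like this fail loudly instead of sitting forever, which at least surfaces where it is stuck. The option values below are arbitrary examples, and the task body is not shown in the thread:

from prefect import task

# arbitrary example values; the real set_row_count body is not shown in the thread
@task(timeout_seconds=300, retries=1, retry_delay_seconds=30, log_prints=True)
def set_row_count(table_name: str):
    ...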
My question is, has anyone else run into the problem of a task hanging indefinitely from certain places?

Arthur Ren
07/14/2025, 10:57 PM
I want the whole payload dictionary to be passed into the deployment as a single dict argument.

Jezreel Zamora
07/15/2025, 2:22 AM

Srinivas Kandukuri
07/15/2025, 6:48 AM

Giacomo Chiarella
07/15/2025, 8:49 AM
api/task_runs/{id}/set_state
?

Philip MacMenamin
07/15/2025, 2:38 PM

gee
07/15/2025, 7:19 PM

David Martin Calalang
07/15/2025, 8:45 PM
task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": image  # Docker image used for the scheduler/worker containers
    },
    adapt_kwargs={
        "minimum": 1,
        "maximum": 8
    }
)
@task(name="foo")
def square(x):
    return (x * x)

@flow()
def process_numbers():
    numbers = list(range(10))
    futures = [square.submit(n) for n in numbers]
    results = [future.result() for future in futures]
    print(f"Squared numbers: {results}")
    return results

if __name__ == "__main__":
    process_numbers.with_options(task_runner=task_runner)()
    input("END:")
My AWS credentials and region are set with environment variables. This implementation correctly creates a new cluster on AWS ECS which hosts the scheduler and also spawns workers, as well as brings up the Dask dashboard that shows the workers. The Dask dashboard also shows the square
tasks (10 of them) being moved to a "processing" state within the workers.
The issue is that they appear to stay stuck in that state. Moreover, the Prefect UI shows no progress with the message "This flow run has not yet generated any task or subflow runs".
I understand that there is obviously overhead with creating and provisioning resources on ECS for the scheduler and workers, but on my latest run I left it open for 15 minutes, yet it still made no progress. Am I missing something in my task runner configuration?
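One hedged guess at what might be missing (a sketch under assumptions, not a confirmed fix): the Fargate image has to contain prefect, dask, and dask_cloudprovider, and the remote workers need Prefect API credentials to report task runs back, otherwise the UI keeps showing no task runs. Assuming dask_cloudprovider's FargateCluster forwards an environment dict to the containers (worth verifying), and assuming prefect-dask for the runner import, that could look like:

import os
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        # placeholder image; must include prefect, dask and dask_cloudprovider
        "image": "my-registry/my-prefect-dask:latest",
        # assumption: FargateCluster passes this dict as container env vars
        "environment": {
            "PREFECT_API_URL": os.environ["PREFECT_API_URL"],
            "PREFECT_API_KEY": os.environ.get("PREFECT_API_KEY", ""),
        },
    },
    adapt_kwargs={"minimum": 1, "maximum": 8},
)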
Note that this implementation works correctly and runs quickly (< 2 seconds) when simply passing in DaskTaskRunner
without a cluster_class (i.e. creating a local cluster).

Tim Olshansky
07/15/2025, 10:00 PM
21:44:01.401 | INFO | Flow run 'shrewd-turkey' - Downloading flow code from storage at '.'
to 21:44:01.401 | INFO | Flow run 'shrewd-turkey' (06876ca3-9c3d-7520-8000-09cbe7a46f8c) - Downloading flow code from storage at '.'
or equivalent
3. Manage the tags for a flow run for the AWS ECS Task using prefect specific values (i.e. can we manually add the values we're interested in)
Any one of these things would make our debugging workflows much easier 🙂 Does anyone have any ideas on how we could expose this info?

Ali
07/15/2025, 11:56 PM

Srinivas Kandukuri
07/16/2025, 3:43 AM

Srinivas Kandukuri
07/16/2025, 4:02 AM

Rina
07/16/2025, 8:28 AM

Russell Brooks
07/16/2025, 9:27 AM

Slackbot
07/16/2025, 10:16 AM

Anish S
07/16/2025, 3:56 PM
@flow that runs as an ECS task. Marvin bot wasn't helpful. My job variables look something like the following at the moment. I can see in the codebase they use task_definition and container_definition, but couldn't find where these variables would go.
flow.deploy(
    ...
    , job_variables={
        "cpu": 1024*8,
        "memory": 2048*8,
        "execution_role_arn": arn,
        "vpc_id": vpc,
        "cluster": "prefect-worker",  # FIXME
        "configure_cloudwatch_logs": True,
        "cloudwatch_logs_options": {
            "awslogs-create-group": "true",
            "awslogs-group": "prefect-worker",
        },
        "cloudwatch_logs_prefix": "mesh-log",
        "network_configuration": {...}
    }
)
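A hedged way to answer the "where do these go" question (a sketch assuming Prefect 3.x; the work pool name below is a placeholder): the work pool's base job template lists every key that job_variables can override and shows how they feed the ECS task definition, so reading it from the API makes the valid variable names explicit:

import asyncio
import json
from prefect import get_client

async def show_ecs_job_variables(pool_name: str = "my-ecs-pool"):
    async with get_client() as client:
        pool = await client.read_work_pool(pool_name)
        template = pool.base_job_template
        # "variables" lists every key job_variables can override;
        # "job_configuration" shows how they feed the ECS task definition
        print(json.dumps(sorted(template["variables"]["properties"].keys()), indent=2))

if __name__ == "__main__":
    asyncio.run(show_ecs_job_variables())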
Fernando Müller
07/16/2025, 8:27 PM
I set the match_latest_revision_in_family job variable to True in an attempt to avoid the creation of new task definitions, but had no luck with that.
Am I missing something here? Is this happening because the flow runs with different parameters (I have 4 schedules for the same deployment, each with different parameters)?
This "problem" is making some flow runs to fail with the following error message:
Flow run could not be submitted to infrastructure: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
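A hedged sketch of one thing to try (it assumes the prefect-aws ECS work pool exposes family and match_latest_revision_in_family as job variables; all names below are placeholders): pinning the same task-definition family on the deployment, so the four schedules reuse one revision instead of each run registering a new one:

from prefect import flow

@flow
def my_flow(param: str = "a"):
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="my-flow",
        work_pool_name="ecs-pool",
        image="my-registry/my-flow:latest",  # placeholder image
        job_variables={
            "family": "my-flow-task-def",             # fixed family shared by all schedules
            "match_latest_revision_in_family": True,  # reuse the latest matching revision
        },
    )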
@Marvin

David Martin Calalang
07/16/2025, 9:11 PM

Ritesh
07/17/2025, 5:29 AM

li li
07/17/2025, 7:00 AM

Zhongyang Xia
07/17/2025, 3:34 PM

Alexis Pumel
07/17/2025, 4:34 PM

Zhongyang Xia
07/17/2025, 5:40 PM
17:25:29.594 | INFO | prefect.flow_runs.worker - Aborted submission of flow run '06858fc2-7ef1-7fc9-8000-c05e12028631'. Server sent an abort signal: Cannot transition a flow run with in-process retries to PENDING state.
Marvin says it's normal and not a logic bomb. I'm not sure that's an accurate statement, given that logs like this are persistent, very likely caused our rate-limiting threshold to be hit, and eventually stopped all our data flows.

Solomon Negusse
07/17/2025, 10:15 PM
utils.py
def common(a):
    # imagine a function that makes calls over the network
    pass
Option 1: create a common task that’s then invoked from the flows like so:
prefect_common.py
from prefect import task
from utils import common

@task
def common_task(a):
    return common(a)
flow_a.py
from prefect import flow, task
from prefect_common import common_task

@task
def task_one():
    pass

@flow
def flow():
    common_task("foo")
    task_one()
flow_b.py
from prefect import flow, task
from prefect_common import common_task

@task
def task_two():
    pass

@flow
def flow():
    common_task("bar")
    task_two()
Option 2: create a separate task in each flow that uses the common function
flow_a.py
from prefect import flow, task
from utils import common

@task
def task_one():
    pass

@task
def common_task(a):
    return common(a)

@flow
def flow():
    common_task("foo")
    task_one()
flow_b.py
from prefect import flow, task
from utils import common

@task
def task_two():
    pass

@task
def common_task(a):
    return common(a)

@flow
def flow():
    common_task(5)
    task_two()
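A hedged note on the trade-off between these two layouts (a sketch, not from the thread): with Option 1, each flow can still re-tune the shared task through Task.with_options, so retries, timeouts, and names remain flow-specific; the option values below are arbitrary examples:

from prefect import flow
from prefect_common import common_task

@flow
def flow_a():
    # re-configure the shared task just for this flow
    common_task.with_options(
        name="common-for-flow-a",
        retries=3,
        retry_delay_seconds=10,
        timeout_seconds=60,
    )("foo")

@flow
def flow_b():
    common_task.with_options(name="common-for-flow-b", timeout_seconds=300)("bar")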
Option 1 seems like the Pythonic choice (DRY, etc.) and Marvin suggested that, but my concern with it is losing configurability of the task specific to each flow (different retry and timeout options, name, etc.).

Ateeb Rizwan
07/18/2025, 12:27 PM

Zhongyang Xia
07/18/2025, 9:10 PM