Bjarne Johannsen
08/05/2025, 10:28 PM
gee
08/06/2025, 11:20 AM
Nick Torba
08/06/2025, 1:12 PM
Akash N
08/06/2025, 1:18 PM
Kiran
08/06/2025, 1:51 PM
Ben
08/06/2025, 2:39 PM
16:19:01.204 | INFO | prefect.flow_runs.worker - Reported flow run 'f6740f0f-e635-4017-a73c-c8907010f99e' as crashed: Flow run could not be submitted to infrastructure:
agent-aws-esc-1 | ProfileNotFound('The config profile (prefect) could not be found')
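For context: boto3 raises ProfileNotFound when the requested named profile is missing from the AWS config files visible to the process. A minimal sketch of what the worker's AWS client is effectively doing; the profile name comes from the log above, everything else is illustrative:

import boto3

# boto3 resolves named profiles from ~/.aws/config and ~/.aws/credentials.
# If no [profile prefect] section exists inside the worker's container,
# profile resolution fails with botocore.exceptions.ProfileNotFound,
# matching the crash reported above.
session = boto3.Session(profile_name="prefect")
print(session.client("sts").get_caller_identity())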
Michael Hadorn
08/06/2025, 4:25 PM
Zhongyang Xia
08/06/2025, 7:47 PM
19:40:23.179 | WARNING | prefect.events.clients - Unable to connect to 'wss://api.prefect.cloud/api/accounts/[uuid]/workspaces/[uuid]/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: timed out during opening handshake. Set PREFECT_DEBUG_MODE=1 to see the full error.
Samuel Hinton
08/07/2025, 5:53 AM
When running the open-source server started with the docker run command, the UI is very different in the flow runs. Prefect Cloud has tracing, log trees, better artifact viewing, etc. Is there a way to unify the UIs, or is Prefect Cloud's more advanced UI one of the gatekeepers to encourage subscription?
António Domingues
08/07/2025, 9:13 AM
A load_data task could be fairly generic and used in two different flows, such as apply_model or train_model. How to go about it? Write each task in a file and load it as needed in each flow file?
Cheers!
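One common way to structure this (a sketch; file, flow, and parameter names here are illustrative): keep shared tasks in a module of their own and import them from each flow file.

# tasks_common.py (hypothetical): shared, flow-agnostic tasks
from prefect import flow, task

@task
def load_data(path: str) -> list[str]:
    # Placeholder load logic; swap in real I/O.
    with open(path) as f:
        return f.read().splitlines()

# In apply_model.py / train_model.py you would then simply do:
#   from tasks_common import load_data
# and call it like any locally defined task:

@flow
def train_model(path: str = "data.csv") -> int:
    return len(load_data(path))

@flow
def apply_model(path: str = "data.csv") -> int:
    return len(load_data(path))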
Naren K
08/07/2025, 11:22 AM
Sharing a small utility: @wait_for_deployments.
It allows one flow to wait for other deployments to complete before running — no need for nesting flows or using Cloud Automations.
✅ Async/sync compatible
✅ Configurable timeout & retry
✅ Lightweight + fully OSS-friendly
🔗 GitHub Issue: https://github.com/PrefectHQ/prefect/issues/18665
🔗 Repo: https://github.com/nrjangir/prefect-flow-dependency
Would love your feedback — happy to improve or open a PR if there’s interest! 🙌
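For comparison, the closest building block in stock Prefect (not the repo's API, just the standard call) is run_deployment, which by default blocks until the triggered run reaches a final state:

from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    # Waits for the child run to finish; pass timeout=0 to fire-and-forget.
    # "etl/daily" is a hypothetical "flow-name/deployment-name" identifier.
    child_run = run_deployment(name="etl/daily")
    print(child_run.state)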
David Martin Calalang
08/07/2025, 3:10 PM
Exited with non-zero code. (Error Code: 1)
or
RuntimeError: Timed out after 301.83046412467957s while watching task for status RUNNING.
Has anyone else deployed Prefect flows through Prefect workers hosted on ECS and faced any similar issues or have any insights?
Ricardo Garcia Silva
08/07/2025, 5:53 PM
I'm using a work pool of type process, and since the cluster cannot access the Internet, I'm also storing flows locally on each node. So I have set up a couple of nodes, each running prefect worker with a work pool of type process.
I have successfully come up with an Ansible playbook which deploys my sample flow to the cluster. However, I would like to be able to use an isolated environment for each flow. How can I achieve this when using the process work pool type?
I have already created a venv for my flow and would now like to configure the Prefect deployment with instructions to use it. This is my prefect.yaml file:
deployments:
- name: "my_workflow"
  entrypoint: "main.py:my_workflow"
  work_pool:
    name: green
    work_queue_name: low
    job_variables:
      command: |
        echo "About to activate" && . /opt/prefect/flows/demo1/activate.sh && /opt/prefect/flows/demo1/env/bin/python -m prefect.engine
      working_dir: "/opt/prefect/flows/demo1"
Unfortunately this does not seem to work. The flow is deployed successfully and I can trigger workflow runs just fine; however, they do not use the custom environment, but rather run with the same environment used by the prefect worker process.
I was hoping to be able to override the Python env with the job_variables.command property of the deployment, but it seems to be silently ignored. Is there some other way to do this, maybe using env variables?
Thanks for helping out!
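One thing worth trying (a sketch, assuming the process worker's env job variable is honored even where command is not, and that the worker consults PATH when spawning the run): point the run's environment at the venv rather than overriding the command.

deployments:
- name: "my_workflow"
  entrypoint: "main.py:my_workflow"
  work_pool:
    name: green
    work_queue_name: low
    job_variables:
      working_dir: "/opt/prefect/flows/demo1"
      env:
        # Hypothetical values: expose the venv so the interpreter and its
        # installed packages resolve from it.
        VIRTUAL_ENV: /opt/prefect/flows/demo1/env
        PATH: /opt/prefect/flows/demo1/env/bin:/usr/local/bin:/usr/bin:/bin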
Andrew Allen
08/07/2025, 5:55 PM
RayTaskRunner or DaskTaskRunner on kubernetes.
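For reference, switching task runners is a flow-level setting; a minimal sketch with the prefect-dask collection (flow and task names are illustrative):

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def double(x: int) -> int:
    return 2 * x

# DaskTaskRunner spins up a temporary local cluster by default; on kubernetes
# you would instead pass cluster_class/cluster_kwargs pointing at a
# dask_kubernetes cluster.
@flow(task_runner=DaskTaskRunner())
def demo() -> list[int]:
    futures = [double.submit(i) for i in range(4)]
    return [f.result() for f in futures]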
Daniel Rodriguez Ciotti
08/07/2025, 7:36 PM
Nicolas
08/07/2025, 9:51 PM
Nicolas
08/07/2025, 9:52 PM
Jaco Waes
08/08/2025, 6:00 AM
pull: &id001
- prefect.deployments.steps.run_shell_script:
    id: ensure-directory
    script: mkdir -p /opt/prefect/repos && echo "Directory ready"
    stream_output: false
- prefect.deployments.steps.set_working_directory:
    directory: "/opt/prefect/repos"
- prefect.deployments.steps.git_clone:
    id: clone_step
    repository: https://github.com/MY_ORG/bess-prefect.git
    branch: "{{ $PREFECT_VAR_deployment_branch }}"
    access_token: "{{ prefect.blocks.secret.github-access-token }}"
- prefect.deployments.steps.set_working_directory:
    directory: "/opt/prefect/repos/{{ clone_step.directory }}"
- prefect.deployments.steps.pip_install_requirements:
    requirements_file: requirements.txt
    stream_output: false
António Domingues
08/08/2025, 7:46 AM
Amith M
08/09/2025, 5:50 AM
I'm running a .serve deployment on GCP. I have everything set up and the deployment gets the job and reports it back to the Prefect server, but it never starts the job; it goes to the "pending" state. It then goes back to listening for jobs again; the worker/runner never starts.
And sometimes, every other run, there is some form of httpx error in the logs.
I have attached the logs.
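For context, .serve executes flow runs inside the serving process itself (no separate worker); a minimal sketch of the pattern, with an illustrative flow name:

from prefect import flow

@flow
def my_job():
    print("running")

if __name__ == "__main__":
    # Registers the deployment and polls for scheduled runs; the runs execute
    # in this same process, so the container must stay alive and not be
    # suspended between requests (a common pitfall on scale-to-zero platforms).
    my_job.serve(name="gcp-serve-demo")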
Yaron Levi
08/11/2025, 10:38 AM
Robyn H
08/11/2025, 3:49 PM
I have set PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=1.
In my test flow, if I add a concurrency limit on one task (tag-based), I notice the execution time of the flow increasing significantly. I'm testing it on a flow where I use asyncio to start many executions of task1 concurrently (I'm using the default task runner), and task1 just calls task2, which has a tag with a concurrency limit. Even if I set the number of slots in the concurrency limit to be higher than the number of tasks (e.g. 1000 tasks, 1001 slots), I see a significant runtime increase (from 7s to 33s, in my case). I assume this increased latency is due to database round trips and lock contention on acquiring slots; is this correct?
In this test, we're not even waiting on slots to become free: there are always free slots (I set the slot value this high to verify that we see significant latency despite never actually waiting for tasks to free up slots, as this was my suspicion). For e.g. 50k tasks, the execution time goes from 5min to 30min.
What I'm wondering is whether there is any way to optimize this so we don't see such major increases. Can I tune my database?
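For concreteness, a sketch of the test shape described above (names invented; the tag's limit itself would be created server-side, e.g. with the prefect concurrency-limit create CLI):

import asyncio
from prefect import flow, task

@task(tags=["limited"])  # tag configured with a concurrency limit server-side
async def task2(i: int) -> int:
    return i

@task
async def task1(i: int) -> int:
    return await task2(i)

@flow
async def fanout(n: int = 1000):
    # Every task2 run must acquire a slot for its tag, which costs extra API
    # and database round trips even when slots are never exhausted.
    return await asyncio.gather(*[task1(i) for i in range(n)])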
Gabor Hosszu
08/12/2025, 8:08 AM
David Martin Calalang
08/12/2025, 3:28 PM
requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http+docker://localnpipe/v1.41/images/{ecr_registry}/{image}/push?tag=latest
Has anyone else encountered this in the past? My .yaml configuration for the build and push steps is below...
# build section allows you to manage and build docker images
build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build_image
    requires: prefect-docker>=0.3.1
    image_name: {ecr_registry}/{image}
    tag: latest
    dockerfile: Dockerfile

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.3.1
    image_name: '{{ build_image.image_name }}'
    tag: '{{ build_image.tag }}'
I've made sure that my AWS user has the correct permissions, and that I've logged into ECR prior to deployment.
pavel hrablis
08/12/2025, 5:10 PM
I want to run several jobs concurrently and consume their results as they complete, which I can do with plain asyncio (for example):
import asyncio
import random
from typing import AsyncGenerator, Awaitable, Callable

async def run_jobs_async(jobs: list[Callable[[], Awaitable]]) -> AsyncGenerator:
    tasks = [asyncio.create_task(job()) for job in jobs]
    for completed in asyncio.as_completed(tasks):
        result = await completed
        yield result

async def sample_job(i: int) -> str:
    await asyncio.sleep(random.random() * 2)  # simulate work
    return f"Job {i} done"

async def main():
    jobs = [lambda i=i: sample_job(i) for i in range(5)]
    async for outcome in run_jobs_async(jobs):
        print(outcome)

if __name__ == "__main__":
    asyncio.run(main())
If I implement my logic in this way, will I lose key Prefect benefits — such as automatic logging, dependency tracking, the ability to restart only parts of the pipeline, etc.?
Or is there a recommended way to implement my use case in Prefect 3 so that I can keep those benefits?
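For what it's worth, the same stream-as-completed shape can be kept while promoting each job to a Prefect task, which preserves logging and state tracking; a sketch assuming Prefect 3, where calling an async task returns an awaitable:

import asyncio
import random
from prefect import flow, task

@task
async def sample_job(i: int) -> str:
    await asyncio.sleep(random.random() * 2)  # simulate work
    return f"Job {i} done"

@flow
async def main():
    # Each job is now a tracked task run; results still stream back in
    # completion order, as in the plain-asyncio version above.
    jobs = [asyncio.create_task(sample_job(i)) for i in range(5)]
    for completed in asyncio.as_completed(jobs):
        print(await completed)

if __name__ == "__main__":
    asyncio.run(main())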
Andres Mora
08/13/2025, 11:58 AM
{
  'Type': 'Notification',
  'MessageId': 'd4aa09a0-6d51-5219-bcb2-68b12345bd7b',
  'TopicArn': 'arn:aws:sns:us-east-1:1234567890:sns_topic',
  'Subject': 'Amazon S3 Notification',
  'Message': '{
    "Records": [
      {
        "eventVersion": "2.1",
        "eventSource": "aws:s3",
        "eventTime": "2025-08-12T16:25:24.066Z",
        "s3": {
          "s3SchemaVersion": "1.0",
          "configurationId": "New file",
          "bucket": {"name": "testing.data", "arn": "arn:aws:s3:::testing.data"},
          "object": {"key": "data_file.csv", "size": 34}
        }
      }
    ]
  }',
  'Timestamp': '2025-08-12T16:25:24.883Z',
}
I want to capture the object key "data_file.csv" but I haven't been able to. What I've tested:
In the Webhook's template, if I extract, for example, the MessageId like this: "messageId": "{{ body.MessageId }}", there is no issue. This works fine.
But if I try to extract something from inside the Message of the body using something like this: "file_name": "{{ body.Message.Records[0].s3.object.key }}", nothing happens; file_name ends up being empty.
I thought that maybe the whole Message was being parsed as a single string, so I did this just to test: "file_name": "{{ body.Message }}"
This made the Webhook fail with this error: "unexpected character: line 8 column 23 (char 425)"
It seems like I just cannot access Message, not even as a string(?)
Does anyone understand this issue and know how to extract the value I'm trying to obtain?
Thanks for the help!
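One likely explanation: in the SNS envelope, Message is a JSON-encoded string rather than a nested object, so there is no .Records to traverse until it is parsed. A plain-Python illustration with a trimmed version of the payload above:

import json

body = {
    "MessageId": "d4aa09a0-6d51-5219-bcb2-68b12345bd7b",
    # SNS delivers the S3 event as a *string* of JSON, not a nested object:
    "Message": '{"Records":[{"s3":{"object":{"key":"data_file.csv"}}}]}',
}

print(body["MessageId"])  # top-level field: works directly
# body["Message"]["Records"] fails: you cannot index into a string by key.
records = json.loads(body["Message"])["Records"]
print(records[0]["s3"]["object"]["key"])  # "data_file.csv" once parsed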
Kiran
08/13/2025, 12:37 PM
Adrien Besnard
08/13/2025, 1:02 PM
I'm using prepare_flow_for_deployment within a CLI to deploy some flows. How can I avoid the "attempted relative import beyond top-level package"
issue if my flow imports some stuff from other relative packages?
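That error usually means the flow module is being loaded as a top-level module, so relative imports have no parent package to resolve against. A common fix (a sketch with a hypothetical layout) is to make the code an installable package and use absolute imports:

# Hypothetical layout, installed with `pip install -e .` so imports resolve
# the same way for the CLI, the worker, and your tests:
#
#   my_project/
#     pyproject.toml
#     my_project/
#       __init__.py
#       helpers.py            # defines clean()
#       flows/
#         __init__.py
#         etl.py              # the flow below
#
# In etl.py, use absolute imports rooted at the package name:
#   from my_project.helpers import clean   # instead of: from ..helpers import clean
from prefect import flow

@flow
def etl() -> str:
    return "cleaned"  # stand-in for clean("raw") from the helper module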
Yaron Levi
08/13/2025, 3:09 PM
Tri
08/13/2025, 7:10 PM
Regarding prefect-shell and the shell operation block: are they better suited for calling external applications?
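For reference, a minimal sketch of that pattern with prefect-shell (the command is illustrative); ShellOperation runs the command in a subprocess and surfaces its output in the flow's logs, and it can also be saved and reused as a block:

from prefect import flow
from prefect_shell import ShellOperation

@flow
def call_external_app():
    # Runs in a subprocess; stdout/stderr are captured as Prefect logs.
    ShellOperation(commands=["echo 'hello from an external app'"]).run()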