Melek Alan
03/24/2025, 10:49 AM
selfHostedServer.

Fabian Gruben
03/24/2025, 11:04 AM

Marko T
03/24/2025, 1:00 PM
Is PrefectHQ/prefect-hightouch still being maintained by Prefect? It doesn't seem to support Prefect 3 because of a Pydantic version conflict: https://github.com/PrefectHQ/prefect-hightouch/issues/39 . I'm wondering if we really have to downgrade to Prefect 2 to use it in our pipelines.

Theo Bouwman
03/24/2025, 1:24 PM

Stefan
03/24/2025, 1:45 PM
flow: FlowDecorator
Task and every other import works fine. Latest version.

Jeremy Karn
03/24/2025, 4:31 PM

Ben Epstein
03/24/2025, 4:45 PMfrom prefect.blocks.notifications import SlackWebhook
slack_webhook_block = SlackWebhook.load("BLOCK_NAME")
slack_webhook_block.notify("Hello from Prefect!")
This is working for me. I wrote a function to do that as per this thread and added on_failure=[notify_slack], but I'm getting the following error (mypy shows the same):
packages/flows/notify.py", line 7, in notify_slack
slack_webhook_block.notify(
^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'notify'
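That AttributeError is the classic symptom of calling a method on a coroutine that was never awaited. A minimal, Prefect-free reproduction of the pattern (Webhook and load_webhook here are stand-ins, not Prefect's API):

```python
import asyncio

class Webhook:
    def notify(self, msg: str) -> str:
        return f"sent: {msg}"

async def load_webhook() -> Webhook:
    # Stand-in for SlackWebhook.load(), which returns a coroutine
    # when called from async code.
    return Webhook()

async def broken() -> bool:
    block = load_webhook()                 # missing await -> coroutine object
    has_notify = hasattr(block, "notify")  # False: coroutines have no .notify
    block.close()                          # silence the "never awaited" warning
    return has_notify

async def fixed() -> str:
    block = await load_webhook()           # await resolves to the actual Webhook
    return block.notify("Hello from Prefect!")

print(asyncio.run(broken()))  # False
print(asyncio.run(fixed()))   # sent: Hello from Prefect!
```

If that is what is happening here, the usual fix is to make the hook async and await both the load and the notify calls.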
I'm curious why this is working when I just run this code directly, but within prefect flow infrastructure it's failing. I'm seeing that in pylance the variable is of type slack_webhook_block: SlackWebhook | Coroutine[Any, Any, SlackWebhook]
-- so perhaps because my flow is async, this notify_slack function needs to be async as well?

Abyaya Lamsal
03/24/2025, 5:41 PM
After upgrading from 2.14.16 to 2.20.16, I started seeing intermittent issues with a subset of flows. This seems to happen randomly, so not every flow run is affected. For reference, I am using a custom image. Logs attached below:
13:29:55.058 | INFO | prefect.flow_runs.worker - Worker 'KubernetesWorker a5d26a51-ff36-4697-8daf-f8aa3a0fea54' submitting flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
13:29:55.236 | INFO | prefect.flow_runs.worker - Creating Kubernetes job...
13:29:55.314 | INFO | prefect.flow_runs.worker - Completed submission of flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
13:29:55.349 | INFO | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod has status 'Pending'.
13:30:55.327 | ERROR | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod never started.
13:30:55.570 | INFO | prefect.flow_runs.worker - Pod event 'Scheduled' at 2025-03-18 13:29:55+00:00: Successfully assigned [OUR_NAMESPACE]/adept-hog-hwcjq-pqhbc to <INTERNAL_NODE>
13:30:55.571 | INFO | prefect.flow_runs.worker - Job event 'SuccessfulCreate' at 2025-03-18 13:29:55+00:00: Created pod: adept-hog-hwcjq-pqhbc
13:30:55.572 | INFO | prefect.flow_runs.worker - Pod event 'Pulling' at 2025-03-18 13:29:56+00:00: Pulling image "<CUSTOM_IMAGE>"
13:30:55.572 | INFO | prefect.flow_runs.worker - Pod event 'Pulled' at 2025-03-18 13:30:33+00:00: Successfully pulled image "<CUSTOM_IMAGE>" in 37.16s (37.16s including waiting). Image size: <SIZE> bytes.
13:30:55.716 | INFO | prefect.flow_runs.worker - Reported flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8' as crashed: Flow run infrastructure exited with non-zero status code -1.
<NORMAL EXECUTION>
...
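A hedged observation on the timeline above: the worker declares "Pod never started" exactly 60 seconds after submission, while the image pull alone took ~37s on top of scheduling. The Kubernetes worker's pod watch timeout defaults to 60 seconds, so raising it via the deployment's job variables (or the work pool's base job template) may stop these false "crashed" reports. A sketch of the job-variables override, assuming a standard Kubernetes work pool:

```json
{
  "pod_watch_timeout_seconds": 300
}
```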
The job eventually runs. The issue is that if I subscribe to any failure notification, I get randomly bombarded with crash notifications like the sample above, which is not very helpful. Would appreciate any pointers here.

Stefan
03/24/2025, 6:54 PM

Tim Galvin
03/25/2025, 8:02 AM
flow. I am doing this inside a docker container. My test fails in the container with the following error:
FAILED tests/test_prefect_helpers.py::test_enable_loguru_support - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: "Debug Print Notification" - should this be a string literal in single quotes?
@Marvin - what is going wrong and causing the error?

Claudiu
03/25/2025, 11:02 AM
work pool in order to schedule flows, but since our deployment will be tied to specific hardware (Jetson), we have to support an emergency use case without any cloud or work-pool access. Our orchestration and infrastructure layers are on the same hardware, so we don't need to separate them. What's the solution for scheduling flows without work pools?
• Is there a way to pause/resume flows without work pools? Right now it seems we NEED to create a work pool to have this functionality.
• prefect.deploy or prefect.serve seem like good tools for remote deployment, but that just isn't our use case.
• Do work pools make sense for our specific use case, or is there another entity we can use instead?
Currently we have a YAML file that provides the scheduling details for a flow, but it's a very convoluted process. Having the ability to directly schedule a task when needed would simplify things (more details in the thread).
Issue nr 2: serialization issues
• We have some entities that can't be easily serialized, and custom serialization logic would require additional parts of the system that aren't implemented in the scope of the POC. We know you have some serializers, but they don't work for our entities.
• We also have some singleton classes that act as a "syncing" element in our system. Is there a better alternative for managing state in a single-machine, all-in-one deployment?
• We're currently using the default task runner; is there any benefit to using another one (like DaskTaskRunner), given that we don't need distributed computation for the POC?

Martin Tomovič
03/25/2025, 11:39 AM

Chris
03/25/2025, 12:35 PM
prefect.yaml, the job variables appear in my deployment configuration but are not passed through to my ECS task. A task definition is created with the name of my work pool and a UUID (e.g., prefect_ecs-dev-pool_79227824-ac2e-48de-8639-58bc6fae1f6c), but it does not include the configured variables. As a result, network settings and IAM roles are missing (I managed to bypass this by allowing my worker to pass roles to my task… for now).
I've already discussed this with @Bianca Hoch but am still unsure why this is happening. Hoping the community can help us troubleshoot! Thanks in advance.
Secondary ask:
I'm using webhooks in Prefect Cloud and would like to validate incoming webhooks. To do this, I need access to the raw, unprocessed request body to generate a signature and match it against the signature in the headers. However, it seems that Prefect modifies the raw body in some way, making it impossible to validate my signature.
While I appreciate the ability to extract webhook parameters directly into my flow, it would be helpful to have an option to receive the full request payload untouched so I can handle it myself, without manipulation by Prefect. I've tried several approaches to pass the raw body to my function, but I either get validation errors in my automation or signatures that don't match.
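For reference, this kind of validation is typically an HMAC computed over the exact raw bytes of the request body, which is why any re-serialization upstream breaks it. A minimal sketch with a made-up secret and payload:

```python
import hashlib
import hmac

def signature_matches(secret: bytes, raw_body: bytes, received_sig: str) -> bool:
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    # constant-time comparison to avoid timing attacks
    return hmac.compare_digest(expected, received_sig)

secret = b"webhook-secret"          # hypothetical signing secret
raw = b'{"event": "run", "id": 1}'  # the exact bytes the sender signed
sig = hmac.new(secret, raw, hashlib.sha256).hexdigest()

print(signature_matches(secret, raw, sig))                        # True
# Re-serializing the JSON (even just changing whitespace) breaks it:
print(signature_matches(secret, b'{"event":"run","id":1}', sig))  # False
```

The second call fails precisely because the payload bytes changed, even though the JSON is semantically identical, which is the failure mode described above.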
Has anyone else encountered this, or found a workaround? Would love any insights.
Thanks!

antonk
03/25/2025, 2:58 PM

Emon Datta
03/25/2025, 5:13 PM
Response: {'detail': "Error creating flow run: Validation failed. Failure reason: 'args' is a required property"}
Any advice for how to fix this?

sadath
03/25/2025, 5:38 PM

Luis Henrique Dallelaste Bernartt
03/25/2025, 5:46 PM

sadath
03/26/2025, 5:25 AM

Vlad
03/26/2025, 9:31 AM

Sam Greenwood
03/26/2025, 11:46 AM
I had a prefect deploy issue which was reliably producing badly built images for ECR, which EKS then refused to run (EKS reported a bad hash on image pull: imagePullErr).
I'd had this issue for over a week and tried everything: installing/reinstalling Docker, different package-manager venvs, different project layouts for prefect.yaml, etc.
In the end, the solution was to disable the containerd image store in Docker Desktop.
With that checkbox unticked, prefect deploy worked first time; with it checked, it reliably did not.
This is Prefect 3.2 with Python 3.12.

Abdul Raheem Siddiqui
06/27/2025, 10:26 PM

Martin Klefas
06/27/2025, 11:15 PM

Ben Epstein
06/29/2025, 12:34 AM

Mohammad Nassar
06/29/2025, 10:22 AM

Mohammed Noureldin
06/29/2025, 5:40 PM
.deploy, as Prefect does not output anything but the very latest error, which is usually missing the context and the text of the real fatal error. While building a custom image (in the deploy step), it only shows "building", without the individual steps, and if an error happens it shows a generic "error" message, while the real message (the fatal error that plain docker build would print) is never shown. Can we make Prefect show all of the build steps, just like docker build?

Royzac
06/29/2025, 10:54 PM

Mohammed Noureldin
06/29/2025, 11:48 PM
.deploy to build the whole image again.
I already tried:
image=DockerImage(
    name="myworkflow",
    tag="latest",
    dockerfile="Dockerfile",
    build_kwargs={"nocache": True},
)
But for some reason build_kwargs is not recognized (I am getting an error).

Tobias Bjormyr
06/30/2025, 11:26 AM
Legacy Free Tier to Starter Tier, but when we try to initiate the process by clicking "Upgrade" we get this error.
I already emailed help@prefect.io early on Friday and am still waiting for answers.
Does anyone know the normal response time? I haven't received a notification that the support case has been received.
Is this a bug people have seen before?

Robin
06/30/2025, 1:11 PM

Pav Staz
06/30/2025, 5:00 PM
from prefect import flow, task, get_client, runtime
from prefect.logging.handlers import APILogHandler
from prefect.client.schemas.filters import (
    LogFilter,
    TaskRunFilter,
    TaskRunFilterId,
    FlowRunFilter,
    FlowRunFilterId,
)

@task
async def check_logs():
    async with get_client() as client:
        # Ensure any pending logs are sent
        await APILogHandler.aflush()
        logs = await client.read_logs(
            LogFilter(flow_run_id={"any_": [runtime.flow_run.id]})
        )
        for log in logs:
            # Gets task run and flow run info
            if log.task_run_id is not None:
                task_runs = await client.read_task_runs(
                    task_run_filter=TaskRunFilter(id=TaskRunFilterId(any_=[log.task_run_id]))
                )
                task_run = task_runs[0]
                print("-------------------- Task Run Details ----------------------------")
                print(task_run)
                print("------------------------------------------------------------------")
                if task_run.flow_run_id is not None:
                    flow_runs = await client.read_flow_runs(
                        flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[task_run.flow_run_id]))
                    )
                    flow_run = flow_runs[0]
                    print("-------------------- Flow Run Details ----------------------------")
                    print(flow_run)
                    print("------------------------------------------------------------------")
                    if flow_run.flow_id is not None:
                        parent_flow = await client.read_flow(flow_run.flow_id)
                        print(f"Flow name is {parent_flow.name}")
            print(log)

@flow(log_prints=True)
def daily_flow():
    ## A subflow
    some_subflow()
    ## A task
    some_task()
    check_logs.submit()

# only logs of some_task are printed by the check_logs function, not the some_subflow function
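A plausible explanation for that last comment (hedged, not verified against Prefect internals): a subflow executes as its own flow run with its own flow_run_id, so a LogFilter pinned to the parent run's id excludes the subflow's logs, while the task's logs carry the parent's id and do show up. In miniature:

```python
# Toy log records: each log carries the id of the flow run that emitted it.
logs = [
    {"msg": "some_task log",    "flow_run_id": "parent-run-id"},
    {"msg": "some_subflow log", "flow_run_id": "child-run-id"},  # subflow = separate run
]

def read_logs(any_ids: set[str]) -> list[str]:
    # Mimics LogFilter(flow_run_id={"any_": [...]}): keep logs whose run id matches.
    return [log["msg"] for log in logs if log["flow_run_id"] in any_ids]

print(read_logs({"parent-run-id"}))                  # ['some_task log']
print(read_logs({"parent-run-id", "child-run-id"}))  # both logs
```

If that's right, first listing the parent's child flow runs and including their ids in the filter should surface the missing subflow logs.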