Theom
03/21/2025, 3:24 PM
from prefect import flow
from prefect.results import get_result_store
@flow(persist_result=True)
def pipeline():
    store = get_result_store()
    result = store.read("my-key")  # a file corresponding to this key exists on disk
    return result
if __name__ == "__main__":
    pipeline()
But I get the following error.
Damian Birchler
03/21/2025, 6:33 PM
Tinendra kumar
03/22/2025, 7:22 PM
Kiana Alessandra Villaera
03/23/2025, 2:54 PM
(prefect-2) ubuntu@ip-172-31-13-253:~/Docker/eti-data-pipeline-worflows$ prefect worker start --pool my-ecs-pool
Discovered type 'ecs' for work pool 'my-ecs-pool'.
Worker 'ECSWorker 05116438-a5b0-42d3-a699-1a23bed3ba35' started!
Failed the last 3 attempts. Please check your environment and configuration.
Examples of recent errors:
Traceback (most recent call last):
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/utilities/services.py", line 64, in critical_service_loop
await workload()
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/workers/base.py", line 760, in get_and_submit_flow_runs
runs_response = await self._get_scheduled_flow_runs()
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/workers/base.py", line 917, in _get_scheduled_flow_runs
await self.client.get_scheduled_flow_runs_for_work_pool(
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/orchestration/_work_pools/client.py", line 586, in get_scheduled_flow_runs_for_work_pool
response = await self.request(
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/orchestration/base.py", line 53, in request
return await self._client.send(request)
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/base.py", line 354, in send
response.raise_for_status()
File "/home/ubuntu/.conda/envs/prefect-2/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://127.0.0.1:4200/api/work_pools/my-ecs-pool/get_scheduled_flow_runs'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
I presume this has something to do with my worker not being able to access the API, but my task-definition.json already has my Prefect API user key configured. I'm at a loss as to what step I've missed. Perhaps my PREFECT_API_URL is incorrect?
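For reference, the traceback above shows the worker calling http://127.0.0.1:4200, i.e. a local server rather than Prefect Cloud, which would be consistent with a misconfigured PREFECT_API_URL. A hedged check from the worker's environment (the account and workspace IDs below are placeholders):

prefect config view
prefect config set PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>"
prefect config set PREFECT_API_KEY="<YOUR_API_KEY>"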
Ulysse Petit
03/23/2025, 4:37 PM
zizhengwang
03/24/2025, 7:26 AM
Giacomo Chiarella
03/24/2025, 8:36 AM
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import StateType

flow_runs = await client.read_flow_runs(flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]}), limit=1)
state_updates = {}
state_updates.setdefault("name", "Cancelled")
state_updates.setdefault("type", StateType.CANCELLED)
state = flow_runs[0].state.copy(update=state_updates)
await client.set_flow_run_state(flow_run_id=flow_run_id, state=state, force=True)
I've noticed that although the flow run state is set correctly, the task runs are not cancelled; they are actually still executed. I've read in the docs that cancelling the flow run like that forces the state to be set in the Prefect database but does not stop the tasks. Here is where I got that info.
The agent type is Process. I’m on Prefect 3.2.7.
How do I cancel a flow run's execution and all of its task runs? Is it something I should manage myself? Is there an out-of-the-box Prefect way, other than retrieving all task runs of the flow run and looping over the list to cancel each one?
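For illustration, a minimal sketch of that loop using standard PrefectClient methods (read_task_runs / set_task_run_state); as noted above, forcing states records the cancellation in the API but may not stop work already executing in the Process:

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.states import Cancelled

async def cancel_flow_run_and_tasks(flow_run_id):
    async with get_client() as client:
        # mark the flow run itself as cancelled
        await client.set_flow_run_state(flow_run_id=flow_run_id, state=Cancelled(), force=True)
        # then mark every task run that belongs to it
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id={"any_": [flow_run_id]})
        )
        for task_run in task_runs:
            await client.set_task_run_state(task_run_id=task_run.id, state=Cancelled(), force=True)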
Kiran
03/24/2025, 10:12 AM
import asyncio

from prefect import flow
from prefect.deployments import run_deployment

@flow(log_prints=True, name="highpriority-triggering-flow", flow_run_name="highpriority-triggering-flow-run")
async def highpriority_first_triggeringflow():
    with open("calc_file.txt", "r") as file:
        expressions = [line.strip() for line in file.readlines()]
    flow_names = ["first-flow", "second-flow", "third-flow", "fourth-flow", "fifth-flow",
                  "sixth-flow", "seventh-flow", "eighth-flow", "ninth-flow", "tenth-flow"]
    dictionary = dict(zip(expressions, flow_names))
    print(f"the created dictionary is {dictionary}")
    flow_tasks = []
    for num, name in dictionary.items():
        print(f"the number is {num}")
        print(f"the name is {name}")
        print(f"triggering flow run {name} successfully")
        if int(num) in range(1, 4):
            print(f"entered first if condition, number is {num}")
            flow_tasks.append(run_deployment(
                name="main-flow/highpriority-maindeployment",  # deployment name
                parameters={"num": num},
                flow_run_name=name,
                work_queue_name="high",
                # scheduled_time=datetime(2025, 3, 21, 4, 30, 0)
            ))
        elif int(num) in range(4, 7):
            print(f"entered second if condition, number is {num}")
            flow_tasks.append(run_deployment(
                name="main-flow/highpriority-maindeployment",  # deployment name
                parameters={"num": num},
                flow_run_name=name,
                work_queue_name="medium",
            ))
        elif int(num) in range(7, 11):
            print(f"entered third if condition, number is {num}")
            flow_tasks.append(run_deployment(
                name="main-flow/highpriority-maindeployment",  # deployment name
                parameters={"num": num},
                flow_run_name=name,
                work_queue_name="low",
            ))
    print(f"flow tasks are {flow_tasks}")
    await asyncio.gather(*flow_tasks)
    print("All flows triggered successfully!")

if __name__ == "__main__":
    asyncio.run(highpriority_first_triggeringflow())
Kiran
03/24/2025, 10:13 AM
Kiran
03/24/2025, 10:16 AM
Melek Alan
03/24/2025, 10:49 AM
selfHostedServer.
Fabian Gruben
03/24/2025, 11:04 AM
Marko T
03/24/2025, 1:00 PM
Is PrefectHQ/prefect-hightouch still being maintained by Prefect? It doesn't seem to support Prefect 3 because of a Pydantic version problem: https://github.com/PrefectHQ/prefect-hightouch/issues/39. I'm wondering if we really have to downgrade to Prefect 2 to use it in our pipelines.
Theo Bouwman
03/24/2025, 1:24 PM
Stefan
03/24/2025, 1:45 PM
flow: FlowDecorator
Task and every other import works fine. Latest version.
Jeremy Karn
03/24/2025, 4:31 PM
Ben Epstein
03/24/2025, 4:45 PM
from prefect.blocks.notifications import SlackWebhook
slack_webhook_block = SlackWebhook.load("BLOCK_NAME")
slack_webhook_block.notify("Hello from Prefect!")
This is working for me. I wrote a function to do that as per this thread and added the on_failure=[notify_slack] hook, but I'm getting this error (and mypy is showing the same):
packages/flows/notify.py", line 7, in notify_slack
slack_webhook_block.notify(
^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'notify'
I'm curious why this works when I just run the code directly, but fails within the Prefect flow infrastructure. I'm seeing that in Pylance the variable is of type slack_webhook_block: SlackWebhook | Coroutine[Any, Any, SlackWebhook] -- so perhaps because my flow is async, this notify_slack function needs to be async as well?
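A minimal sketch of that async variant (BLOCK_NAME and the message text are placeholders; the Pylance union above suggests that, in async code, load returns a coroutine that must be awaited, and notify behaves the same way):

from prefect.blocks.notifications import SlackWebhook

async def notify_slack(flow, flow_run, state):
    # In an async context, SlackWebhook.load returns a coroutine, hence
    # the await; the same applies to notify.
    slack_webhook_block = await SlackWebhook.load("BLOCK_NAME")
    await slack_webhook_block.notify(f"Flow run {flow_run.name} entered state {state.name}")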
Abyaya Lamsal
03/24/2025, 5:41 PM
2.14.16 to 2.20.16. I started seeing some intermittent issues with a subset of flows. This seems to happen randomly; hence, not every flow run has this issue. For reference, I am using a custom image. Attached logs below:
13:29:55.058 | INFO | prefect.flow_runs.worker - Worker 'KubernetesWorker a5d26a51-ff36-4697-8daf-f8aa3a0fea54' submitting flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
13:29:55.236 | INFO | prefect.flow_runs.worker - Creating Kubernetes job...
13:29:55.314 | INFO | prefect.flow_runs.worker - Completed submission of flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8'
13:29:55.349 | INFO | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod has status 'Pending'.
13:30:55.327 | ERROR | prefect.flow_runs.worker - Job 'adept-hog-hwcjq': Pod never started.
13:30:55.570 | INFO | prefect.flow_runs.worker - Pod event 'Scheduled' at 2025-03-18 13:29:55+00:00: Successfully assigned [OUR_NAMESPACE]/adept-hog-hwcjq-pqhbc to <INTERNAL_NODE>
13:30:55.571 | INFO | prefect.flow_runs.worker - Job event 'SuccessfulCreate' at 2025-03-18 13:29:55+00:00: Created pod: adept-hog-hwcjq-pqhbc
13:30:55.572 | INFO | prefect.flow_runs.worker - Pod event 'Pulling' at 2025-03-18 13:29:56+00:00: Pulling image "<CUSTOM_IMAGE>"
13:30:55.572 | INFO | prefect.flow_runs.worker - Pod event 'Pulled' at 2025-03-18 13:30:33+00:00: Successfully pulled image "<CUSTOM_IMAGE>" in 37.16s (37.16s including waiting). Image size: <SIZE> bytes.
13:30:55.716 | INFO | prefect.flow_runs.worker - Reported flow run '855ead39-db97-4fa6-85b0-723ddd90b7c8' as crashed: Flow run infrastructure exited with non-zero status code -1.
<NORMAL EXECUTION>
...
The job eventually runs. The issue is that if I subscribe to any failure notification, I get randomly bombarded with crash notifications, which is not very helpful. Would appreciate any pointers here. Here is a sample of the job logs:
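One observation on the worker logs above: the gap between the Pending status (13:29:55) and "Pod never started" (13:30:55) is exactly 60 seconds, which matches the Kubernetes worker's default pod_watch_timeout_seconds, and the 37s image pull could plausibly push a pod past it. A hedged sketch of raising that job variable on a deployment (flow, deployment, pool, and image names are placeholders):

from prefect import flow

@flow
def my_flow():
    ...

# pod_watch_timeout_seconds is a Kubernetes work pool job variable that
# controls how long the worker waits for the pod to start before
# reporting the run as crashed.
my_flow.deploy(
    name="my-deployment",
    work_pool_name="my-k8s-pool",
    image="<CUSTOM_IMAGE>",
    job_variables={"pod_watch_timeout_seconds": 300},
)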
Stefan
03/24/2025, 6:54 PM
Tim Galvin
03/25/2025, 8:02 AM
flow. I am doing this inside a Docker container. My test fails in the container with the following error:
FAILED tests/test_prefect_helpers.py::test_enable_loguru_support - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: "Debug Print Notification" - should this be a string literal in single-quotes?
@Marvin - what is going wrong and causing the error?
Claudiu
03/25/2025, 11:02 AM
work pool in order to schedule flows, but since our deployment will be tied to some specific hardware (Jetson), we have to support an emergency use case without any cloud or work pool access. Our orchestration and infrastructure layers are on the same hardware, so we don't need to separate them. What's the solution for us to be able to schedule flows without work pools? (One option is sketched after this list.)
• Is there a way to pause/resume flows without work pools? Right now it seems we NEED to create a work pool to have this functionality.
• prefect.deploy or prefect.serve seem like good tools for remote deployment, but that just isn't our use case.
• Do work pools make sense for our specific use case, or is there an entity that we can use instead?
Currently we have a YAML file that provides the scheduling details for a flow, but it's a very convoluted process. Having the ability to directly schedule a task when needed would simplify our process (more details in the thread).
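For illustration, a minimal sketch of one option under these constraints (flow and deployment names are hypothetical): Flow.serve schedules and runs a flow from a single long-lived process on the same machine, with no work pool or separate worker involved.

from prefect import flow

@flow
def emergency_pipeline():
    ...

if __name__ == "__main__":
    # Serves the flow from this process: runs are scheduled, executed,
    # and tracked locally, without any work pool.
    emergency_pipeline.serve(name="on-device", cron="*/15 * * * *")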
Issue nr 2: serialization issues
• We have some entities that can't be easily serialized, and custom serialization logic would require additional parts of the system that aren't implemented in the scope of the POC. We know you have some serializers, but they don't work for our entities.
• We also have some singleton classes that act as a "syncing" element in our system. Is there a better alternative for managing state in a single-machine, all-in-one deployment?
• We're currently using the default task runner. Is there any benefit to using another one (like DaskTaskRunner), given that we don't need distributed computation for the POC? (A runner swap is sketched below.)
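A minimal sketch of swapping the task runner on a flow (assuming Prefect 3, where ThreadPoolTaskRunner ships with Prefect and DaskTaskRunner comes from the separate prefect-dask package; the flow and task here are hypothetical):

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
# from prefect_dask import DaskTaskRunner  # alternative; requires prefect-dask

@task
def crunch(x: int) -> int:
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def runner_demo():
    # .submit routes task execution through the configured task runner
    futures = [crunch.submit(i) for i in range(10)]
    return [f.result() for f in futures]

On a single machine, Dask mainly buys process-based parallelism; for a POC without that need, the default runner is usually sufficient.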
Martin Tomovič
03/25/2025, 11:39 AM
Chris
03/25/2025, 12:35 PM
prefect.yaml, the job variables appear in my deployment configuration but are not passed through to my ECS task. A task definition is created with the name of my work pool and a UUID (e.g., prefect_ecs-dev-pool_79227824-ac2e-48de-8639-58bc6fae1f6c), but it does not include the configured variables. As a result, network settings and IAM roles are missing (I managed to bypass this by allowing my worker to pass roles to my task… for now).
I’ve already discussed this with @Bianca Hoch but am still unsure why this is happening. Hoping the community can help us troubleshoot! Thanks in advance.
Secondary ask:
I’m using webhooks in Prefect Cloud and would like to validate incoming webhooks. To do this, I need access to the raw, unprocessed request body to generate a signature and match it against the signature in the headers. However, it seems that Prefect modifies the raw body in some way, making it impossible to validate my signature.
While I appreciate the ability to extract webhook parameters directly into my flow, it would be helpful to have an option to pass the full request payload through so I can handle it myself, without manipulation by Prefect. I've tried several approaches to pass the raw body to my function, but I either get validation errors in my automation or signatures that don't match.
Has anyone else encountered this, or found a workaround? Would love any insights.
Thanks!
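For context, a generic sketch of the kind of validation being described, using only the standard library (header names, secret handling, and encoding vary by webhook sender); it shows why the exact raw bytes matter:

import hashlib
import hmac

def verify_signature(raw_body: bytes, secret: str, signature_header: str) -> bool:
    # HMAC-SHA256 over the exact raw request bytes; any re-serialization
    # of the body by an intermediary produces a different digest.
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)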
antonk
03/25/2025, 2:58 PM
Emon Datta
03/25/2025, 5:13 PM
Response: {'detail': "Error creating flow run: Validation failed. Failure reason: 'args' is a required property"}
Any advice for how to fix this?
sadath
03/25/2025, 5:38 PM
Luis Henrique Dallelaste Bernartt
03/25/2025, 5:46 PM
sadath
03/26/2025, 5:25 AM
Vlad
03/26/2025, 9:31 AM
Sam Greenwood
03/26/2025, 11:46 AM
prefect deploy issue which was reliably producing badly built images for ECR that EKS then refused to run (EKS reported a bad hash on image pull, imagePullErr).
I had had this issue for over a week and tried everything: installing/re-installing Docker, different package-manager venvs, different project layouts for prefect.yaml, etc.
In the end, the solution was to disable the containerd image store in Docker Desktop.
With that checkbox unticked, prefect deploy worked first time; with it checked, it would reliably not work.
This is Prefect 3.2 with Python 3.12.