Melek Alan
08/28/2025, 12:41 PM
replicaCount=2, the same flow run is picked up by multiple workers.
This never happened before the upgrade.
Is this a change in worker behavior, or a bug? What’s the recommended way to prevent multiple workers from claiming the same run?
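A common way to keep two workers from both starting the same run is to make the claim itself an atomic compare-and-set on the run's state, performed by a single authority (the API server and its database), so that the worker that loses the race simply skips the run. The toy model below illustrates only that idea, in-process; `RunStore`, `try_claim`, and the state names are hypothetical and are not Prefect's implementation or API.

```python
import threading
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class RunRecord:
    state: str = "SCHEDULED"
    owner: Optional[str] = None


class RunStore:
    """Hypothetical stand-in for the server-side table of flow runs."""

    def __init__(self) -> None:
        self._runs: Dict[str, RunRecord] = {}
        self._lock = threading.Lock()  # serializes every read-modify-write

    def add(self, run_id: str) -> None:
        with self._lock:
            self._runs[run_id] = RunRecord()

    def try_claim(self, run_id: str, worker_id: str) -> bool:
        """Move SCHEDULED -> PENDING only if nobody else has done so yet."""
        with self._lock:
            run = self._runs[run_id]
            if run.state != "SCHEDULED":
                return False  # another worker already claimed it
            run.state = "PENDING"
            run.owner = worker_id
            return True

    def owner(self, run_id: str) -> Optional[str]:
        with self._lock:
            return self._runs[run_id].owner


def worker(store: RunStore, run_id: str, worker_id: str, results: Dict[str, bool]) -> None:
    # A worker would only start the run if its claim succeeded.
    results[worker_id] = store.try_claim(run_id, worker_id)


if __name__ == "__main__":
    store = RunStore()
    store.add("run-1")
    results: Dict[str, bool] = {}
    threads = [
        threading.Thread(target=worker, args=(store, "run-1", f"worker-{i}", results))
        for i in range(2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results, store.owner("run-1"))
```

As far as I understand, Prefect's server-side orchestration rules are meant to play this arbiter role when a worker proposes a Pending state, so after an upgrade it may be worth ruling out a version mismatch between the server and the workers.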
Thanks!
Slackbot
08/28/2025, 2:23 PM
Victor
08/28/2025, 2:25 PM
Victor
08/28/2025, 2:32 PM
Victor
08/28/2025, 6:07 PM
Samuel Hinton
08/28/2025, 11:35 PM
`run_deployment`, which I worry might be a bug and not user error. The summary is that I'm scheduling flow runs on NERSC via Slurm, which means the flow is often Pending for many minutes before Slurm allocates resources. The `timeout` parameter is left as None, but it seems that if the poll then it errors. I.e., in my parent flow logs I can see:
PrefectHTTPStatusError("Server error '500 Internal Server Error' for url 'http://prefect.prefect-pipelines.production.svc.spin.nersc.org/api/flow_runs/9418ee41-b55c-40ee-a7cb-879df77a766a'\nResponse: {'exception_message': 'Internal Server Error'}\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500")
This comes from `File "/usr/local/lib/python3.13/site-packages/prefect/deployments/flow_runs.py", line 203, in run_deployment`, at `flow_run = await client.read_flow_run(flow_run_id)`.
When I wait a minute for the job to be scheduled, I can hit this endpoint perfectly:
{
  "id": "9418ee41-b55c-40ee-a7cb-879df77a766a",
  "created": "2025-08-28T23:20:39.035659Z",
  "updated": "2025-08-28T23:20:43.784032Z",
  "name": "preprocess_/data/level=raw/runs/run_id=25_056_084/science_red.fits",
  "flow_id": "344eea82-0256-46b2-bdc2-615a21be48e6",
  "state_id": "0198f2fb-c55b-7ab9-aab6-971eb870d3ff",
  "deployment_id": "c29d351e-ee20-4267-93ee-d157b91f1d6b",
  "deployment_version": "104f2d07",
  "work_queue_id": "b0ac5ee7-ce2f-47ed-8f6d-7ddce9c0a1b0",
  "work_queue_name": "default",
  ...
}
So I feel like `run_deployment` shouldn't be raising an error in this instance. Any devs around to share their thoughts?
Specifically, the code in `run_deployment`:
with anyio.move_on_after(timeout):
    while True:
        flow_run = await client.read_flow_run(flow_run_id)
        flow_state = flow_run.state
        if flow_state and flow_state.is_final():
            return flow_run
        await anyio.sleep(poll_interval)
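The excerpt above polls without any error handling, so a single failed `read_flow_run` call ends the wait with an exception. A minimal sketch of a polling helper that treats some exceptions as transient and keeps going until too many consecutive failures occur; `poll_until_final`, its parameters, and the error budget are hypothetical, and it is not a drop-in replacement for `run_deployment`.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


async def poll_until_final(
    read: Callable[[], Awaitable[T]],
    is_final: Callable[[T], bool],
    poll_interval: float = 5.0,
    transient_errors: Tuple[Type[BaseException], ...] = (Exception,),
    max_consecutive_errors: int = 10,
    timeout: Optional[float] = None,
) -> Optional[T]:
    """Poll `read()` until `is_final(result)` is true.

    Exceptions listed in `transient_errors` are retried until
    `max_consecutive_errors` failures happen in a row, then re-raised.
    Returns None if `timeout` (seconds) elapses first; None means wait forever.
    """

    async def _loop() -> T:
        consecutive_errors = 0
        while True:
            try:
                result = await read()
            except transient_errors:
                consecutive_errors += 1
                if consecutive_errors >= max_consecutive_errors:
                    raise  # give up: the error no longer looks transient
            else:
                consecutive_errors = 0
                if is_final(result):
                    return result
            await asyncio.sleep(poll_interval)

    if timeout is None:
        return await _loop()
    try:
        return await asyncio.wait_for(_loop(), timeout)
    except asyncio.TimeoutError:
        return None
```

With Prefect, `read` could be `lambda: client.read_flow_run(flow_run_id)`, `is_final` could be `lambda fr: fr.state is not None and fr.state.is_final()`, and `transient_errors` could be `(PrefectHTTPStatusError,)` from `prefect.exceptions`. Note that this would also retry genuine 4xx errors unless the exception's status code is checked.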
This uses `timeout` to wait for flow completion, but it does seem to assume relatively immediate flow registration.
Matt Alhonte
08/29/2025, 12:00 AM
Tom Han
08/29/2025, 4:09 AM
File "...\.venv\lib\site-packages\smbprotocol\transport.py", line 68, in connect
raise ValueError(f"Failed to connect to '{self.server}:{self.port}': {err}") from err
ValueError: Failed to connect to 'my.smb.serber.edusmb::445': [Errno 11001] getaddrinfo failed.
When I stepped into the stack trace with pdb, it is the same `_as_unc_path` double-appending of the server as observed in the issue:
In `prefect.filesystems.SMB`:
# SMB.write_path calls RemoteFileSystem.write_path
# Around line 520
path = self._resolve_path(path)
dirpath = path[: path.rindex("/")]
# dirpath is smb://my.smb.server.edu/path/to/my/folder
self.filesystem.makedirs(dirpath, exist_ok=True)
But in `fsspec`:
# fsspec/implementation/smb.py
class SMBFileSystem(...):
    ...
    def makedirs(self, path, exist_ok=False):
        # the dirpath Prefect passes in already contains smb://my.smb.server.edu
        if _share_has_path(path):
            wpath = _as_unc_path(self.host, path)  # <=== This line double-appends it
            # so this line fails, because
            # wpath = \\my.smb.server.edusmb:\\my.smb.server.edu\path\to\my\folder
            smbclient.makedirs(wpath, exist_ok=exist_ok, port=self._port)
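One possible workaround, assuming the root cause is exactly the one traced above (Prefect passing a full smb://host/... URL where fsspec's `makedirs` expects a host-less path), is to strip the scheme and host before the path reaches fsspec. A sketch of that conversion; `strip_smb_prefix` is a hypothetical helper, not part of Prefect or fsspec.

```python
from urllib.parse import urlparse


def strip_smb_prefix(path: str) -> str:
    """Return only the share/path part of an smb:// URL.

    "smb://host[:port]/share/dir" -> "/share/dir"; anything that is not an
    smb:// URL (e.g. an already host-less path) is returned unchanged.
    """
    parsed = urlparse(path)
    if parsed.scheme == "smb":
        return parsed.path
    return path
```

It could be applied in a small subclass or wrapper that calls `makedirs(strip_smb_prefix(dirpath), exist_ok=True)` on the underlying fsspec filesystem; whether the same fix is needed for reads and writes is untested here.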
Are there any workarounds? Thanks!
Samuel Hinton
08/29/2025, 6:02 AM
json artifact to Prefect?
Idriss Bellil
08/29/2025, 10:57 AM
my_flow.with_options(retries=100).deploy(...)
I tested it and it didn't work, and then Marvin retracted that.
2. (More of a suggestion) It would be nice to see the full name of the deployment rather than a truncated suffix with dots, like project-1_flow-x_... (the name carries valuable info for us), or at least to have an option to display the full deployment name. Another piece of info missing from the UI, from what I could see, is the flow/deployment options (the retry logic, for instance: I could only see it in the deployment run, so I have to run it and then cancel it just to see the configured retry logic).
Michael Schwartz
08/31/2025, 12:52 AM
{
  "type": "event",
  "match": {},
  "match_related": {
    "prefect.resource.role": "flow-run"
  },
  "after": [],
  "expect": [
    "cdf.publish_audited_tables"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}
The problem I'm having is that it consistently triggers the flow to run 3 times, each within ~30 minutes of each other. What might be causing this? The event is only emitted once. @Marvin
ASHIK
09/01/2025, 5:27 AM
Vic
09/01/2025, 8:08 AM
Morten Hauge
09/01/2025, 11:27 AM
Michael Schwartz
09/01/2025, 11:29 AM
{
  "type": "event",
  "match": {},
  "match_related": {
    "prefect.resource.role": "flow-run"
  },
  "after": [],
  "expect": [
    "cdf.publish_audited_tables"
  ],
  "for_each": [],
  "posture": "Reactive",
  "threshold": 1,
  "within": 0
}
The problem I'm having is that it consistently triggers the flow to run 3 times, each within ~30 minutes of each other. What might be causing this? The event is only emitted once.
Kiran
09/02/2025, 10:43 AM
Stefan
09/02/2025, 12:46 PM
Ahmad Bilal Khalid
09/02/2025, 2:03 PM
from prefect_ray.task_runners import RayTaskRunner
RayTaskRunner(
    init_kwargs={
        "num_cpus": 5,  # number of performance cores on an 11-core M3 Pro
        "runtime_env": {
            "working_dir": ".",
            "excludes": ["*.pyc", "*.pyo", ".ruff_cache", ".venv", ".git", "__pycache__"],
            "worker_process_setup_hook": "backend.migrations.flows.utils.prepare_django_for_prefect",
        },
        "object_store_memory": 75 * 1024 * 1024,  # 75 MB of object store memory; we don't use it
    }
)
B returns ids: set[int]. A keeps calling B in a while loop until it returns an empty set. The problem is that I get the following error the 3rd time B is called (previously, it occurred on the 2nd call):
AssertionError: The env var, __RAY_WORKER_PROCESS_SETUP_HOOK_ENV_VAR, is not permitted because it is reserved for the internal use.
What could be the issue here?
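The A/B pattern described above (A keeps calling B until B returns an empty set) can be sketched without Ray or Prefect, with a cap on the number of rounds so that a B which never drains fails loudly instead of looping forever. `drain`, `process_batch`, and `max_rounds` are hypothetical names; this only illustrates the control flow and does not explain the Ray `AssertionError`.

```python
from typing import Callable, Set


def drain(process_batch: Callable[[], Set[int]], max_rounds: int = 1000) -> int:
    """Call process_batch until it returns an empty set.

    Returns the number of calls made; raises if the set never empties,
    so a bug in B cannot turn into an infinite loop.
    """
    for round_no in range(1, max_rounds + 1):
        ids = process_batch()
        if not ids:
            return round_no
    raise RuntimeError(f"process_batch still returned ids after {max_rounds} rounds")
```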
Denys Y
09/02/2025, 3:27 PM
Denys Y
09/02/2025, 3:27 PM
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server.prefect.svc.cluster.local:4200/api/work_pools/'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
An exception occurred.
Joe
09/03/2025, 9:23 AM
Kiran
09/03/2025, 12:11 PM
from datetime import datetime, timedelta
from prefect import flow, get_run_context
def resolve_date(date_param: str, run_time: datetime) -> str:
    if date_param == "today":
        return str(run_time.date())
    elif date_param == "yesterday":
        return str((run_time - timedelta(days=1)).date())
    else:
        # assume it's already YYYY-MM-DD
        return date_param
@flow
def actual_flow(run_date: str):
    ctx = get_run_context()
    # resolve only once, based on expected_start_time (not retry time!)
    resolved_date = resolve_date(run_date, ctx.flow_run.expected_start_time)
    print(f"Running for resolved date: {resolved_date}")
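One subtlety with resolving "today"/"yesterday" from `expected_start_time`: the result depends on the timezone the datetime is expressed in, and Prefect's timestamps are typically UTC, so a run scheduled shortly after local midnight can resolve to the previous UTC day. A variant of `resolve_date` that converts to an explicit timezone first and validates literal dates; the `tz` parameter is an addition for illustration, and treating naive datetimes as UTC is an assumption.

```python
from datetime import date, datetime, timedelta, timezone, tzinfo


def resolve_date(date_param: str, run_time: datetime, tz: tzinfo = timezone.utc) -> str:
    """Resolve "today"/"yesterday" relative to run_time, as seen in timezone tz."""
    if run_time.tzinfo is None:
        # Assumption: a naive datetime is already in UTC.
        run_time = run_time.replace(tzinfo=timezone.utc)
    local_day = run_time.astimezone(tz).date()
    if date_param == "today":
        return local_day.isoformat()
    if date_param == "yesterday":
        return (local_day - timedelta(days=1)).isoformat()
    # Otherwise expect YYYY-MM-DD; date.fromisoformat raises ValueError if not.
    return date.fromisoformat(date_param).isoformat()
```

Inside the flow this could be called as `resolve_date(run_date, ctx.flow_run.expected_start_time, ZoneInfo("America/New_York"))` (with `from zoneinfo import ZoneInfo`), where the zone name is only an example.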
Mitch
09/03/2025, 2:24 PM
Sid Bendre
09/03/2025, 4:23 PM
Task run failed with exception: ConcurrencySlotAcquisitionError("Unable to acquire concurrency limits ['im-c0Ij4IOaaRYYCwnuAXqUmf', 'audio']") - Retries are exhausted
....
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://48.211.249.12/api/concurrency_limits/increment'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
We got a lot of spiky concurrent traffic, but we never set any concurrency limits. What's causing this?
We are also seeing issues when decrementing:
Task run failed with exception: "PrefectHTTPStatusError(\"Server error '500 Internal Server Error' for url 'http://48.211.249.12/api/concurrency_limits/decrement'\\nResponse: {'exception_message': 'Internal Server Error'}\\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500\")" - Retries are exhausted
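Since these 500s appear under spiky concurrent load and the retries were exhausted, one client-side mitigation (it does not address whatever is failing on the server) is to spread retries out with exponential backoff and random jitter, so that retries from many tasks don't arrive in lockstep. A sketch of the "full jitter" schedule; `backoff_delays` and its defaults are hypothetical. Prefect's `@task` decorator also accepts `retries`, `retry_delay_seconds`, and `retry_jitter_factor`, which may be the simpler route.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Full-jitter backoff: delay i is drawn uniformly from [0, min(cap, base * 2**i)]."""
    rng = rng or random.Random()
    return [rng.uniform(0.0, min(cap, base * 2**i)) for i in range(attempts)]
```

If a list like this were passed as `retry_delay_seconds`, it would be computed once when the task is defined and reused for every run in that process, so per-retry randomization via `retry_jitter_factor` is probably the better fit.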
aadi i
09/03/2025, 7:17 PM
flow.from_source() method, which downloads the flow code from an S3 bucket, builds the image dynamically, and then runs the flow.
However, I’d like to avoid building the Docker image at runtime. Is there a way to use a prebuilt Docker image (pulled from a registry) and still pass parameters dynamically, preferably through a Pythonic method or the REST API, without relying on deployment templates or environment variables?
I understand that job_variables can be set dynamically and overridden during a flow run, but I’m looking for an alternative that allows passing parameters (like flow_data) more flexibly, ideally at runtime, in a way that automatically maps values from flow_data into job_variables and passes them as arguments to the flow entrypoint, while using a prebuilt Docker image.
flow_from_source = await flow.from_source(
    source=s3_bucket_block,
    entrypoint="flows/bill_flow.py:bill_assessment_flow",
)
flow_dependencies = get_flow_dependencies()
deployment = await flow_from_source.deploy(
    name=PREFECT_DEPLOYMENT_NAME,
    tags=["billing"],
    work_pool_name="kubernetes-pool",
    schedule=None,
    push=False,  # Skip pushing image
    job_variables={
        "finished_job_ttl": 100,
        # "image": "mat/prefect-k8s-worker:15",  # Uncomment to use a custom prebuilt image
        "namespace": "prefect",
        "env": {
            "PREFECT_API_URL": "http://prefect-server:4200/api",
            "EXTRA_PIP_PACKAGES": flow_dependencies,
            "PYTHONPATH": "/opt/prefect/",
        },
    },
)
app.state.deployment_id = deployment
flow_run = await client.create_flow_run_from_deployment(
    deployment_id=request.app.state.deployment_id,
    tags=run_tags,
    parameters={
        "flow_data": {
            "source_provider": source_provider,
            "target_provider": target_provider,
            "company_id": company_id,
            "company_name": company_name,
            "assessment_task_id": assessment_task_id,
        }
    },
)
logger.info(f"Created flow run with ID: {flow_run.id}")
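Parameters and job variables can both be supplied when a run is created, so one way to "map some of flow_data into job variables" without templates is to build both from the same dict just before creating the run. A sketch, assuming a work pool that accepts an `image` job variable (the Kubernetes pool does); `build_run_kwargs` and the per-key allow-list are hypothetical.

```python
from typing import Any, Dict, Iterable, Mapping


def build_run_kwargs(
    flow_data: Mapping[str, Any],
    image: str,
    job_variable_keys: Iterable[str] = (),
) -> Dict[str, Any]:
    """Return parameters/job_variables for one flow run.

    The full flow_data always goes to the flow as its `flow_data` parameter;
    keys listed in job_variable_keys are additionally copied into job_variables.
    """
    keys = set(job_variable_keys)
    job_variables: Dict[str, Any] = {"image": image}  # prebuilt image, no build step
    job_variables.update({k: v for k, v in flow_data.items() if k in keys})
    return {
        "parameters": {"flow_data": dict(flow_data)},
        "job_variables": job_variables,
    }
```

The result could be splatted into the existing call, e.g. `await client.create_flow_run_from_deployment(deployment_id=..., tags=run_tags, **build_run_kwargs(flow_data, "registry.example.com/billing:1.0"))`, where the image name is a placeholder; this assumes a Prefect version whose `create_flow_run_from_deployment` accepts `job_variables`. On the deploy side, recent versions of `Flow.deploy()` also take `image=` and `build=False`, which should skip the image build when the image already exists in the registry.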
Trey Gilliland
09/04/2025, 12:11 AM
ls /: on successful runs the code is cloned to the right place under /code-main, and on failed runs that directory is not there.
There are no logs from the git clone step to suggest that the clone fails. The gh token is valid, and it does work most of the time.
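Since the failure mode is a silently missing checkout rather than a logged clone error, one way to at least get a clear message is to verify the clone directory at the very start of the flow and list what is actually present. A sketch; `verify_checkout` and the `required` file names are assumptions for illustration.

```python
from pathlib import Path
from typing import Iterable


def verify_checkout(checkout_dir: str, required: Iterable[str] = ()) -> Path:
    """Fail fast, with a directory listing, if the cloned code is not where expected."""
    root = Path(checkout_dir)
    if not root.is_dir():
        parent = root.parent
        listing = sorted(p.name for p in parent.iterdir()) if parent.is_dir() else []
        raise FileNotFoundError(f"{root} does not exist; {parent} contains: {listing}")
    missing = [name for name in required if not (root / name).exists()]
    if missing:
        raise FileNotFoundError(f"{root} is missing: {missing}")
    return root
```

For example, `verify_checkout("/code-main", ["pyproject.toml"])` as the first line of the flow would at least turn the missing directory into an explicit error showing what the container's root looked like.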
Any ideas? This is on Prefect Cloud using a Modal work pool.
Kiran
09/04/2025, 7:26 AM
Shareef Jalloq
09/04/2025, 8:17 PM
apps user that is used to run all apps on this server.
web-server-01:/var/log/apps# cat /opt/apps/fpga-automation/start_prefect.sh
#!/bin/bash
source /opt/apps/fpga-automation/venv/bin/activate
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect_user:prefectdb@10.10.8.114:5432/prefect_automation"
export PREFECT_HOME="/opt/apps/prefect"
export HOME="/home/apps"
export PREFECT_LOGGING_LEVEL=DEBUG
exec prefect server start --host 127.0.0.1 --port 4200
And the error looks like a DNS failure?
___ ___ ___ ___ ___ ___ _____
| _ \ _ \ __| __| __/ __|_ _|
| _/ / _|| _|| _| (__ | |
|_| |_|_\___|_| |___\___| |_|
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
View the API reference documentation at http://127.0.0.1:4200/docs
Check out the dashboard at http://127.0.0.1:4200
20:13:29.728 | ERROR | prefect.server.utilities.postgres_listener - Failed to establish raw asyncpg connection for LISTEN/NOTIFY: [Errno -2] Name does not resolve
Traceback (most recent call last):
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/prefect/server/utilities/postgres_listener.py", line 71, in get_pg_notify_connection
conn = await asyncpg.connect(**connect_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connection.py", line 2421, in connect
return await connect_utils._connect(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 1075, in _connect
raise last_error or exceptions.TargetServerAttributeNotMatched(
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 1049, in _connect
conn = await _connect_addr(
^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 886, in _connect_addr
return await __connect_addr(params, True, *args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 931, in __connect_addr
tr, pr = await connector
^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 802, in _create_ssl_connection
tr, pr = await loop.create_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "uvloop/loop.pyx", line 1982, in create_connection
socket.gaierror: [Errno -2] Name does not resolve
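The traceback ends in `socket.gaierror` from the event loop's `create_connection`, so one way to narrow it down is to check, from the same Python environment and as the same user the failing process runs as, which host the connection URL actually contains and whether it resolves. A diagnostic sketch using only the standard library; `db_host_and_port` and `resolve` are hypothetical helpers, and this does not reproduce every step of asyncpg's connection logic (for example, SSL negotiation).

```python
import os
import socket
from typing import List, Optional, Tuple
from urllib.parse import urlsplit


def db_host_and_port(url: str) -> Tuple[Optional[str], int]:
    """Extract host and port from a SQLAlchemy-style URL such as postgresql+asyncpg://..."""
    parts = urlsplit(url)
    return parts.hostname, parts.port or 5432  # 5432 is the PostgreSQL default


def resolve(host: str, port: int) -> List[str]:
    """Return the addresses getaddrinfo yields; raises socket.gaierror on failure."""
    infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    return sorted({info[4][0] for info in infos})


if __name__ == "__main__":
    url = os.environ.get("PREFECT_API_DATABASE_CONNECTION_URL", "")
    host, port = db_host_and_port(url)
    print(f"host={host!r} port={port}")  # repr() exposes stray characters; password not printed
    if host:
        print(resolve(host, port))
```

Running it once as root and once in the environment the failing process actually uses (for example a systemd unit, if that is how it is started) and comparing the output may show whether the URL or the resolver configuration differs.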
If I run the script as root or as the apps user, it works fine. What am I missing?
Nate
09/04/2025, 9:37 PM
Owen Boyd
09/04/2025, 10:25 PM
Task run failed with exception: TaskRunTimeoutError('Scope timed out after 60.0 second(s).') - Retry 1/3 will start 10 second(s) from now 02:29:37 PM
Finished in state Completed() 02:29:22 PM