aadi i
09/03/2025, 7:17 PM
I'm currently using the flow.from_source() method, which downloads the flow code from an S3 bucket, builds the image dynamically, and then runs the flow.
However, I’d like to avoid building the Docker image at runtime. Is there a way to use a prebuilt Docker image (pulled from a registry) and still pass parameters dynamically — preferably through a Pythonic method or REST API — without relying on deployment templates or environment variables?
I understand that job_variables can be set dynamically and overridden during a flow run, but I'm looking for a more flexible alternative for passing parameters (like flow_data) at runtime: ideally something that automatically maps values from flow_data into job_variables and passes them as arguments to the flow entrypoint, while using a prebuilt Docker image.
flow_from_source = await flow.from_source(
    source=s3_bucket_block,
    entrypoint="flows/bill_flow.py:bill_assessment_flow"
)
flow_dependencies = get_flow_dependencies()
deployment = await flow_from_source.deploy(
    name=PREFECT_DEPLOYMENT_NAME,
    tags=["billing"],
    work_pool_name="kubernetes-pool",
    schedule=None,
    push=False,  # Skip pushing the image
    job_variables={
        "finished_job_ttl": 100,
        # "image": "mat/prefect-k8s-worker:15",  # Uncomment to use a custom prebuilt image
        "namespace": "prefect",
        "env": {
            "PREFECT_API_URL": "http://prefect-server:4200/api",
            "EXTRA_PIP_PACKAGES": flow_dependencies,
            "PYTHONPATH": "/opt/prefect/"
        }
    }
)
app.state.deployment_id = deployment

flow_run = await client.create_flow_run_from_deployment(
    deployment_id=request.app.state.deployment_id,
    tags=run_tags,
    parameters={
        "flow_data": {
            "source_provider": source_provider,
            "target_provider": target_provider,
            "company_id": company_id,
            "company_name": company_name,
            "assessment_task_id": assessment_task_id
        }
    }
)
logger.info(f"Created flow run with ID: {flow_run.id}")
Trey Gilliland
09/04/2025, 12:11 AM
When I ls /, on successful runs the code is cloned to the right place under /code-main, and on failed runs that directory is not there.
There are no logs from the git clone step to suggest that the clone fails. The gh token is valid and it does work most of the time.
Any ideas? This is on Prefect Cloud using a Modal work pool.
Kiran
09/04/2025, 7:26 AM
Shareef Jalloq
09/04/2025, 8:17 PM
...the apps user that is used to run all apps on this server.
web-server-01:/var/log/apps# cat /opt/apps/fpga-automation/start_prefect.sh
#!/bin/bash
source /opt/apps/fpga-automation/venv/bin/activate
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://prefect_user:prefectdb@10.10.8.114:5432/prefect_automation"
export PREFECT_HOME="/opt/apps/prefect"
export HOME="/home/apps"
export PREFECT_LOGGING_LEVEL=DEBUG
exec prefect server start --host 127.0.0.1 --port 4200
And the error looks like a DNS failure?
(PREFECT ASCII-art startup banner)
Configure Prefect to communicate with the server with:
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
View the API reference documentation at http://127.0.0.1:4200/docs
Check out the dashboard at http://127.0.0.1:4200
20:13:29.728 | ERROR | prefect.server.utilities.postgres_listener - Failed to establish raw asyncpg connection for LISTEN/NOTIFY: [Errno -2] Name does not resolve
Traceback (most recent call last):
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/prefect/server/utilities/postgres_listener.py", line 71, in get_pg_notify_connection
conn = await asyncpg.connect(**connect_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connection.py", line 2421, in connect
return await connect_utils._connect(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 1075, in _connect
raise last_error or exceptions.TargetServerAttributeNotMatched(
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 1049, in _connect
conn = await _connect_addr(
^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 886, in _connect_addr
return await __connect_addr(params, True, *args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 931, in __connect_addr
tr, pr = await connector
^^^^^^^^^^^^^^^
File "/opt/apps/fpga-automation/venv/lib/python3.12/site-packages/asyncpg/connect_utils.py", line 802, in _create_ssl_connection
tr, pr = await loop.create_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "uvloop/loop.pyx", line 1982, in create_connection
socket.gaierror: [Errno -2] Name does not resolve
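The host in the connection URL is a literal IP, so a gaierror here is surprising; it suggests getaddrinfo itself misbehaves in the environment the service user gets (e.g. an unreadable resolver config). A minimal check, run under the same user to see whether resolution is the culprit (host values are examples):

```python
import socket

def can_resolve(host: str, port: int = 5432) -> bool:
    """Return True if getaddrinfo succeeds for host:port in this environment."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False

# A literal IP should succeed even with a broken DNS resolver; a hostname
# additionally needs /etc/nsswitch.conf and resolv.conf readable by this user.
```

Comparing the result for the same host when run as root, as apps, and as the service user would narrow down which environment breaks.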
If I run the script as root or the apps user it works fine. What am I missing?
Nate
09/04/2025, 9:37 PM
Owen Boyd
09/04/2025, 10:25 PM
Task run failed with exception: TaskRunTimeoutError('Scope timed out after 60.0 second(s).') - Retry 1/3 will start 10 second(s) from now 02:29:37 PM
Finished in state Completed() 02:29:22 PM
Mark Callison
09/05/2025, 3:00 PM
Nick Torba
09/07/2025, 12:34 AM
Kiran
09/08/2025, 12:19 PM
Michael Savarese
09/08/2025, 5:03 PM
Joe D
09/08/2025, 7:56 PM
Kiran
09/09/2025, 10:03 AM
Miguel Moncada
09/09/2025, 11:43 AM
Miguel Moncada
09/09/2025, 2:34 PM
Shareef Jalloq
09/09/2025, 3:23 PM
I started with a GitRepository source for my flow deployments but now realise my mistake, as I need to both git clone and pip install my package. What's the correct way to clone and install my deployments? The docs don't provide examples for this sort of flow.
David Michael Carter
09/09/2025, 3:56 PM
Finished in state Completed(message=None, type=COMPLETED, result=ResultRecord(metadata=ResultRecordMetadata(...
Every time one of my tasks finishes, its outputs are involuntarily printed to the cloud logs. This is unacceptable, as some task outputs contain secrets such as auth tokens, and now we have to deal with leaked secrets.
Court
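One mitigation for the leaked-result concern above is to never return raw secret strings from tasks: return a wrapper whose repr masks the value, so any state or result repr that reaches the logs stays masked (pydantic's SecretStr follows the same idea). A minimal stdlib sketch, not a Prefect API:

```python
class MaskedSecret:
    """Wrap a sensitive string so repr()/str() never leak it into logs."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get(self) -> str:
        # Unwrap explicitly, only at the point of use.
        return self._value

    def __repr__(self) -> str:
        return "MaskedSecret('**********')"

    __str__ = __repr__
```

A task would return MaskedSecret(token) instead of token, and downstream tasks call .get() where the real value is needed.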
09/09/2025, 4:48 PM
Joe
09/10/2025, 4:23 AM
Kiran
09/10/2025, 6:19 AM
Kiran
09/10/2025, 7:29 AM
xiaotian lu
09/10/2025, 7:39 AM
Syméon del Marmol
09/10/2025, 11:42 AM
Dennis Hinnenkamp
09/11/2025, 11:04 AM
itielOlenick
09/11/2025, 3:27 PM
Brandon Robertson
09/11/2025, 6:45 PM
Oleksii Stupak
09/12/2025, 7:11 AM
Ricardo Gaspar
09/12/2025, 11:17 AM
As far as I can tell, Prefect only supports final workflow states like Failed or Success (https://docs.prefect.io/v3/concepts/states#states).
I need my data pipelines to be able to signal warnings, but I don't have a proper way to signal that via the DAG's final state, which would be ideal for observability.
Is there anything to support three final workflow states, like a semaphore:
• Failed 🔴
• Warnings / Success_with_warnings 🟡
• Success 🟢
Or any other monitoring solution within the UI or observability side of Prefect that allows that?
CC: @Nathan Nowack @Brendan O'Leary @Anna M Geller
PyHannes
09/12/2025, 11:50 AM
Owen Boyd
09/13/2025, 8:20 PM
Ishan Anilbhai Koradiya
09/14/2025, 2:09 PM