aadi i
09/03/2025, 7:17 PM
I'm currently using the flow.from_source() method, which downloads the flow code from an S3 bucket, builds the image dynamically, and then runs the flow.
However, I’d like to avoid building the Docker image at runtime. Is there a way to use a prebuilt Docker image (pulled from a registry) and still pass parameters dynamically — preferably through a Pythonic method or REST API — without relying on deployment templates or environment variables?
I understand that job_variables can be set dynamically and overridden during a flow run, but I’m looking for an alternative that allows passing parameters (like flow_data) more flexibly — ideally at runtime — in a way that automatically maps values from flow_data into job_variables and passes them as arguments to the flow entrypoint, while using a pre-built Docker image.
flow_from_source = await flow.from_source(
    source=s3_bucket_block,
    entrypoint="flows/bill_flow.py:bill_assessment_flow"
)
flow_dependencies = get_flow_dependencies()
deployment = await flow_from_source.deploy(
    name=PREFECT_DEPLOYMENT_NAME,
    tags=["billing"],
    work_pool_name="kubernetes-pool",
    schedule=None,
    push=False,  # Skip pushing image
    job_variables={
        "finished_job_ttl": 100,
        # "image": "mat/prefect-k8s-worker:15",  # Uncomment to use a custom prebuilt image
        "namespace": "prefect",
        "env": {
            "PREFECT_API_URL": "http://prefect-server:4200/api",
            "EXTRA_PIP_PACKAGES": flow_dependencies,
            "PYTHONPATH": "/opt/prefect/"
        }
    }
)
app.state.deployment_id = deployment
flow_run = await client.create_flow_run_from_deployment(
    deployment_id=request.app.state.deployment_id,
    tags=run_tags,
    parameters={
        "flow_data": {
            "source_provider": source_provider,
            "target_provider": target_provider,
            "company_id": company_id,
            "company_name": company_name,
            "assessment_task_id": assessment_task_id
        }
    }
)
logger.info(f"Created flow run with ID: {flow_run.id}")
Nate
09/03/2025, 7:26 PM
> I understand that job_variables can be set dynamically and overridden during a flow run, but I’m looking for an alternative that allows passing parameters (like flow_data) more flexibly — ideally at runtime
have you checked out run_deployment ?
@Marvin show a run_deployment example and link docs
Marvin
09/03/2025, 7:26 PM
Marvin
09/03/2025, 7:28 PM
run_deployment is a great way to trigger a deployment at runtime and pass parameters (like flow_data) without touching job_variables.
Here are quick examples:
Python (inside a flow or any Python process)
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
def orchestrator(user_id: str):
    flow_run = run_deployment(
        name="etl/production",  # <FLOW_NAME>/<DEPLOYMENT_NAME>
        parameters={
            "flow_data": {"user_id": user_id, "batch": 42}
        },
        flow_run_name=f"etl-{user_id}",
        # Optional: override job variables for this run (interpreted as JSON)
        job_variables={
            # Example ECS-style overrides; adjust to your worker type
            "cpu": 1024,     # 1 vCPU
            "memory": 2048,  # 2 GB
            "env": {"EXTRA_FLAG": "1"}
        },
        # Set to False if you don't want this to appear as a subflow in the UI
        as_subflow=True,
        tags={"triggered-by-orchestrator"},
    )
    return flow_run.id
CLI (one-off, fully runtime-driven)
prefect deployment run "etl/production" \
-p flow_data='{"user_id":"abc","batch":42}' \
--flow-run-name "etl-abc" \
-jv cpu=1024 \
-jv memory=2048 \
--watch
Docs
- How to run deployments (passing parameters, scheduling, etc.): Run deployments (how-to)
- CLI reference for `prefect deployment run`: CLI: prefect deployment run
- Python API reference for `run_deployment`: Python API: run_deployment
aadi i
09/03/2025, 7:42 PM
But how can we define the entrypoint for a deployment that uses only a prebuilt image and not the source code?
Nate
09/03/2025, 7:45 PM
> But how can we define the entrypoint for a deployment that uses only a prebuilt image
why not pass an image job variable to run_deployment ?
or is there something about your use-case that necessitates you creating deployments in python like this before you trigger a run?
Nate
09/03/2025, 7:46 PM
image job variable selected at runtime
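A minimal sketch of what Nate is suggesting; the deployment name and image tag are placeholders, and it assumes the deployment already exists on a work pool whose job template exposes an image variable (as Docker and Kubernetes pools do):
from prefect.deployments import run_deployment

# Trigger an existing deployment and pick a prebuilt image for this run only.
# "bill_assessment_flow/billing-deployment" and the image tag are placeholders.
flow_run = run_deployment(
    name="bill_assessment_flow/billing-deployment",
    parameters={"flow_data": {"company_id": "12345"}},        # still routed to the entrypoint flow
    job_variables={"image": "myregistry/prefect-custom:tag"},  # image chosen at runtime
)
print(flow_run.id)
No image is built here; the worker simply pulls whatever image the job variable names.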
aadi i
09/04/2025, 4:48 PM
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.deployments.flow_runs import run_deployment
import asyncio

async def deploy_flow():
    async with get_client() as client:
        # Create deployment with storage=None
        deployment = await client.create_deployment(
            name="billing-deployment",
            flow_name="bill_assessment_flow",
            work_pool_name="kubernetes-pool",
            storage=None,  # No external storage, code is in the image
            entrypoint="/app/my_script.py:bill_assessment_flow",  # Path in image
            job_variables={
                "image": "myregistry/prefect-custom:tag",  # Prebuilt image
                "namespace": "prefect",
                "env": {
                    "PREFECT_API_URL": "http://prefect-server:4200/api",
                    "PYTHONPATH": "/app"
                },
                "finished_job_ttl": 100
            },
            tags=["billing"],
            schedule=None
        )
        return deployment.id

async def trigger_flow_run(deployment_id: str, flow_data: dict):
    # Trigger a flow run with dynamic parameters
    flow_run = await run_deployment(
        name="bill_assessment_flow/billing-deployment",
        parameters={"flow_data": flow_data},
        flow_run_name=f"billing-run-{flow_data.get('company_id')}",
        tags=["triggered-run"]
    )
    return flow_run.id

async def main():
    # Deploy the flow (run once or manage via CI/CD)
    deployment_id = await deploy_flow()
    print(f"Created deployment with ID: {deployment_id}")
    # Example flow_data to pass dynamically
    flow_data = {
        "source_provider": "aws",
        "target_provider": "gcp",
        "company_id": "12345",
        "company_name": "Example Corp",
        "assessment_task_id": "task-789"
    }
    # Trigger a flow run
    flow_run_id = await trigger_flow_run(deployment_id, flow_data)
    print(f"Created flow run with ID: {flow_run_id}")

if __name__ == "__main__":
    asyncio.run(main())
To clarify my earlier question about the entrypoint: I was under the impression that using a prebuilt image meant the flow code had to be baked into the image, which would require a different kind of entrypoint (since no source pull happens). But after looking more closely at the docs and examples (Prefect v3 Deploy via Python), it seems that’s not entirely the case.
From what I understand now, the prebuilt image mainly provides the runtime environment (with Prefect and required pip packages), but the actual flow definition can still be pulled in a different way. Please correct me if I’m still missing something.
Mahboobe Haddadi
09/05/2025, 12:12 PM
Nate
09/05/2025, 1:08 PM
> image job variable selected at runtime
that is, when you POST /create_flow_run_from_deployment, you can choose a preexisting image at that time to use for the flow run, which in your case will contain your flow code. you can also choose that image at deployment time, if you never need to change it
my suggestion to build an image with your code baked in, and then provide it as a job_variable to the work pool, is what you want to do.
this is a very common pattern in prefect
• docs
• example
the steps are
• write a Dockerfile
• run docker build and push etc, or instead write build and push steps into yaml like my example
• define your deployment in python via .deploy or in yaml via prefect.yaml
  ◦ set the image job variable on your deployment to the resulting image arn
  ◦ set your entrypoint relative to the WORKDIR in your image
  ◦ run prefect deploy
• trigger a run of your deployment, optionally overriding your image selection (not building an image) at runtime (see the sketch below)
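A small sketch of that last step using the Python client, i.e. the same call Nate refers to as POST /create_flow_run_from_deployment; the deployment name, image, and parameters are placeholders, and it assumes a recent Prefect 3.x client where create_flow_run_from_deployment accepts job_variables:
import asyncio
from prefect.client.orchestration import get_client

async def run_with_image(deployment_name: str, image: str, flow_data: dict) -> str:
    # Assumes the deployment already exists; only the image and parameters are chosen per run.
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            parameters={"flow_data": flow_data},
            job_variables={"image": image},  # override the image for this run only
        )
        return str(flow_run.id)

# Example usage (placeholder values):
# asyncio.run(run_with_image("bill_assessment_flow/billing-deployment",
#                            "myregistry/prefect-custom:tag",
#                            {"company_id": "12345"}))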
09/05/2025, 1:24 PMplease aggregate docs/examples on using .deploy or prefect.yaml when baking code into images
Marvin
09/05/2025, 1:24 PMMarvin
09/05/2025, 1:24 PM
Here are docs and minimal examples for baking flow code into an image and deploying with .deploy() or prefect.yaml.
Docs
- Deploy via Python (.deploy)
- prefect.yaml guide
- Customize job variables
- AWS ECS guide
Minimal examples (baked code in image)
1) Python: Flow.deploy with prebuilt image (ECR + ECS)
from prefect import flow

@flow(result_storage="s3://my-bucket/prefect-results/prefix", persist_result=True)
def my_flow():
    ...

ecr_image = "123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo:2025-09-05"

my_flow.deploy(
    name="my-flow-ecs",
    work_pool_name="my-ecs-pool",
    image=ecr_image,  # baked code image
    build=False,      # image already built
    push=False,       # image already pushed
    parameters={"env": "prod"},
    tags=["prod"],
    job_variables={
        "cluster": "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
        "execution_role_arn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
        "task_role_arn": "arn:aws:iam::123456789012:role/MyPrefectTaskRole",
        "subnets": ["subnet-abc", "subnet-def"],
        "security_groups": ["sg-0123456789abcdef0"],
        "assign_public_ip": False,
        "cpu": 1024,
        "memory": 2048,
        "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
    },
)
2) prefect.yaml: prebuilt image (ECR + ECS)
name: my-project
deployments:
  - name: etl-prod-ecs
    entrypoint: my_package.flows:etl_flow  # importable in the image
    image: 123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo:2025-09-05
    build: false
    push: false
    tags: ["prod"]
    parameters:
      env: "prod"
    work_pool:
      name: my-ecs-pool
      work_queue_name: default
      job_variables:
        cluster: arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster
        execution_role_arn: arn:aws:iam::123456789012:role/ecsTaskExecutionRole
        task_role_arn: arn:aws:iam::123456789012:role/MyPrefectTaskRole
        subnets: ["subnet-abc", "subnet-def"]
        security_groups: ["sg-0123456789abcdef0"]
        assign_public_ip: false
        cpu: 1024
        memory: 2048
        env:
          PREFECT_LOGGING_LEVEL: INFO
Notes (brief)
- Entrypoint must be importable inside the image (install your project in the image and use module:function).
- For ECS+ECR: the ECS task execution role needs ECR pull permissions (e.g., AmazonECSTaskExecutionRolePolicy). Use the task role for your app’s AWS access.
- Persist results to remote storage (e.g., S3) for resilience in container environments.
If you want, share your ECS cluster/role names and I’ll tailor these snippets exactly.
aadi i
09/07/2025, 11:40 PM
@flow(result_storage="s3://my-bucket/prefect-results/prefix", persist_result=True)
def my_flow():
    ...

ecr_image = "123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo:2025-09-05"

my_flow.deploy(
    name="my-flow-ecs",
    work_pool_name="my-ecs-pool",
    image=ecr_image,  # baked code image
    build=False,      # image already built
    push=False,       # image already pushed
    parameters={"env": "prod"},
    tags=["prod"],
    job_variables={
        "cluster": "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
        "execution_role_arn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
        "task_role_arn": "arn:aws:iam::123456789012:role/MyPrefectTaskRole",
        "subnets": ["subnet-abc", "subnet-def"],
        "security_groups": ["sg-0123456789abcdef0"],
        "assign_public_ip": False,
        "cpu": 1024,
        "memory": 2048,
        "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
    },
)
Question 1
As I mentioned, I have flow code baked into the image with a flow name, e.g., inside_docker_flow. If I want to pass parameters to the flow dynamically during run_deployment, will they be passed to my_flow or inside_docker_flow?
Question 2
I have custom REST services in front of the Prefect server that accept requests from other services. It acts as a gateway that serves endpoints like deploy and run, and in turn routes to the Prefect server. But I don’t want to define any flows locally when making REST calls to the Prefect server.
Nate
09/08/2025, 2:29 AM
> Question 1
parameters are passed to the flow at the entrypoint (path/to/file.py:fn) defined by your deployment; check prefect deployment inspect ...
> Question 2
whats the question? if you want to trigger existing flows remotely via prefect rest API, that is what deployments are for. you can be pretty flexible and have your (deployment entrypoint) flow conditionally behave differently (e.g. call different subflows/tasks conditionally) based on the inputs you end up with in your web service layer
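A hedged sketch of the programmatic equivalent of that prefect deployment inspect check, which also fits the REST-gateway setup from Question 2 since no flow is defined locally; the deployment name is a placeholder:
import asyncio
from prefect.client.orchestration import get_client

async def show_entrypoint(deployment_name: str) -> None:
    # Reads the deployment from the server and prints the entrypoint that will
    # actually receive run_deployment parameters, plus its default parameters.
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        print(deployment.entrypoint)   # e.g. "flows/bill_flow.py:bill_assessment_flow"
        print(deployment.parameters)   # defaults baked into the deployment

asyncio.run(show_entrypoint("bill_assessment_flow/billing-deployment"))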
aadi i
09/11/2025, 8:47 PM
#!/usr/bin/env python3
import sys
from datetime import datetime
from prefect import flow, task, get_run_logger

@flow(
    name="simple-test-flow",
    description="A simple flow to test argument passing in Docker"
)
def inside_docker_test_flow(
    text: str = "Hello Docker",
    number: int = 42,
    count: int = 3
) -> str:
    logger = get_run_logger()
    logger.info("=== Starting Simple Test Flow ===")
    logger.info(f"Received arguments:")
    logger.info(f" - text: '{text}'")
    logger.info(f" - number: {number}")
    logger.info(f" - count: {count}")
    return "Hello Docker"
Dockerfile:
# Use Python 3.11 as base image
FROM prefecthq/prefect:3.4.17-python3.11
# Set working directory
WORKDIR /opt/prefect
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
RUN pip install prefect-docker
# Copy the inside_docker_flow.py file
COPY inside_docker_flow.py .
# Create a non-root user
RUN useradd --create-home --shell /bin/bash prefect && \
chown -R prefect:prefect /opt/prefect
# Switch to non-root user
USER prefect
Deployment script (deploy.py), which deploys and runs. (I wanted to use a web-layer API that offers deploy and run endpoints.)
#!/usr/bin/env python3
"""
Prefect Flow Deployment Script
This script deploys a flow to a work pool and provides methods to run it with parameters.
"""
import asyncio
from datetime import datetime
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
@flow(
name="docker-test-flow",
description="A flow to test argument passing in Docker containers",
log_prints=True
)
def local_test_flow(
text: str = "Hello Docker",
number: int = 42,
count: int = 3
) -> str:
logger = get_run_logger()
<http://logger.info|logger.info>("=== Starting Docker Test Flow ===")
<http://logger.info|logger.info>(f"Received arguments:")
<http://logger.info|logger.info>(f" - text: '{text}'")
<http://logger.info|logger.info>(f" - number: {number}")
<http://logger.info|logger.info>(f" - count: {count}")
# Process the arguments
result = f"Processed: {text} (number: {number}, count: {count})"
<http://logger.info|logger.info>(f"Flow result: {result}")
return result
def deploy_flow():
"""Deploy the flow to a work pool."""
print("Deploying flow to work pool...")
local_test_flow.deploy(
name="docker-test-deployment",
work_pool_name="my-docker-pool-1",
build=False, # Don't build; use the existing prebuilt image
image="matilda1/prefect-flow-test:latest", # Use local image name
push=False, # Set to True to push to registry
tags=["docker", "testing"],
description="Docker-based flow for testing argument passing",
parameters={
"text": "Default Text",
"number": 100,
"count": 5
},
job_variables={
"networks": ["iceberg-spark-minio_iceberg_net"],
"network_mode": "bridge",
"image_pull_policy": "IfNotPresent",
"auto_remove": True,
"volumes": ["/var/run/docker.sock:/var/run/docker.sock"],
"stream_output": True,
"stream_logs": True,
"stream_logs_interval": 1,
"stream_logs_timeout": 10,
"stream_logs_max_retries": 3,
"stream_logs_retry_delay": 1,
"stream_logs_retry_delay_max": 10,
"privileged": True,
"restart_policy": {
"name": "always",
"maximum_retry_count": 3
},
"env": {
"PREFECT_API_URL": "<http://172.24.6.140:4200/api>",
"PYTHONPATH": "/opt/prefect"
}
}
)
print("Flow deployed successfully!")
async def run_flow_with_parameters(
text: str = "Hello from run_deployment",
number: int = 200,
count: int = 7
):
print(f"Running flow with parameters: text='{text}', number={number}, count={count}")
try:
# Run the deployment with custom parameters
flow_run = await run_deployment(
name="docker-test-flow/docker-test-deployment",
parameters={
"text": text,
"number": number,
"count": count
}
)
print(f"Flow run started with ID: {flow_run.id}")
print(f"Flow run state: {flow_run.state}")
# Wait for completion and get result
await flow_run.wait_for_completion()
print(f"Flow run final state: {flow_run.state}")
if flow_run.state.is_completed():
result = flow_run.state.result()
print(f"Flow completed with result: {result}")
return result
else:
print(f"Flow failed with state: {flow_run.state}")
return None
except Exception as e:
print(f"Error running flow: {e}")
raise
def run_flow_sync(
text: str = "Hello from sync run",
number: int = 300,
count: int = 9
):
return asyncio.run(run_flow_with_parameters(text, number, count))
if __name__ == "__main__":
# deploy_flow()
run_flow_sync()
Problem:
When I run the deployment, parameters are passed to local_test_flow (from deploy.py), because that’s what the deployment entrypoint is set to.
But what I actually want is for the parameters to be passed to inside_docker_test_flow, which already exists inside the pre-built Docker image.
Here’s what prefect deployment inspect shows:
(venv) matilda-svc@mc-dev-AdiN:~/projects/prefect_test$ prefect deployment inspect docker-test-flow/docker-test-deployment
{
………………….
'entrypoint': 'deploy.py:local_test_flow',
………………………..
}
My question:
How can I set the entrypoint to point to the flow that’s already inside the Docker image (inside_docker_flow.py:inside_docker_test_flow) instead of a locally defined flow like deploy.py:local_test_flow, so that I can pass the parameters dynamically?
Nate
09/11/2025, 8:50 PM
Nate
09/11/2025, 8:51 PM
Nate
09/11/2025, 8:54 PM
if your deployment has local_test_flow as its entrypoint and you instead want a deployment to refer to a different flow in a docker container, you will need to create a different deployment whose entrypoint points to that other flow in the docker container
---
this might not be relevant, it only is if you want to have 2 separate flows accessible in your runtime image that you can decide between at runtime based on the parameters you pass
you can generalize your deployment entrypoint such that it dispatches parameters to the correct subflow conditionally
@flow
def meta_flow(subflow_params: T, subflow_choice: SomeEnum):
    if subflow_choice == "foo":
        # call foo subflow with subflow_params
        ...
and then meta_flow stays a static entrypoint that can dispatch any subflow
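A runnable version of that sketch, with placeholder subflows and a plain Enum standing in for T and SomeEnum:
from enum import Enum
from prefect import flow

class SubflowChoice(str, Enum):
    FOO = "foo"
    BAR = "bar"

@flow
def foo_subflow(params: dict) -> str:
    return f"foo ran with {params}"

@flow
def bar_subflow(params: dict) -> str:
    return f"bar ran with {params}"

@flow
def meta_flow(subflow_params: dict, subflow_choice: SubflowChoice = SubflowChoice.FOO) -> str:
    # The deployment entrypoint stays fixed; the parameters decide which subflow runs.
    if subflow_choice == SubflowChoice.FOO:
        return foo_subflow(subflow_params)
    return bar_subflow(subflow_params)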
Nate
09/11/2025, 8:54 PM
Nate
09/11/2025, 8:56 PM
aadi i
09/11/2025, 9:37 PM
I have one script for create_deploy and another for the flows inside the Docker image. I wanted to demonstrate the same.
> which means that creating the deployments dynamically is probably your best option, which if i remember correctly, is what you were doing before
Yes, I’ve done that; it’s the very first thing I mentioned. I can remotely download the flow code, build an image dynamically, and set the entrypoint during that build. That worked for me. Problem: I don’t want to build an image during deployment. But here’s the gap I’m seeing with that approach: when Prefect downloads flow code, it places it under paths like /opt/prefect/.... Why not support a simpler option where I can provide a prebuilt image that already has the flow code in place (e.g., baked into /opt/prefect), and then just specify the entrypoint at deploy time — without needing to fetch from GitHub/S3?
Something like this would be helpful:
from prefect import flow

flow.deploy(
    name="docker-test-deployment",
    work_pool_name="my-docker-pool-1",
    build=False,  # Don't build; the image is prebuilt
    image="matilda1/prefect-flow-test:latest",  # Local image that already has the flow code placed in locations like /opt/prefect
    entrypoint="inside_docker_flow.py:inside_docker_test_flow",  # give support to entrypoint
    push=False,  # Set to True to push to registry
    ........................
)
Alternatively, we could define an entrypoint in the Dockerfile using ENTRYPOINT ["python", "inside_docker_flow.py"], and Prefect should be able to run it under the hood with something like docker run prefect-flow-test arg1 arg2 arg3.
That way the image fully encapsulates the flow code, and the deployment just tells Prefect which baked flow to use inside Docker.
Nate
09/11/2025, 10:09 PM
there's the git_clone step, and you can easily set up CI to handle deployment-specific image builds in a monorepo
we're unlikely to suggest / make it easier for users to swap the entrypoint of a deployment to refer to different flows because that's not how we intended deployments to work and most people's use of prefect would not be improved by allowing this
what you're trying to do can be accomplished by creating N separate deployments, with their own entrypoints, which does not require more image builds
if you feel like swapping entrypoints on a deployment should be supported outright, it'd be super helpful for you to create an issue, which would allow others who feel the same as you to chime in
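A sketch of that "N deployments, one image" pattern, assuming each flow file is importable at deploy time (e.g. in CI) and sits at the same relative path under the image's WORKDIR; the module names and the second flow are hypothetical:
from inside_docker_flow import inside_docker_test_flow
from another_flow import another_test_flow  # hypothetical second flow baked into the same image

IMAGE = "matilda1/prefect-flow-test:latest"  # one prebuilt image shared by all deployments

for flow_fn, name in [
    (inside_docker_test_flow, "inside-docker-deployment"),
    (another_test_flow, "another-deployment"),
]:
    flow_fn.deploy(
        name=name,
        work_pool_name="my-docker-pool-1",
        image=IMAGE,   # reuse the prebuilt image
        build=False,   # no image build here
        push=False,    # nothing to push
    )
Each deployment gets its own entrypoint (inferred from the flow's source file), so parameters route to the right flow without any extra image builds.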
09/17/2025, 9:29 PMwell its not the most common pattern to package a lot of flows into an image that you’re not going to use (you’re only using one of them). most of the time people use docker because they can build an environment that contains only what they need. packaging all flows into your image means you need to rebuild the image each time any of them changes
we’re unlikely to suggest/make it easier for users swap the entrypoint of a deployment to refer to different flows because that’s not how we intended deployments to work and most people’s use of prefect would not be improved by allowing this@Nate I am also okay with having one flow per image, but I really don’t want to keep all my flows in a single image, nor do I want to be restricted in the number of deployments. What I’d like is for each flow to be placed inside a directory such as /opt/prefect in the Docker container, with a configurable entrypoint for the Python file inside the container. I understand that my requirement is a bit different from the usual case, where deployment typically happens during the CI/CD pipeline. In my case, the image is built during the CI/CD pipeline, but the actual deployment needs to be handled by a REST service running in production, which connects to the Prefect server using the prebuilt images from CI/CD. My design is somewhat influenced by how AWS Lambda works with prebuilt images. I may be expecting something similar to what Prefect could support, like aws-lambda-python-runtime-interface-client. A few examples for reference: • https://dev.to/aws-builders/how-to-package-and-deploy-a-lambda-function-as-a-container-image-3d1a • https://docs.aws.amazon.com/lambda/latest/dg/chapter-layers.html • https://medium.com/@rolanditaru/zip-or-containers-deploying-code-to-lambda-in-2025-245919b3cdcb However, for now, I’ll go with what Prefect currently supports. Thanks for all your answers and time, and for the opportunity to validate my ideas. Thanks again.
Nate
09/17/2025, 9:34 PM