How can I dynamically pass parameters to a Prefect...
# ask-community
a
How can I dynamically pass parameters to a Prefect flow running as a Kubernetes Job (Pod) or Docker container? I’m currently using the `flow.from_source()` method, which downloads the flow code from an S3 bucket, builds the image dynamically, and then runs the flow. However, I’d like to avoid building the Docker image at runtime. Is there a way to use a prebuilt Docker image (pulled from a registry) and still pass parameters dynamically, preferably through a Pythonic method or REST API, without relying on deployment templates or environment variables? I understand that `job_variables` can be set dynamically and overridden during a flow run, but I’m looking for an alternative that allows passing parameters (like `flow_data`) more flexibly, ideally at runtime, in a way that automatically maps values from `flow_data` into `job_variables` and passes them as arguments to the flow entrypoint, while using a pre-built Docker image.
flow_from_source = await flow.from_source(
    source=s3_bucket_block,
    entrypoint="flows/bill_flow.py:bill_assessment_flow"
)

flow_dependencies = get_flow_dependencies()

deployment = await flow_from_source.deploy(
    name=PREFECT_DEPLOYMENT_NAME,
    tags=["billing"],
    work_pool_name="kubernetes-pool",
    schedule=None,
    push=False,  # Skip pushing image
    job_variables={
        "finished_job_ttl": 100,
        # "image": "mat/prefect-k8s-worker:15",  # Uncomment to use a custom prebuilt image
        "namespace": "prefect",
        "env": {
            "PREFECT_API_URL": "http://prefect-server:4200/api",
            "EXTRA_PIP_PACKAGES": flow_dependencies,
            "PYTHONPATH": "/opt/prefect/"
        }
    }
)

app.state.deployment_id = deployment

flow_run = await client.create_flow_run_from_deployment(
    deployment_id=request.app.state.deployment_id,
    tags=run_tags,
    parameters={
        "flow_data": {
            "source_provider": source_provider,
            "target_provider": target_provider,
            "company_id": company_id,
            "company_name": company_name,
            "assessment_task_id": assessment_task_id
        }
    }
)

logger.info(f"Created flow run with ID: {flow_run.id}")
n
> I understand that `job_variables` can be set dynamically and overridden during a flow run, but I’m looking for an alternative that allows passing parameters (like `flow_data`) more flexibly, ideally at runtime
have you checked out `run_deployment`? @Marvin show a `run_deployment` example and link docs
m
Yes, `run_deployment` is a great way to trigger a deployment at runtime and pass parameters (like `flow_data`) without touching `job_variables`. Here are quick examples:
Python (inside a flow or any Python process)
from prefect import flow
from prefect.deployments.flow_runs import run_deployment

@flow
def orchestrator(user_id: str):
    flow_run = run_deployment(
        name="etl/production",  # <FLOW_NAME>/<DEPLOYMENT_NAME>
        parameters={
            "flow_data": {"user_id": user_id, "batch": 42}
        },
        flow_run_name=f"etl-{user_id}",
        # Optional: override job variables for this run (interpreted as JSON)
        job_variables={
            # Example ECS-style overrides; adjust to your worker type
            "cpu": 1024,              # 1 vCPU
            "memory": 2048,           # 2 GB
            "env": {"EXTRA_FLAG": "1"}
        },
        # Set to False if you don't want this to appear as a subflow in the UI
        as_subflow=True,
        tags={"triggered-by-orchestrator"},
    )
    return flow_run.id
CLI (one-off, fully runtime-driven)
prefect deployment run "etl/production" \
  -p flow_data='{"user_id":"abc","batch":42}' \
  --flow-run-name "etl-abc" \
  -jv cpu=1024 \
  -jv memory=2048 \
  --watch
Docs
- How to run deployments (passing parameters, scheduling, etc.): Run deployments (how-to)
- CLI reference for `prefect deployment run`: CLI: prefect deployment run
- Python API reference for `run_deployment`: Python API: run_deployment
a
@Nate, I did check it. Yes, we can pass the parameters dynamically to an existing deployment that has an `entrypoint`. But how can we define the `entrypoint` for a deployment that uses only a prebuilt image and not the source code?
n
i could be naive to your use case, but im not sure that this is the right question
> But how can we define the `entrypoint` for a deployment that uses only a prebuilt image
why not pass an `image` job variable to `run_deployment`? or is there something about your use-case that necessitates you creating deployments in python like this before you trigger a run?
like it sounds like you just want a single deployment with a docker work pool and an `image` job variable selected at runtime
a
@Nate Apologies for the confusion. I double-checked everything again. I probably should’ve asked the question differently, something like: “How can I deploy a flow when the entrypoint code is baked into the image itself, rather than coming from a local Python file or remote code storage?” My main goal is to completely skip runtime image building and use a prebuilt image from our registry that already includes everything, including the flow code. At the same time, I still want to be able to pass parameters to the flow dynamically. So, I don’t want to:
• Define the flow inline in the same Python file where the deployment is triggered, or
• Use remote code storage (like GitHub, S3, etc.)
Example pseudo-code:
from prefect import flow
from prefect.client.orchestration import get_client
from prefect.deployments.flow_runs import run_deployment
import asyncio

async def deploy_flow():
    async with get_client() as client:
        # Create deployment with storage=None
        deployment = await client.create_deployment(
            name="billing-deployment",
            flow_name="bill_assessment_flow",
            work_pool_name="kubernetes-pool",
            storage=None,  # No external storage, code is in the image
            entrypoint="/app/my_script.py:bill_assessment_flow",  # Path in image
            job_variables={
                "image": "myregistry/prefect-custom:tag",  # Prebuilt image
                "namespace": "prefect",
                "env": {
                    "PREFECT_API_URL": "http://prefect-server:4200/api",
                    "PYTHONPATH": "/app"
                },
                "finished_job_ttl": 100
            },
            tags=["billing"],
            schedule=None
        )
        return deployment.id

async def trigger_flow_run(deployment_id: str, flow_data: dict):
    # Trigger a flow run with dynamic parameters
    flow_run = await run_deployment(
        name="bill_assessment_flow/billing-deployment",
        parameters={"flow_data": flow_data},
        flow_run_name=f"billing-run-{flow_data.get('company_id')}",
        tags=["triggered-run"]
    )
    return flow_run.id

async def main():
    # Deploy the flow (run once or manage via CI/CD)
    deployment_id = await deploy_flow()
    print(f"Created deployment with ID: {deployment_id}")

    # Example flow_data to pass dynamically
    flow_data = {
        "source_provider": "aws",
        "target_provider": "gcp",
        "company_id": "12345",
        "company_name": "Example Corp",
        "assessment_task_id": "task-789"
    }

    # Trigger a flow run
    flow_run_id = await trigger_flow_run(deployment_id, flow_data)
    print(f"Created flow run with ID: {flow_run_id}")

if __name__ == "__main__":
    asyncio.run(main())
To clarify my earlier question about the entry point: I was under the impression that using a prebuilt image meant the flow code had to be baked into the image, which would require a different kind of entry point (since no source pull happens). But after looking more closely at the docs and examples (Prefect v3 Deploy via Python), it seems that’s not entirely the case. From what I understand now, the prebuilt image mainly provides the runtime environment (with Prefect and required pip packages), but the actual flow definition can still be pulled in a different way. Please correct me if I’m still missing something.
m
It is my question too
n
> My main goal is to completely skip runtime image building
yes, im not suggesting building an image at runtime, i said
> an `image` job variable selected at runtime
that is, when you `POST /create_flow_run_from_deployment`, you can choose a preexisting image at that time to use for the flow run, which in your case, will contain your flow code. you can also choose that `image` at deployment time, if you never need to change it
my suggestion to build an `image` with your code baked in, and then provide it as a job_variable to the work pool is what you want to do. this is a very common pattern in prefect
• docs
• example
the steps are
• write a Dockerfile
• run `docker build` and push etc or instead write `build` and `push` steps into yaml like my example
• define your deployment in python via .deploy or in yaml via `prefect.yaml`
  ◦ set the `image` job variable on your deployment to the resulting image arn
  ◦ set your entrypoint relative to the `WORKDIR` in your image
  ◦ run `prefect deploy`
• trigger a run of your deployment, optionally overriding your `image` selection (not building an image) at runtime
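A minimal sketch of that last step over REST, assuming the Prefect server exposes `POST {api}/deployments/{deployment_id}/create_flow_run` and that its JSON body accepts `parameters` and `job_variables`. The helper name `build_flow_run_request`, the deployment ID, and the image tag are placeholders for illustration. The snippet only builds the request; it does not send it.

```python
import json
import urllib.request


def build_flow_run_request(api_url, deployment_id, flow_data, image=None):
    """Build (but do not send) a request that creates a flow run from a deployment."""
    body = {"parameters": {"flow_data": flow_data}}
    if image is not None:
        # Assumed per-run override of the deployment's `image` job variable.
        body["job_variables"] = {"image": image}
    return urllib.request.Request(
        url=f"{api_url}/deployments/{deployment_id}/create_flow_run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_flow_run_request(
    api_url="http://prefect-server:4200/api",
    deployment_id="00000000-0000-0000-0000-000000000000",  # placeholder ID
    flow_data={"company_id": "12345", "source_provider": "aws"},
    image="myregistry/prefect-custom:tag",  # prebuilt image, nothing is built here
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To actually submit it: urllib.request.urlopen(request)
```

Leaving `image` unset keeps whatever image the deployment was created with, so a gateway service can forward only `flow_data` in the common case.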
@Marvin there's no need to show the pattern i didnt ask for just for contrast. please be concise and focus on my actual query
please aggregate docs/examples on using .deploy or prefect.yaml when baking code into images
m
Got it. Here’s a concise aggregation focused on baking code into images and deploying via `.deploy()` or `prefect.yaml`.
Docs
- Deploy via Python (.deploy)
- prefect.yaml guide
- Customize job variables
- AWS ECS guide
Minimal examples (baked code in image)
1) Python: Flow.deploy with prebuilt image (ECR + ECS)
from prefect import flow

@flow(result_storage="s3://my-bucket/prefect-results/prefix", persist_result=True)
def my_flow():
    ...

ecr_image = "123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo:2025-09-05"

my_flow.deploy(
    name="my-flow-ecs",
    work_pool_name="my-ecs-pool",
    image=ecr_image,     # baked code image
    build=False,         # image already built
    push=False,          # image already pushed
    parameters={"env": "prod"},
    tags=["prod"],
    job_variables={
        "cluster": "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
        "execution_role_arn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
        "task_role_arn": "arn:aws:iam::123456789012:role/MyPrefectTaskRole",
        "subnets": ["subnet-abc", "subnet-def"],
        "security_groups": ["sg-0123456789abcdef0"],
        "assign_public_ip": False,
        "cpu": 1024,
        "memory": 2048,
        "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
    },
)
2) prefect.yaml: prebuilt image (ECR + ECS)
name: my-project

deployments:
  - name: etl-prod-ecs
    entrypoint: my_package.flows:etl_flow   # importable in the image
    image: 123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo:2025-09-05
    build: false
    push: false
    tags: ["prod"]
    parameters:
      env: "prod"
    work_pool:
      name: my-ecs-pool
      work_queue_name: default
      job_variables:
        cluster: arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster
        execution_role_arn: arn:aws:iam::123456789012:role/ecsTaskExecutionRole
        task_role_arn: arn:aws:iam::123456789012:role/MyPrefectTaskRole
        subnets: ["subnet-abc", "subnet-def"]
        security_groups: ["sg-0123456789abcdef0"]
        assign_public_ip: false
        cpu: 1024
        memory: 2048
        env:
          PREFECT_LOGGING_LEVEL: INFO
Notes (brief)
- Entrypoint must be importable inside the image (install your project in the image and use `module:function`).
- For ECS+ECR: the ECS task execution role needs ECR pull permissions (e.g., AmazonECSTaskExecutionRolePolicy). Use the task role for your app’s AWS access.
- Persist results to remote storage (e.g., S3) for resilience in container environments.
If you want, share your ECS cluster/role names and I’ll tailor these snippets exactly.
a
Hello @Nate, thanks for the documentation link. I already went through it. I have two questions about the example below, which deploys a flow from an already baked image.
@flow(result_storage="s3://my-bucket/prefect-results/prefix", persist_result=True)
def my_flow():
  ...

ecr_image = "123456789012.dkr.ecr.us-east-1.amazonaws.com/myrepo:2025-09-05"

my_flow.deploy(
  name="my-flow-ecs",
  work_pool_name="my-ecs-pool",
  image=ecr_image,   # baked code image
  build=False,     # image already built
  push=False,     # image already pushed
  parameters={"env": "prod"},
  tags=["prod"],
  job_variables={
    "cluster": "arn:aws:ecs:us-east-1:123456789012:cluster/my-cluster",
    "execution_role_arn": "arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
    "task_role_arn": "arn:aws:iam::123456789012:role/MyPrefectTaskRole",
    "subnets": ["subnet-abc", "subnet-def"],
    "security_groups": ["sg-0123456789abcdef0"],
    "assign_public_ip": False,
    "cpu": 1024,
    "memory": 2048,
    "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
  },
)
Question 1
As I mentioned, I have flow code baked into the image with a flow name, e.g., `inside_docker_flow`. If I want to pass parameters to the flow dynamically during `run_deployment`, will they be passed to `my_flow` or `inside_docker_flow`?
Question 2
I have custom REST services in front of the Prefect server that accept requests from other services. They act as a gateway that serves endpoints like deploy and run, and in turn route to the Prefect server. But I don’t want to define any flows locally when making REST calls to the Prefect server.
n
> Question 1
parameters will be passed to whatever function lives at the entrypoint (`path/to/file.py:fn`) defined by your deployment (see `prefect deployment inspect ...`)
> Question 2
whats the question? if you want to trigger existing flows remotely via prefect rest API, that is what deployments are for. you can be pretty flexible and have your (deployment entrypoint) flow conditionally behave differently (e.g. call different subflows/tasks conditionally) based on the inputs you end up with in your web service layer
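To illustrate the answer to Question 1, here is a rough sketch of how a `path/to/file.py:fn` entrypoint resolves to exactly one function, which is the one that receives the run parameters. This is plain `importlib` code and not Prefect's actual loader; `load_entrypoint` is a hypothetical helper, and the temporary directory stands in for the image's `WORKDIR`.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_entrypoint(entrypoint, workdir):
    """Resolve 'relative/path.py:function_name' against workdir and return the function."""
    rel_path, func_name = entrypoint.rsplit(":", 1)
    file_path = Path(workdir) / rel_path
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, func_name)


with tempfile.TemporaryDirectory() as workdir:
    # Stand-in for the flow file that would be baked into the image.
    (Path(workdir) / "inside_docker_flow.py").write_text(
        "def inside_docker_test_flow(text='Hello Docker', number=42, count=3):\n"
        "    return f'{text}:{number}:{count}'\n"
    )
    fn = load_entrypoint("inside_docker_flow.py:inside_docker_test_flow", workdir)
    # Run-time parameters are bound to the function named in the entrypoint.
    result = fn(**{"text": "hi", "number": 200})
    print(result)
```

Parameters that are not supplied at run time fall back to the function's own defaults, which is why only the entrypoint decides which flow sees them.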
a
@Nate I’d like to give this one more try with you. I still believe my concern about the `entrypoint` for pre-built images is valid. Please bear with me; I just want to be very clear here, and sorry for the long message. Here’s my setup: I have the flow below, which goes inside the pre-built Docker image.
inside_docker_flow.py (flow baked into the Docker image):
#!/usr/bin/env python3

import sys
from datetime import datetime
from prefect import flow, task, get_run_logger


@flow(
  name="simple-test-flow",
  description="A simple flow to test argument passing in Docker"
)
def inside_docker_test_flow(
  text: str = "Hello Docker",
  number: int = 42,
  count: int = 3
) -> str:

  logger = get_run_logger()
   
  logger.info("=== Starting Simple Test Flow ===")
  logger.info(f"Received arguments:")
  logger.info(f" - text: '{text}'")
  logger.info(f" - number: {number}")
  logger.info(f" - count: {count}")
  return "Hello Docker"
Dockerfile:
# Use Python 3.11 as base image
FROM prefecthq/prefect:3.4.17-python3.11

# Set working directory
WORKDIR /opt/prefect

# Install system dependencies
RUN apt-get update && apt-get install -y \
  gcc \
  && rm -rf /var/lib/apt/lists/*

RUN pip install prefect-docker

# Copy the inside_docker_flow.py file
COPY inside_docker_flow.py .



# Create a non-root user
RUN useradd --create-home --shell /bin/bash prefect && \
  chown -R prefect:prefect /opt/prefect

# Switch to non-root user
USER prefect
Deployment script (deploy.py), which deploys and runs the flow. (I want to expose this through a web-layer API that offers deploy and run endpoints.)
#!/usr/bin/env python3
"""
Prefect Flow Deployment Script
This script deploys a flow to a work pool and provides methods to run it with parameters.
"""

import asyncio
from datetime import datetime
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


@flow(
  name="docker-test-flow",
  description="A flow to test argument passing in Docker containers",
  log_prints=True
)
def local_test_flow(
  text: str = "Hello Docker",
  number: int = 42,
  count: int = 3
) -> str:

  logger = get_run_logger()
   
  logger.info("=== Starting Docker Test Flow ===")
  logger.info(f"Received arguments:")
  logger.info(f" - text: '{text}'")
  logger.info(f" - number: {number}")
  logger.info(f" - count: {count}")

  # Process the arguments
  result = f"Processed: {text} (number: {number}, count: {count})"
  logger.info(f"Flow result: {result}")
   
  return result


def deploy_flow():
  """Deploy the flow to a work pool."""
  print("Deploying flow to work pool...")
   
  local_test_flow.deploy(
    name="docker-test-deployment",
    work_pool_name="my-docker-pool-1",
    build=False, # Skip building; the image is prebuilt (set to True to build locally)
    image="matilda1/prefect-flow-test:latest", # Use local image name
    push=False, # Set to True to push to registry
    tags=["docker", "testing"],
    description="Docker-based flow for testing argument passing",
    parameters={
      "text": "Default Text",
      "number": 100,
      "count": 5
    },
    job_variables={
      "networks": ["iceberg-spark-minio_iceberg_net"],
      "network_mode": "bridge",
      "image_pull_policy": "IfNotPresent",
      "auto_remove": True,
      "volumes": ["/var/run/docker.sock:/var/run/docker.sock"],
      "stream_output": True,
      "stream_logs": True,
      "stream_logs_interval": 1,
      "stream_logs_timeout": 10,
      "stream_logs_max_retries": 3,
      "stream_logs_retry_delay": 1,
      "stream_logs_retry_delay_max": 10,
      "privileged": True,
      "restart_policy": {
        "name": "always",
        "maximum_retry_count": 3
      },
      "env": {
        "PREFECT_API_URL": "http://172.24.6.140:4200/api",
        "PYTHONPATH": "/opt/prefect"
      }
    }
  )
  print("Flow deployed successfully!")


async def run_flow_with_parameters(
  text: str = "Hello from run_deployment",
  number: int = 200,
  count: int = 7
):

  print(f"Running flow with parameters: text='{text}', number={number}, count={count}")
   
  try:
    # Run the deployment with custom parameters
    flow_run = await run_deployment(
      name="docker-test-flow/docker-test-deployment",
      parameters={
        "text": text,
        "number": number,
        "count": count
      }
    )
     
    print(f"Flow run started with ID: {flow_run.id}")
    print(f"Flow run state: {flow_run.state}")
     
    # Wait for completion and get result
    await flow_run.wait_for_completion()
    print(f"Flow run final state: {flow_run.state}")
     
    if flow_run.state.is_completed():
      result = flow_run.state.result()
      print(f"Flow completed with result: {result}")
      return result
    else:
      print(f"Flow failed with state: {flow_run.state}")
      return None
     
  except Exception as e:
    print(f"Error running flow: {e}")
    raise


def run_flow_sync(
  text: str = "Hello from sync run",
  number: int = 300,
  count: int = 9
):
  return asyncio.run(run_flow_with_parameters(text, number, count))



if __name__ == "__main__":
  # deploy_flow()
  run_flow_sync()
Problem: When I run the deployment, parameters are passed to `local_test_flow` (from `deploy.py`), because that’s what the deployment entrypoint is set to. But what I actually want is for the parameters to be passed to `inside_docker_test_flow`, which already exists inside the pre-built Docker image. Here’s what `prefect deployment inspect` shows:
(venv) matilda-svc@mc-dev-AdiN:~/projects/prefect_test$ prefect deployment inspect docker-test-flow/docker-test-deployment
{
  ………………….
  'entrypoint': 'deploy.py:local_test_flow',
………………………..
}
My question: How can I set the entrypoint to point to the flow that’s already inside the Docker image (`inside_docker_flow.py:inside_docker_test_flow`) instead of a locally defined flow like `deploy.py:local_test_flow`, so that I can pass the parameters dynamically?
n
i think the answer to your question is that you cannot. deployments are 1 to 1 with their entrypoints. a deployment cannot have more than one entrypoint. if you want to pass parameters to a different flow, then you should make a separate deployment for it based on the same image
which means that creating the deployments dynamically is probably your best option, which if i remember correctly, is what you were doing before
im realizing there's likely still some confusion here that I am not picking up on, but i think the key point is what i said above
a deployment can only have one entrypoint, ie a deployment can only have one flow
if you have a deployment that points to `local_test_flow` as its entrypoint and you instead want a deployment to refer to a different flow in a docker container, you will need to create a different deployment whose entrypoint points to that other flow in the docker container
---
this might not be relevant, it only is if you want to have 2 separate flows accessible in your runtime image that you can decide between at runtime based on the parameters you pass
you can generalize your deployment entrypoint such that it dispatches parameters to the correct subflow conditionally
@flow
def meta_flow(subflow_params: T, subflow_choice: SomeEnum):
  if subflow_choice == "foo":
    # call foo subflow with subflow_params

...
and then `meta_flow` stays a static entrypoint that can dispatch any subflow
common pattern
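A runnable sketch of that dispatch pattern in plain Python. In a real project each function would presumably carry Prefect's `@flow` decorator; it is left off here so the example runs without a Prefect server. `SubflowChoice`, `foo_flow`, `bar_flow`, and the `SUBFLOWS` registry are hypothetical names.

```python
from enum import Enum


class SubflowChoice(str, Enum):
    FOO = "foo"
    BAR = "bar"


def foo_flow(company_id: str) -> str:
    # Stand-in for a subflow; would be decorated with @flow in Prefect.
    return f"foo handled {company_id}"


def bar_flow(company_id: str) -> str:
    # Stand-in for a second subflow.
    return f"bar handled {company_id}"


# Registry mapping each allowed choice to the subflow that handles it.
SUBFLOWS = {SubflowChoice.FOO: foo_flow, SubflowChoice.BAR: bar_flow}


def meta_flow(subflow_params: dict, subflow_choice: str) -> str:
    """Static entrypoint: validate the choice, then forward the parameters."""
    choice = SubflowChoice(subflow_choice)  # raises ValueError for unknown choices
    return SUBFLOWS[choice](**subflow_params)


result = meta_flow({"company_id": "12345"}, "bar")
print(result)
```

The deployment's entrypoint never changes; only the `subflow_choice` parameter passed at run time decides which code path executes.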
also just for context: if you have a long form question, the best place to put it is in a github discussion. i cannot feasibly spend a ton of time in slack on hyper specific situations. github is a lot easier to keep track of
a
@Nate No, I don’t want two different flows. I understand; I just followed the approach you suggested for using a prebuilt image during deployment, and then passing parameters dynamically during the flow run. Yes, the problem with that approach is that it ends up creating two flows: one for `create_deploy` and another for the flows inside the Docker image. That is what I wanted to demonstrate.
> which means that creating the deployments dynamically is probably your best option, which if i remember correctly, is what you were doing before
Yes, I’ve done that; it is the first approach I mentioned. I can remotely download the flow code, build an image dynamically, and set the entrypoint during that build. That worked for me. Problem: I don’t want to build an image during deployment. But here’s the gap I’m seeing in the above approach: when Prefect downloads flow code, it places it under locations like `/opt/prefect/...`. Why not support a simpler option where I can provide a prebuilt image that already has the flow code in place (e.g., baked into `/opt/prefect`), and then just specify the entrypoint at deploy time, without needing to fetch from GitHub/S3? Something like this would be helpful:
from prefect import flow

# Proposed API, as requested in this thread: pass the entrypoint directly
flow.deploy(
 name="docker-test-deployment",
 work_pool_name="my-docker-pool-1",
 build=False, # Skip building; use the prebuilt image
 image="matilda1/prefect-flow-test:latest", # Prebuilt image that already has the flow code at a location like /opt/prefect
 entrypoint="inside_docker_flow.py:inside_docker_test_flow",   # proposed entrypoint support
 push=False, # Set to True to push to registry
  ........................
 )
Alternatively, we can define an entry point in the Dockerfile using `ENTRYPOINT ["python", "inside_docker_flow.py"]`, and Prefect should be able to run it under the hood with `docker run prefect-flow-test arg1 arg2 arg3`. That way the image fully encapsulates the flow code inside Docker, and the deployment just tells Prefect which baked flow to use inside Docker.
n
well its not the most common pattern to package a lot of flows into an image that you're not going to use (you're only using one of them). most of the time people use docker because they can build an environment that contains only what they need. packaging all flows into your image means you need to rebuild the image each time any of them changes
pulling code at runtime is most popular because then the image only depends on OS-level and 3rd party dependencies. packaging one flow into a runtime image is nearly as popular because (while it requires image builds each time you change your flow) it doesn't require the latency of the `git_clone` step and you can easily setup CI to handle deployment-specific image builds in a monorepo
we're unlikely to suggest / make it easier for users to swap the entrypoint of a deployment to refer to different flows because that's not how we intended deployments to work and most people's use of prefect would not be improved by allowing this
what you're trying to do can be accomplished by creating N separate deployments, with their own entrypoints, which does not require more image builds
if you feel like swapping entrypoints on a deployment should be supported outright, it'd be super helpful for you to create an issue, which would allow others who feel the same as you to chime in
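A small sketch of the "N separate deployments, one image" idea. It only builds plain dictionaries whose keys mirror the deployment settings used earlier in this thread (`name`, `entrypoint`, `work_pool_name`, `job_variables`); how they are submitted (`prefect.yaml`, `.deploy()`, or REST) is left open. `deployment_specs` is a hypothetical helper, and `other_flow.py:other_flow` is a made-up second flow.

```python
def deployment_specs(image, work_pool_name, entrypoints):
    """Return one deployment spec per entrypoint, all sharing one prebuilt image."""
    return [
        {
            "name": name,
            "entrypoint": entrypoint,  # path relative to the image's WORKDIR
            "work_pool_name": work_pool_name,
            "job_variables": {"image": image},  # same image, no extra builds
        }
        for name, entrypoint in entrypoints.items()
    ]


specs = deployment_specs(
    image="matilda1/prefect-flow-test:latest",
    work_pool_name="my-docker-pool-1",
    entrypoints={
        "docker-test-deployment": "inside_docker_flow.py:inside_docker_test_flow",
        "other-deployment": "other_flow.py:other_flow",  # hypothetical second flow
    },
)
for spec in specs:
    print(spec["name"], "->", spec["entrypoint"], "on", spec["job_variables"]["image"])
```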
a
> well its not the most common pattern to package a lot of flows into an image that you’re not going to use (you’re only using one of them). most of the time people use docker because they can build an environment that contains only what they need. packaging all flows into your image means you need to rebuild the image each time any of them changes
> we’re unlikely to suggest/make it easier for users to swap the entrypoint of a deployment to refer to different flows because that’s not how we intended deployments to work and most people’s use of prefect would not be improved by allowing this
@Nate I am also okay with having one flow per image, but I really don’t want to keep all my flows in a single image, nor do I want to be restricted in the number of deployments. What I’d like is for each flow to be placed inside a directory such as /opt/prefect in the Docker container, with a configurable entrypoint for the Python file inside the container. I understand that my requirement is a bit different from the usual case, where deployment typically happens during the CI/CD pipeline. In my case, the image is built during the CI/CD pipeline, but the actual deployment needs to be handled by a REST service running in production, which connects to the Prefect server using the prebuilt images from CI/CD. My design is somewhat influenced by how AWS Lambda works with prebuilt images. I may be expecting something similar to what Prefect could support, like aws-lambda-python-runtime-interface-client. A few examples for reference:
• https://dev.to/aws-builders/how-to-package-and-deploy-a-lambda-function-as-a-container-image-3d1a
• https://docs.aws.amazon.com/lambda/latest/dg/chapter-layers.html
• https://medium.com/@rolanditaru/zip-or-containers-deploying-code-to-lambda-in-2025-245919b3cdcb
However, for now, I’ll go with what Prefect currently supports. Thanks for all your answers and time, and for the opportunity to validate my ideas. Thanks again.
n
thanks for taking the time to explain! ill create a discussion about this https://github.com/PrefectHQ/prefect/discussions/18977