@Marvin for every flow code change i have to ...
# ask-marvin
m
@Marvin for every flow code change I have to redeploy. I have my Prefect flow code in a Docker image. How can I make local testing easier? I am using prefect.yaml for local testing.
m
thought for 877.5 seconds
Got it — you’re hitting friction because your flows are packaged in a Docker image and you’re redeploying on every code change. In Prefect 3.x you don’t need to redeploy for every change. Here are a few dev-friendly patterns you can use with prefect.yaml to speed up local testing.

Key rule: when do you need to redeploy?
- Redeploy only when deployment metadata changes (entrypoint path or name, parameter defaults, schedule, work pool, etc.).
- If the deployment points at a stable source (local code, a Git branch, or a Docker tag that you rebuild), you can update the code and run again without redeploying.

Option A: Use your local code (no Docker) for fast iteration
- Run locally with a Process work pool and pull your code from your working directory or from Git. This avoids builds entirely.

Example prefect.yaml using Git for source:
Copy code
# prefect.yaml
name: my-project
pull:
  - prefect.deployments.steps.pull.git_clone:
      repository: https://github.com/your-org/your-repo
      branch: dev
  - prefect.deployments.steps.pull.set_working_directory:
      directory: your-repo
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-process
Commands to create/start the local worker and deploy:
Copy code
prefect work-pool create local-process --type process
prefect worker start -p local-process
prefect deploy
prefect deployment run "etl/dev"
Tip: You can also just run your flow directly for quick tests without a deployment:
Copy code
python flows/etl.py  # calls your @flow in __main__
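For reference, here is a minimal sketch of what flows/etl.py could look like for that pattern; the flow body is a placeholder, not your actual code:

from prefect import flow

@flow(log_prints=True)
def etl():
    # placeholder work; replace with your real extract/transform/load logic
    print("running etl locally")

if __name__ == "__main__":
    # calling the flow function directly executes a one-off local flow run
    etl()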
Or use the Python API to serve ephemeral deployments during dev:
Copy code
from prefect import flow, serve

@flow
def etl(): ...

if __name__ == "__main__":
    serve(etl)  # starts a local, ephemeral deployment
Option B: Stay with Docker but avoid redeploys by using a mutable dev tag
- Keep your deployment’s image the same (e.g., my-repo/etl:dev), rebuild that tag locally (or push to your registry), and just run the deployment again. You do not need to redeploy if the image tag string didn’t change.
- For remote pulls, set image_pull_policy to ALWAYS to ensure the worker pulls the latest tag before each run.

prefect.yaml snippet:
Copy code
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: my-repo/etl:dev
        image_pull_policy: "ALWAYS"
Option C: Docker with bind-mounted code (true hot reload without rebuilds)
- Mount your local source into the container the worker starts. This way, saving a file locally updates what runs in the container.

prefect.yaml snippet:
Copy code
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: my-repo/etl:base  # a base image with deps installed
        image_pull_policy: "NEVER"
        volumes:
          - "${PWD}:/opt/project"      # bind mount your repo
        container_create_kwargs:
          working_dir: "/opt/project"  # run from the mounted repo
Create/start the Docker work pool and worker:
Copy code
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
prefect deploy
What I recommend for your setup
- For fastest local iteration: Option C (Docker + volume mount) or Option A (Process worker + local code).
- If you prefer to keep using images: Option B (rebuild the same dev tag, set image_pull_policy: ALWAYS) — then you won’t need to redeploy on every change.

Docs for deeper reference:
- Prefect YAML and steps: prefect.yaml guide
- Deployment versioning patterns: deployment versioning

A couple of clarifying questions so I can tailor an exact prefect.yaml for you:
- Do you want to keep using Docker for local testing, or are you open to a Process worker for dev?
- If Docker: can you bind mount your local repo into containers on your EC2 host?
- Are you pushing images to a registry for dev, or building locally on the same host where the worker runs?
m
@Marvin yes, I prefer Docker for local testing
m
thought for 32.7 seconds
Great — if you prefer Docker for local testing, here are two solid workflows that avoid redeploying on every code change. Both work with prefect.yaml in Prefect 3.x.

Option 1 (Recommended): Docker worker + bind-mounted code (hot reload)
- Build a base image with your dependencies only (not your flow code).
- Mount your local repo into the container the Docker worker starts. Your code changes are picked up instantly; no image rebuilds and no redeploys.

prefect.yaml
Copy code
name: my-project
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/prefect-base:py3.11  # deps only, no app code
        image_pull_policy: "NEVER"
        # Mount your local repo into the container
        volumes:
          - "${PWD}:/opt/project"
        container_create_kwargs:
          working_dir: "/opt/project"
        env:
          # Point flows to your Server API (running on same EC2)
          # On Linux, network_mode: host is easiest (see below)
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
        # Optional, if you want containers to see the host network directly
        network_mode: "host"
One-time setup
Copy code
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
prefect deploy
Dev loop
- Edit your Python files locally
- Run the deployment again (no redeploy needed):
Copy code
prefect deployment run "etl/dev"
Notes
- Use an absolute path instead of ${PWD} if you prefer (e.g., /home/ubuntu/my-repo:/opt/project).
- If you don’t use network_mode: host, set PREFECT_API_URL to your EC2’s private IP (e.g., http://<ec2-private-ip>:4200/api). On some Linux configs, host.docker.internal may not resolve.
- Your base image just needs Prefect and your libs installed. The code comes from the bind mount.

Option 2: Docker image with a mutable dev tag (no redeploys, just rebuild/pull)
- Keep the deployment’s image the same (e.g., your-repo/etl:dev).
- Rebuild that tag when you change code; the worker always pulls the latest tag.

prefect.yaml
Copy code
name: my-project
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/etl:dev
        image_pull_policy: "ALWAYS"
        env:
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
        network_mode: "host"
Dev loop
Copy code
# Rebuild same tag
docker build -t your-repo/etl:dev .
# If worker pulls from a registry, push; if local-only, skip push
docker push your-repo/etl:dev

# No prefect redeploy needed; just run again
prefect deployment run "etl/dev"
When do you actually need to redeploy?
- Only when deployment metadata changes: name, entrypoint path, default params, schedule, work pool, etc.
- If the source of code (bind mount or image tag) stays the same string in the deployment, you can iterate without redeploying.

Links
- Prefect YAML guide: prefect.yaml
- Deployment versioning patterns: deployment versioning

Want me to tailor the exact prefect.yaml to your repo layout and base image? If you share:
- Your base image name (or Dockerfile),
- The path to your flow entrypoint (e.g., flows/etl.py:etl),
- Whether you can use network_mode: host or prefer a static PREFECT_API_URL,
I’ll drop in a ready-to-use config.
m
@Marvin I need a setup where I keep the flow code in a Docker image locally, and can run the flow without rerunning the prefect deploy command for every code change. Also, let me know how to pass parameters with the "prefect deployment run" command. Another consideration: I want to use the same prefect.yaml for the CI/CD pipeline.
m
thought for 109.5 seconds
Totally doable. Since you want to keep your flow code inside a Docker image locally and avoid running prefect deploy for every change, the key is: rebuild the same image tag and keep your deployment metadata unchanged. Then you can run the deployment again with new code without redeploying. Here’s a concrete setup and dev loop that should fit your workflow.

One-time setup
1) Create a Docker work pool and start a worker on your EC2 host:
Copy code
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
2) Define a single deployment in prefect.yaml that points to a stable image tag:
Copy code
name: my-project

deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/etl:dev   # mutable dev tag
        image_pull_policy: "NEVER" # local image on same EC2 host
        network_mode: "host"
        env:
          # Since your Server & worker are on the same EC2 host
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
3) Deploy once:
Copy code
prefect deploy
Local dev loop (no redeploy needed)
- Rebuild the same tag locally when code changes:
Copy code
docker build -t your-repo/etl:dev .
- Run your deployment with the updated code:
Copy code
prefect deployment run "etl/dev" --watch
Because the deployment still references the same image tag and entrypoint, no prefect deploy is needed for code changes. The Docker worker will launch containers from your freshly rebuilt local image.

Passing parameters with prefect deployment run
- Use multiple -p key=value flags; values are parsed as JSON (a flow-signature sketch matching these examples follows below):
Copy code
prefect deployment run "etl/dev" \
  -p start_date='"2025-08-01T00:00:00Z"' \
  -p runs=10 \
  -p features='["alpha","beta"]' \
  --watch
- You can also pass a JSON object with --params (handy for complex/nested params):
Copy code
prefect deployment run "etl/dev" \
  --params '{"start_date":"2025-08-01T00:00:00Z","runs":10,"features":["alpha","beta"]}' \
  --watch
- If you need to override job variables at run time (e.g., env for a single run), use `-jv`:
Copy code
prefect deployment run "etl/dev" \
  -jv env='{"LOG_LEVEL":"DEBUG"}'
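For context, here is a sketch of a flow signature that would accept the parameters used in the -p and --params examples above. The parameter names and types are illustrative assumptions, not your actual flow:

from datetime import datetime
from typing import Optional

from prefect import flow

@flow(log_prints=True)
def etl(start_date: datetime, runs: int = 1, features: Optional[list] = None):
    # -p start_date='"2025-08-01T00:00:00Z"' arrives as a datetime because Prefect
    # coerces JSON parameter values to the annotated types; runs and features
    # arrive as an int and a list respectively.
    print(start_date, runs, features)

Both invocation styles above produce the same flow run parameters for a signature like this.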
Using the same prefect.yaml in CI/CD
- Keep a single prefect.yaml and vary only environment-specific values using env vars.
- Example: use a dynamic image tag for CI:
Copy code
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/etl:${IMAGE_TAG:-dev}
        image_pull_policy: "ALWAYS"   # in CI if pulling from registry
        env:
          PREFECT_API_URL: "${PREFECT_API_URL}"
- In CI:
  - Build and push the image with an immutable tag (e.g., the commit SHA).
  - Set IMAGE_TAG and PREFECT_API_URL in the pipeline environment.
  - Run:
Copy code
prefect deploy --name dev
- Workers will pull the new tag on next runs.

Notes for your single-EC2 setup
- With network_mode: "host", containers can talk to your Server via http://127.0.0.1:4200/api from inside the container. If you don’t use host networking, set PREFECT_API_URL to your EC2 host’s IP instead.
- Since your worker and Server run on the same host, image_pull_policy: "NEVER" is ideal for local dev (no registry required). In CI, use "ALWAYS" if your worker must pull a newly pushed image.

When do you actually need to redeploy?
- Only if deployment metadata changes: entrypoint path, deployment name, schedule, default parameters, work pool, or the image tag string in your YAML. If you keep the tag constant (e.g., your-repo/etl:dev), you can rebuild and run without redeploying.

References
- Prefect YAML guide: prefect.yaml
- Deployment versioning patterns: deployment versioning
If you share your repo structure and Dockerfile base, I can tailor the exact prefect.yaml and a minimal Dockerfile that keeps dependencies in the image and makes your builds fast.
m
@Marvin prefect.yaml file to run flows from local code
m
thought for 15.7 seconds
Here are two minimal prefect.yaml examples to run flows directly from your local code in Prefect 3.x. Pick the one that fits your dev style.

Option 1: Local code with a Process work pool (simplest)
Copy code
name: my-project

deployments:
  - name: dev
    description: "Run flows from local source code"
    entrypoint: flows/etl.py:etl   # path to your flow: file.py:function_name
    parameters:
      runs: 1                      # optional defaults
    work_pool:
      name: local-process
      job_variables:
        # Run from your current repo directory
        working_dir: "${PWD}"
Setup/run:
Copy code
prefect work-pool create local-process --type process
prefect worker start -p local-process
prefect deploy
prefect deployment run "etl/dev" --watch
Option 2: Local code with a Docker work pool (bind mount your repo)
- Requires a base image with your Python deps preinstalled.
- Mount your local repo into the container so code changes pick up instantly.
Copy code
name: my-project

deployments:
  - name: dev
    description: "Run flows from local source code mounted into Docker"
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/prefect-base:py311   # deps-only base image
        image_pull_policy: "NEVER"            # using local image
        # IMPORTANT: use an absolute host path to your repo
        volumes:
          - "/home/ubuntu/my-repo:/opt/project"
        container_create_kwargs:
          working_dir: "/opt/project"
        env:
          # Server and worker on same EC2 host
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
        network_mode: "host"
Setup/run:
Copy code
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
prefect deploy
prefect deployment run "etl/dev" --watch
Notes:
- Redeploy only when metadata changes (entrypoint, schedule, params defaults, work pool, image name, etc.). With the above, you can iterate on your local code without redeploying.
- For Docker, ensure your base image already has Prefect and your libraries installed; your code is provided via the bind mount.
- If you prefer a Git source later, you can add pull.git_clone steps to prefect.yaml.

Docs:
- Prefect YAML guide: prefect.yaml
- Work pool job variables (templates): use prefect work-pool get-default-base-job-template --type process|docker to see available fields.
m
@Marvin this is my existing prefect.yaml file:

# Generic metadata about this project
name: src
prefect-version: 3.4.11

# build section allows you to manage and build docker images
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.1
      image_name: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect
      tag: dev
      dockerfile: Dockerfile.prefect_flows
      # push : false

# push section allows you to manage if and how this project is uploaded to remote locations
push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.3.1
      image_name: '{{ build_image.image_name }}'
      tag: dev

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/src

# the deployments section allows you to provide configuration for deploying flows
deployments:
  - name: ebay
    version: '{{ build_image.tag }}'
    tags:
      - '{{ build_image.tag }}'
    description: null
    schedule: {}
    flow_name: null
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: '{{ build_image.image }}'
        push: false
  - name: local
    description: null
    schedule: {}
    flow_name: null
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        working_dir: "${PWD}"

i just want to deploy o
m
thought for 93.0 seconds
Thanks for sharing your prefect.yaml. It looks like your last sentence got cut off — did you mean “I just want to deploy once and then iterate without redeploying”? I’ll assume yes and show you two tidy options using your Docker pool.

What needs fixing in your current YAML
- For Docker workers, working_dir is not a valid job variable; set container_create_kwargs.working_dir instead.
- push: false under job_variables is not a valid Docker worker field; remove it.
- Your pull step path is off; if you keep it, it should be prefect.deployments.steps.pull.set_working_directory, but you don’t need a pull step if your code is in the image or bind-mounted.
- If you’re rebuilding the dev image locally on the same EC2 host as the worker, set image_pull_policy: "NEVER". If you push to ECR and want workers to pull each run, use "ALWAYS".

Below are two clean patterns you can copy-paste.

A) Keep code in the image; rebuild the same tag for changes (no redeploys)
- Rebuild :dev locally when code changes; do not change the tag string in the deployment.
- Run the deployment again; no prefect deploy needed after the first time.
Copy code
name: src
prefect-version: 3.4.11

# Build/push used in CI or when you want to refresh the image (not needed for every dev run)
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.1
      image_name: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect
      tag: dev
      dockerfile: Dockerfile.prefect_flows

push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.3.1
      image_name: "{{ build_image.image_name }}"
      tag: dev

deployments:
  - name: ebay
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: "{{ build_image.image }}"       # resolves to .../credilinq-prefect:dev
        image_pull_policy: "NEVER"             # local build on same EC2 host
        network_mode: "host"
        env:
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
Dev loop:
Copy code
# Rebuild same tag locally (no redeploy)
docker build -t 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev -f Dockerfile.prefect_flows .

# Run the deployment with updated code
prefect deployment run "run/ebay" --watch
B) Mount local code into the container for instant updates (no image rebuilds)
- Use your base image with deps; mount your repo to override code in the container.
Copy code
name: src
prefect-version: 3.4.11

deployments:
  - name: local
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev
        image_pull_policy: "NEVER"
        # IMPORTANT: absolute path to your repo on the EC2 host
        volumes:
          - "/home/ubuntu/src:/opt/prefect/src"
        container_create_kwargs:
          working_dir: "/opt/prefect/src"
        network_mode: "host"
        env:
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
Dev loop:
- Edit code locally in /home/ubuntu/src
- Run again (no redeploy, no image rebuild):
Copy code
prefect deployment run "run/local" --watch
Passing parameters at run time
- Use -p key=value (values are JSON) multiple times:
Copy code
prefect deployment run "run/ebay" \
  -p seller_id='"d480fd76-f59e-4ea0-95cb-4e22db0164e0"' \
  -p start_date='"2025-08-01T00:00:00Z"' \
  -p runs=10 \
  --watch
- Or pass one JSON object:
Copy code
prefect deployment run "run/ebay" \
  --params '{"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0","runs":10}' \
  --watch
Using the same prefect.yaml for CI/CD
- Keep this single file. In CI, run prefect deploy to update the deployment if you change metadata or want to rebuild/push images.
- You can parameterize the image tag via env (e.g., ${IMAGE_TAG:-dev}) and set image_pull_policy: "ALWAYS" in CI if workers need to pull from ECR.

Helpful references
- Prefect YAML guide: prefect.yaml
- Deployment versioning patterns: deployment versioning
- See default job variables for Docker pools:
Copy code
prefect work-pool get-default-base-job-template --type docker
If you confirm:
- the absolute path to your repo on the EC2 host,
- whether you want A (image rebuild) or B (bind mount),
I’ll finalize your prefect.yaml accordingly.
m
@Marvin this is my existing prefect.yaml file:

# Generic metadata about this project
name: src
prefect-version: 3.4.11

# build section allows you to manage and build docker images
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.1
      image_name: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect
      tag: dev
      dockerfile: Dockerfile.prefect_flows
      # push : false

# push section allows you to manage if and how this project is uploaded to remote locations
push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.3.1
      image_name: '{{ build_image.image_name }}'
      tag: dev

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/src

# the deployments section allows you to provide configuration for deploying flows
deployments:
  - name: ebay
    version: '{{ build_image.tag }}'
    tags:
      - '{{ build_image.tag }}'
    description: null
    schedule: {}
    flow_name: null
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: '{{ build_image.image }}'
        push: false
  - name: local
    description: null
    schedule: {}
    flow_name: null
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        working_dir: "${PWD}"

I just want to deploy the "local" deployment, but it is executing the build, push, and pull steps too. It asks me to authenticate to ECR; for the local deployment I don't want ECR, right?
m
thought for 94.8 seconds
You’re right — with your current prefect.yaml, prefect deploy -n local still runs the project-level build/push steps, which triggers ECR auth. In Prefect 3.x, the build and push sections apply to the whole project, not per-deployment, so selecting a single deployment by name doesn’t skip those steps.

Two clean ways to avoid ECR for your “local” deployment:

A) Keep a separate local-only Prefect file (recommended)
- Create a second file (e.g., prefect.local.yaml) that contains only your local deployment and no build or push sections. Then use --prefect-file when deploying.

Example `prefect.local.yaml`:
Copy code
name: src
prefect-version: 3.4.11

deployments:
  - name: local
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev
        image_pull_policy: "NEVER"     # don't pull from ECR; use local image
        network_mode: "host"
        env:
          PREFECT_API_URL: "<http://127.0.0.1:4200/api>"
        # Optional: set working dir inside container if your code lives there
        container_create_kwargs:
          working_dir: "/opt/prefect/src"
Dev loop:
Copy code
# Build the image locally on the same EC2 host as the worker
docker build -t 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev -f Dockerfile.prefect_flows .

# Deploy ONLY the local deployment (no build/push)
prefect deploy --prefect-file prefect.local.yaml

# Run it (updated code is in the locally built image)
prefect deployment run "run/local" --watch
B) Remove build/push from your existing file when developing locally
- Temporarily comment out the build: and push: sections in prefect.yaml while developing locally, then re-enable them in CI.
- This works but is easy to forget; the separate-file approach is safer.

Also, quick fixes to your current YAML:
- For Docker workers, working_dir must go under container_create_kwargs.working_dir (not at the top level of job_variables).
- Remove push: false under `job_variables`; that’s not a valid Docker job variable.

Passing parameters with prefect deployment run
- Use -p key=value (value is JSON) repeatedly:
Copy code
prefect deployment run "run/local" \
  -p seller_id='"d480fd76-f59e-4ea0-95cb-4e22db0164e0"' \
  -p runs=10 \
  --watch
- Or pass a single JSON object:
Copy code
prefect deployment run "run/local" \
  --params '{"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0","runs":10}' \
  --watch
- You can override job variables at run time using -jv if needed:
Copy code
prefect deployment run "run/local" \
  -jv env='{"LOG_LEVEL":"DEBUG"}'
Docs for reference:
- Prefect YAML guide: prefect.yaml

If you want, paste your desired “local” directory path inside the container (e.g., /opt/prefect/src) and whether your Dockerfile copies code there. I’ll adjust the container_create_kwargs and confirm you won’t hit ECR for local runs.
m
@Marvin this is my prefect.yaml file:

name: src
prefect-version: 3.4.11

# the deployments section allows you to provide configuration for deploying flows
deployments:
  - name: local
    description: "Run flows from local source code"
    schedule: {}
    flow_name: null
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: local-worker
      work_queue_name: default
      job_variables:
        working_dir: "${PWD}"

I am using a Process worker and I got this error:

FileNotFoundError: [Errno 2] No such file or directory: PosixPath('${PWD}')
m
thought for 43.8 seconds
Thanks for the details. The error happens because ${PWD} is not expanded inside prefect.yaml — Prefect treats it as a literal string. For a Process worker, set working_dir to a real path on the host. Two easy fixes:

Fix A: Use an absolute path
Copy code
name: src
prefect-version: 3.4.11

deployments:
  - name: local
    description: "Run flows from local source code"
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: local-worker
      work_queue_name: default
      job_variables:
        working_dir: "/home/ubuntu/src"   # replace with your repo’s absolute path
Fix B: Use Jinja env interpolation (portable across hosts)
- Set an environment variable, then reference it in the YAML:
Copy code
export REPO_DIR=/home/ubuntu/src  # or your repo path
prefect.yaml:
Copy code
name: src
prefect-version: 3.4.11

deployments:
  - name: local
    description: "Run flows from local source code"
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: local-worker
      work_queue_name: default
      job_variables:
        working_dir: "{{ env.REPO_DIR }}"
Notes
- Ensure the working_dir exists on the machine where the Process worker runs and contains your flow code. The entrypoint path is resolved relative to working_dir.
- Create the pool if you haven’t:
Copy code
prefect work-pool create local-worker --type process
prefect worker start -p local-worker
- Deploy and run:
Copy code
prefect deploy --name local
prefect deployment run "run/local" --watch
Passing parameters at run time
- Use -p key=value (value is JSON) or `--params`:
Copy code
prefect deployment run "run/local" \
  -p seller_id='"d480fd76-f59e-4ea0-95cb-4e22db0164e0"' \
  -p runs=3 \
  --watch

prefect deployment run "run/local" \
  --params '{"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0","runs":3}' \
  --watch
Reference: prefect.yaml

If you share the actual path to your repo on the EC2 instance, I’ll drop in a verified YAML snippet with that path.
m
@Marvin in this flow, how do I make the tasks run in parallel? These are the tasks:

@flow(name="EBAY", flow_run_name='{seller_id}', log_prints=True)
def run(seller_id: str):
    APP_ENV = 'dev'
    BUCKET = f"credilinq-datascience-bucket-{APP_ENV}"
    type_value = 'prefect_origination'
    # seller_id = 'd480fd76-f59e-4ea0-95cb-4e22db0164e0'
    selling_partner_id = 'ed442813-afe7-4026-bc14-190ca41bfb23'
    today = datetime.utcnow()
    start_month_date = today - timedelta(days=365)
    start_date = start_month_date.replace(day=1).date()
    today_formatted = today.strftime('%Y-%m-%dT000001.000Z')
    start_date_formatted = start_date.strftime('%Y-%m-%dT000001.000Z')
    conn = get_db_connection(APP_ENV)
    get_marketplace(seller_id, selling_partner_id, conn)
    get_form(seller_id, selling_partner_id, conn)
m
thought for 94.5 seconds
To run those tasks in parallel in Prefect 3.x:

1) Make them Prefect tasks
Prefect only schedules decorated tasks. If get_marketplace and get_form are plain functions, they will run sequentially. Decorate them with @task.

2) Use a task runner and submit concurrently
- For I/O-bound work (DB/API), use ThreadPoolTaskRunner.
- For CPU-bound work, use ProcessPoolTaskRunner.
- Use the .submit(...) pattern to start tasks concurrently, then .result() to wait.

Example (I/O-bound, ThreadPool)
Copy code
from datetime import datetime, timedelta
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

def get_db_connection(env: str):
    # ... create and return a new DB connection ...
    ...

@task
def get_marketplace(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)  # make connection inside the task
    try:
        # ... do work ...
        return "marketplace-done"
    finally:
        conn.close()

@task
def get_form(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)
    try:
        # ... do work ...
        return "form-done"
    finally:
        conn.close()

@flow(name="EBAY", flow_run_name="{seller_id}", log_prints=True,
      task_runner=ThreadPoolTaskRunner(max_workers=8))
def run(seller_id: str):
    logger = get_run_logger()
    APP_ENV = "dev"
    selling_partner_id = "ed442813-afe7-4026-bc14-190ca41bfb23"

    today = datetime.utcnow()
    start_month_date = today - timedelta(days=365)
    start_date = start_month_date.replace(day=1).date()
    today_formatted = today.strftime("%Y-%m-%dT00:00:01.000Z")
    start_date_formatted = start_date.strftime("%Y-%m-%dT00:00:01.000Z")

    # Kick off tasks concurrently
    m_fut = get_marketplace.submit(seller_id, selling_partner_id, APP_ENV)
    f_fut = get_form.submit(seller_id, selling_partner_id, APP_ENV)

    # Wait for both to finish (and propagate exceptions)
    m_res = m_fut.result()
    f_res = f_fut.result()
    logger.info(f"Results: {m_res}, {f_res}")
Notes
- Don’t share a live DB connection object between tasks. Create it inside each task (as shown). This avoids issues with threads/processes and pickling.
- Increase/decrease max_workers to tune concurrency.
- If your work is CPU-bound, switch to `ProcessPoolTaskRunner`:
Copy code
from prefect.task_runners import ProcessPoolTaskRunner

@flow(..., task_runner=ProcessPoolTaskRunner(max_workers=4))
def run(...):
    ...
With process pools, every argument passed to tasks must be picklable; keep that in mind.
- If you want to fire a dynamic number of parallel tasks (e.g., over a list), loop with .submit(...) and then gather:
Copy code
futures = [my_task.submit(x) for x in items]
results = [f.result() for f in futures]
If you share whether these tasks are DB/API-heavy or CPU-heavy, I can suggest exact max_workers and the best runner for your EC2 box.
m
@Marvin I added ThreadPoolTaskRunner, but I get the error below; the flow still completed:

Error encountered when computing cache key - result will not be persisted.
Traceback (most recent call last):
  File "/home/mip/ds_pipelines/venv/lib/python3.10/site-packages/prefect/cache_policies.py", line 374, in compute_key
    return hash_objects(hashed_inputs, raise_on_failure=True)
  File "/home/mip/ds_pipelines/venv/lib/python3.10/site-packages/prefect/utilities/hashing.py", line 89, in hash_objects
    raise HashError(msg)
prefect.exceptions.HashError: Unable to create hash - objects could not be serialized.
  JSON error: Unable to serialize unknown type: <class 'psycopg2.extensions.connection'>
  Pickle error: cannot pickle 'psycopg2.extensions.connection' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mip/ds_pipelines/venv/lib/python3.10/site-packages/prefect/task_engine.py", line 170, in compute_transaction_key
    key = self.task.cache_policy.compute_key(
  File "/home/mip/ds_pipelines/venv/lib/python3.10/site-packages/prefect/cache_policies.py", line 214, in compute_key
    policy_key = policy.compute_key(
  File "/home/mip/ds_pipelines/venv/lib/python3.10/site-packages/prefect/cache_policies.py", line 384, in compute_key
    raise ValueError(msg) from exc
ValueError: Unable to create hash - objects could not be serialized.
  JSON error: Unable to serialize unknown type: <class 'psycopg2.extensions.connection'>
  Pickle error: cannot pickle 'psycopg2.extensions.connection' object

This often occurs when task inputs contain objects that cannot be cached like locks, file handles, or other system resources. To resolve this, you can:
1. Exclude these arguments by defining a custom cache_key_fn
2. Disable caching by passing cache_policy=NO_CACHE
m
thought for 54.8 seconds
You’re seeing that error because a psycopg2 connection object is being passed into a task (or captured by it). Prefect computes a cache key from task inputs; DB connections aren’t serializable, so hashing fails. The flow still completes, but Prefect logs the warning and skips result persistence.

Two fixes (pick one or combine):

A) Don’t pass the connection; create it inside each task
- This also avoids sharing a connection across threads.
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.cache_policies import NO_CACHE

def get_db_connection(env: str):
    ...

@task(cache_policy=NO_CACHE)  # optional: disables caching for this task
def get_marketplace(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)
    try:
        # do work...
        return "marketplace-done"
    finally:
        conn.close()

@task(cache_policy=NO_CACHE)
def get_form(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)
    try:
        # do work...
        return "form-done"
    finally:
        conn.close()

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def run(seller_id: str):
    app_env = "dev"
    m = get_marketplace.submit(seller_id, "ed442813-afe7-4026-bc14-190ca41bfb23", app_env)
    f = get_form.submit(seller_id, "ed442813-afe7-4026-bc14-190ca41bfb23", app_env)
    m.result(); f.result()
B) Keep passing a handle, but tell Prefect not to use it for caching
- Define a custom cache_key_fn that ignores un-serializable args, or disable caching on those tasks.

Custom cache key:
Copy code
from prefect.cache_policies import CacheKeyFnPolicy

def key_ignoring_conn(ctx, inputs):
    # Only use serializable inputs
    return f"{inputs.get('seller_id')}|{inputs.get('selling_partner_id')}"

@task(cache_policy=CacheKeyFnPolicy(cache_key_fn=key_ignoring_conn))
def get_marketplace(seller_id, selling_partner_id, conn):
    ...
Simplest: disable caching on the affected tasks
Copy code
@task(cache_policy=NO_CACHE)
def get_marketplace(...):
    ...
Summary
- Don’t pass DB connections, clients, file handles, locks, etc. as task inputs.
- Prefer creating connections inside each task (thread-safe) and/or disable caching with cache_policy=NO_CACHE.
- The warning is harmless, but these changes will remove it and avoid overhead.

If you’d like, paste your current task signatures and I’ll show the minimal edits to make them parallel-safe and cache-error-free.
m
@Marvin I have to pass APP_ENV to every task if I create the DB connection inside each task. Can I make APP_ENV global?

@task(name="get_form", log_prints=True)
def get_form(seller_id, selling_partner_id, APP_ENV):
    conn = get_db_connection(APP_ENV)
    sql_command_sellers = f""" '; """
    df_sellers = pd.read_sql_query(sql_command_sellers, conn)
    print(df_sellers)
m
thought for 79.0 seconds
Yes — you don’t need to pass APP_ENV as a parameter to every task. Use one of these patterns:

Option 1: Prefect Variables (centralized, runtime-configurable)
- Set once (via the CLI, the UI, or Python; see the sketch after the examples below) and read from tasks/flows with Variable.get().
- Great for environment names, bucket names, etc.

Set it:
Copy code
prefect variables set APP_ENV=dev
Use it in code:
Copy code
from prefect import flow, task
from prefect.variables import Variable

@task
def get_form(seller_id: str, selling_partner_id: str):
    app_env = Variable.get("APP_ENV", default="dev")
    conn = get_db_connection(app_env)
    try:
        df_sellers = pd.read_sql_query("SELECT ...", conn)
        print(df_sellers)
    finally:
        conn.close()
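If you prefer to set the variable from Python instead of the CLI, a small sketch like this should work in Prefect 3.x (treat the overwrite keyword as an assumption and check it against your version's docs):

from prefect.variables import Variable

# create the variable, or update it if it already exists
Variable.set("APP_ENV", "dev", overwrite=True)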
Option 2: OS environment variable (provided by work pool job variables)
- Set PREFECT__CONTEXT__ENV or a custom APP_ENV in your deployment’s job_variables.env, then read it with os.getenv in code.

prefect.yaml (Process worker):
Copy code
deployments:
  - name: local
    entrypoint: extraction/ebay/ebay.py:run
    work_pool:
      name: local-worker
      job_variables:
        working_dir: "/home/ubuntu/src"
        env:
          APP_ENV: "dev"
Use it:
Copy code
import os
from prefect import task

@task
def get_form(seller_id, selling_partner_id):
    app_env = os.getenv("APP_ENV", "dev")
    conn = get_db_connection(app_env)
    ...
Option 3: Flow-level constant via Prefect context
- Compute once in the flow and pass only to tasks that need it, or set a module-level constant.
- Still avoids passing many parameters if only a few tasks need it.
Copy code
from prefect import flow, task

@task
def get_form(seller_id, selling_partner_id, app_env):
    conn = get_db_connection(app_env)
    ...

@flow
def run(seller_id: str):
    app_env = "dev"  # or read Variable.get / os.getenv once here
    get_form.submit(seller_id, "ed4...", app_env)
Recommendation for your case
- Use Option 1 (Prefect Variable) or Option 2 (env var via job_variables.env). Both let your tasks create their own DB connection without threading/pickling issues and without repeating APP_ENV everywhere.
- If you already maintain a per-environment Prefect Server, Variables are nice because you can change APP_ENV centrally in the UI/CLI without code changes.

Docs:
- Variables: Prefect Variables
- Customizing job variables: Customize job variables

If you share which you prefer (Prefect Variable vs env var), I can refactor your snippet accordingly.