# ask-marvin
g
@Marvin So back onto the issue where, if the main API Code Engine app is redeployed, the Prefect worker Code Engine app somehow loses its ability to pull code from Bitbucket. Why on earth is this happening??? The main API deployment does 2 things: it creates 2 `prefect serve()` flow runs (the old way we were doing this - now deprecated) and it starts the main FastAPI app. That's it; it should not mess with the "new" deployments that are created by the worker Code Engine app and that schedule work on that same worker app.
m
thought for 193.9 seconds
n
fyi @Gabor Hosszu memory exists per thread by default. marvin might remember some facts between threads (like chatgpt) but i recommend working in one slack thread if you're going to invoke something you previously mentioned
👍 1
also
> "prefect serve()" flow runs (the old way we were doing this - now deprecated)
what do you mean deprecated here?
m
Short answer: your worker isn't "forgetting Prefect," it's "forgetting Git." When the main API app is redeployed, something about the worker's Git auth/materials is getting reset or lost, so subsequent flow runs can't clone from Bitbucket. A few very likely culprits given Code Engine and Prefect 3.x `from_source` deployments:
- Ephemeral SSH state: if you rely on SSH and the worker writes `~/.ssh/known_hosts` at runtime, any restart/scale-to-zero wipes it. Next git clone = "Host key verification failed." Bundling known_hosts in the image, or mounting it as a secret/config and setting `GIT_SSH_COMMAND=ssh -i /path/to/key -o UserKnownHostsFile=/path/to/known_hosts -o StrictHostKeyChecking=yes`, fixes that.
- Credentials mounted in the worker's env/files are being rotated or replaced by your main API redeploy pipeline (shared secrets, same secret names, or IaC that "recreates" them). That would flip a working worker into "Authentication failed" with HTTPS or "Permission denied (publickey)" with SSH.
- Shared image tag: if both apps happen to point at the same container image tag (e.g., latest), and the API redeploy pushes a new image without the Git setup, the worker pulls it on restart and loses the ability to clone. Seen it. It's… memorable.
- Security context or user change: if the worker suddenly runs as a non-root random UID and can't read the mounted SSH key or write to `$HOME/.ssh`, `git clone` will start failing after the main API redeploy if your pipeline applies project-level policy changes.

To get you to root cause fast, can you share:
- The exact git error from the worker after the failure (Host key verification failed? Authentication failed? Permission denied (publickey)?).
- Whether you're using SSH or HTTPS to Bitbucket.
- How the worker gets credentials (Code Engine secret env vars? Mounted SSH key file? known_hosts source?).
- Whether the worker restarts around the time the API is redeployed (new revision, scale-to-zero, or evicted).
- Whether both apps use the same image/tag.
- A snippet of your deployment definition: either the `flow.from_source(...).deploy(...)` call (source URL and entrypoint) or the relevant `prefect.yaml` "pull" config.

Concrete hardening steps (works regardless of which gremlin it is):
- If using SSH:
  - Mount the private key and known_hosts as files (secrets/config) and set
    ```
    GIT_SSH_COMMAND="ssh -i /run/secrets/ssh/id_rsa -o UserKnownHostsFile=/run/config/ssh/known_hosts -o IdentitiesOnly=yes -o StrictHostKeyChecking=yes"
    ```
    so git does not depend on ephemeral `~/.ssh`.
  - Ensure file perms are readable by the container user.
- If using HTTPS:
  - Use an app password/token via env var injected into the worker, and a URL like
    ```
    https://x-token-auth:${BITBUCKET_APP_PASSWORD}@bitbucket.org/org/repo.git
    ```
    or
    ```
    https://username:${BITBUCKET_APP_PASSWORD}@bitbucket.org/...
    ```
  - Verify the worker image has git installed and `sslCAinfo`/certs intact.
- Avoid shared mutable tags between API and worker images. Pin the worker to an immutable digest/tag that you control.
- Keep secrets for API and worker separate (names/namespaces) so redeploying one cannot "helpfully" recreate the other.
- If you must write to `~/.ssh`, set a deterministic HOME that's writable and persists across runs, or re-create the directory/known_hosts on container start.

Also, quick sanity check: mixing "old" `prefect serve()` (deprecated) with "new" deployments is fine in 3.x, but it shouldn't impact cloning. Still, make sure those serves aren't touching the same work pool/queue the worker uses. Send me the git error text and a peek at how the worker authenticates to Bitbucket, and I'll point at the exact wire that's fraying.
g
Thanks @Nate! Sorry, just copy pasted the same message I asked ChatGPT, not sure I actually talked to Marvin about this so far.
> what do you mean deprecated here?
Deprecated in the context of our own infrastructure, nothing Prefect related. So far we have been running a single IBM Cloud Code Engine container with both our main FastAPI backend and the supporting Prefect flows the backend uses through the `serve()` functionality. The Prefect server is hosted on a separate IBM Code Engine app. Now we are moving to a paradigm where we have a dedicated, scalable worker running the Prefect flows; this worker is set up as a 3rd IBM Code Engine app that just runs a generic process worker. Results are set up to be saved into COS (cloud object storage that is basically S3). Our deployment setup pulls the "latest" from Bitbucket, so when a flow run is scheduled, the worker should use the Bitbucket block to pull code before executing anything. For some reason, when our main API is redeployed, the worker loses awareness of the "bitbucket_credentials" block... no idea how or why, because I can see those blocks are still there in the Prefect server UI. Also, the only thing the "main" API deployment touches is running 2 Prefect flows with `serve()` like I said, so it's not like it's overwriting some metadata or anything... and even if it were, it would be overwriting on the Prefect server and I would see that. It's not something obvious like the `serve()` deployments having the same deployment name or anything like that. They are different deployments with different names. If I redeploy the worker after the API redeployment, it can magically use the Bitbucket block again...
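For context, a `serve()`-based setup like the one described above looks roughly like the sketch below; the flow and deployment names are hypothetical stand-ins, and the real app launches this alongside the FastAPI process:
```python
from prefect import flow, serve

# Hypothetical stand-ins for the two flows served from the main API container.
@flow
def nightly_sync():
    ...

@flow
def cache_refresh():
    ...

if __name__ == "__main__":
    # serve() starts a long-lived process that registers these deployments
    # and executes their runs inside this same container.
    serve(
        nightly_sync.to_deployment(name="nightly-sync"),
        cache_refresh.to_deployment(name="cache-refresh"),
    )
```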
@Marvin the exact error I am getting is "No block class found for slug 'bitbucket-repository'. Please ensure that the block class is available in the current environment."
m
thought for 89.1 seconds
That error means your deployment still references a Prefect "Bitbucket repository" block, but the worker's environment no longer has the block class installed/available. In Prefect 3.x, those git storage blocks were removed; deployments should pull code via `from_source` instead. So when your main API redeploy updated the shared image/env, the worker lost the plugin that used to provide that block slug, and poof, no clone.

Confirm it:
- Run
  ```
  prefect deployment inspect "<flow-name>/<deployment-name>"
  ```
  and check the "storage/source" section. If you see a block document with slug `bitbucket-repository`, that's the smoking gun.

Two paths forward:

Preferred (3.x-native) fix
- Recreate the deployment without blocks, using `from_source` and normal Git auth:
  ```python
  from prefect import flow

  @flow
  def my_flow():
      ...

  if __name__ == "__main__":
      flow.from_source(
          source="https://bitbucket.org/<org>/<repo>.git",  # or ssh URL
          entrypoint="path/to/flow.py:my_flow",
          revision="main",  # tag/sha/branch
      ).deploy(
          name="my-deployment",
          work_pool_name="<your-code-engine-pool>",
      )
  ```
- For SSH, mount key and known_hosts and set:
  ```
  GIT_SSH_COMMAND="ssh -i /run/secrets/ssh/id_rsa -o UserKnownHostsFile=/run/config/ssh/known_hosts -o IdentitiesOnly=yes -o StrictHostKeyChecking=yes"
  ```
- For HTTPS, inject a Bitbucket app password/token and use:
  ```
  https://username:${BITBUCKET_APP_PASSWORD}@bitbucket.org/<org>/<repo>.git
  ```
- Pin the worker to its own immutable image/tag. Don't let the API redeploy "helpfully" replace it.

Temporary workaround (only if you're actually on 2.x)
- Install and register the Bitbucket block in the worker image so the slug resolves again:
  ```
  pip install prefect-bitbucket
  prefect block register -m prefect_bitbucket
  ```
  This is not supported in 3.x, so only do this if you confirm you're on 2.x.

A couple of quick questions to nail this down:
- What Prefect version is running in the worker container? (`prefect version`)
- Are the API and worker using the same container image/tag?
- Using SSH or HTTPS for Bitbucket, and how are creds provided (env vs mounted files)?

Once you confirm version and how the deployment was created, I can give you the exact replacement snippet. In any case, the fix is to stop depending on the old `bitbucket-repository` block and move cloning to `from_source` with explicit Git auth in the worker.
g
@Marvin the prefect version in the container is 3.4.1. The API and the worker run on 2 completely different code engine applications (containers). For Bitbucket, the credentials were created as a bitbucket block on the prefect server dashboard UI. The deployment script does
```python
# Determine source (BitBucket or local) lazily
if use_bitbucket:
    repo = BitBucketRepository.load(settings.bitbucket_repository_block_name)
    deployment_source = repo
else:
    deployment_source = str(PROJECT_ROOT)

flow_function.from_source(
    source=deployment_source,
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
m
thought for 105.1 seconds
Right, that error is the giveaway: in Prefect 3.4.1 there is no built-in "bitbucket-repository" block. Your deployment is referencing a block doc with slug `bitbucket-repository`, but the worker's environment doesn't have the plugin that defines that block class, so it can't deserialize it. Hence "No block class found…"

Why it appears after your API redeploy:
- Your API container creates/recreates deployments. Those deployments now point to a Bitbucket block object.
- The worker container does not have the Bitbucket block class installed/registered, so when it tries to pull the source for a flow run, it explodes.
- The two apps being separate doesn't matter; what matters is that the deployment metadata now references a block your worker can't load.

In Prefect 3.x, git storage blocks were removed. The supported pattern is to pass a git URL/string to `from_source` and let git run with your own credentials. So, stop handing a block instance to `from_source`.

What to change in your script: replace the block usage with a plain repo URL and let the worker's environment provide credentials. Example:
```python
# before
if use_bitbucket:
    repo = BitBucketRepository.load(settings.bitbucket_repository_block_name)
    deployment_source = repo
else:
    deployment_source = str(PROJECT_ROOT)

flow_function.from_source(
    source=deployment_source,
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(...)

# after (3.x-native)
if use_bitbucket:
    # choose ssh or https
    source_url = "git@bitbucket.org:<org>/<repo>.git"    # SSH
    # or: "https://bitbucket.org/<org>/<repo>.git"       # HTTPS
else:
    source_url = str(PROJECT_ROOT)

flow_function.from_source(
    source=source_url,
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
Provide credentials to the worker:
- SSH (recommended in containers):
  - Mount an SSH private key and known_hosts into the worker container and set:
    ```
    GIT_SSH_COMMAND="ssh -i /run/secrets/ssh/id_rsa -o UserKnownHostsFile=/run/config/ssh/known_hosts -o IdentitiesOnly=yes -o StrictHostKeyChecking=yes"
    ```
  - Ensure file permissions allow the container user to read those files.
- HTTPS:
  - Use a Bitbucket app password/token in env vars and either inject it via a credential helper, or include it in the URL at runtime via env expansion (careful not to bake secrets into the deployment):
    ```
    https://username:${BITBUCKET_APP_PASSWORD}@bitbucket.org/<org>/<repo>.git
    ```

If you absolutely must keep using the block (not recommended on 3.x):
- Install and register the package that defines it in the worker image, e.g.:
  ```
  pip install prefect-bitbucket
  prefect block register -m prefect_bitbucket
  ```
  Then import the module on startup so the block class is available. This is brittle and 3.x is designed to avoid it.

Sanity checks:
- Inspect one of the broken deployments to confirm it references the block:
  ```
  prefect deployment inspect "<flow>/<deployment>"
  ```
- Pin the worker to its own image/tag so API redeploys can't accidentally change its environment.

If you tell me whether you're going SSH or HTTPS for Bitbucket and how you mount secrets in Code Engine, I'll give you the exact env/volume settings to make the worker clones reliable. Yes, reliable. I can dream.
g
@Marvin thanks for the answer, but to me this is a bit complicated, and since I already have an S3 bucket integration block with my Prefect server, could I maybe change the paradigm to push the code to a bucket during deployment and have the worker container run it from there?
m
thought for 240.2 seconds
n
i haven't read all of the above but what you suggested
> could I maybe change the paradigm to push the code to a bucket during deployment and have the worker container run it from there?
is the normal paradigm. `prefect deploy` (or `.deploy`) will run your `build` and `push` steps, and then the process started by the worker will run your `pull` steps at runtime in the exec env
👀 1
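For reference, the build/push/pull paradigm Nate describes is typically declared as steps in a `prefect.yaml`. A minimal sketch using the prefect-aws S3 steps; the bucket, folder, and credentials block name are placeholders:
```yaml
# prefect.yaml (sketch): `prefect deploy` runs the push steps at deploy time;
# the process the worker starts runs the pull steps at runtime.
push:
  - prefect_aws.deployments.steps.push_to_s3:
      id: push_code
      requires: prefect-aws
      bucket: my-code-bucket        # placeholder bucket
      folder: my-project            # placeholder prefix
      credentials: "{{ prefect.blocks.aws-credentials.my-creds }}"  # placeholder block

pull:
  - prefect_aws.deployments.steps.pull_from_s3:
      requires: prefect-aws
      bucket: "{{ push_code.bucket }}"
      folder: "{{ push_code.folder }}"
      credentials: "{{ prefect.blocks.aws-credentials.my-creds }}"
```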
m
Short answer: yes. In Prefect 3.x you can deploy by pushing your code to S3 and have workers pull and run it. Two simple patterns:

1) Let Prefect upload your code to S3 at deploy time
- Create a filesystem block that points at your bucket/prefix (one-time):
  ```python
  from prefect.filesystems import RemoteFileSystem

  # points to s3://<bucket>/<prefix> where packages will be uploaded
  RemoteFileSystem(basepath="s3://my-bucket/prefect/projects/my-project").save(
      "prod-s3", overwrite=True
  )
  ```
- Deploy from your local project and tell Prefect where to upload:
  ```python
  from prefect import flow
  from prefect.filesystems import RemoteFileSystem

  @flow
  def my_flow():
      print("hello from s3-backed deployment")

  if __name__ == "__main__":
      my_flow.from_source(
          source=".",                                # your local project
          entrypoint="flows/my_flow.py:my_flow",     # file.py:function
          storage=RemoteFileSystem.load("prod-s3"),  # upload target
      ).deploy(
          name="my-flow-prod",
          work_pool_name="my-pool",
      )
  ```
What happens: Prefect packages your project, uploads it to s3://my-bucket/prefect/projects/my-project, and your worker downloads/extracts that package when it runs the flow.

2) Keep your code in S3 yourself and point the deployment at it
- If you already push code to S3 in CI:
  ```python
  from prefect import flow

  @flow
  def my_flow():
      ...

  if __name__ == "__main__":
      my_flow.from_source(
          source="s3://my-bucket/prefect/projects/my-project",  # you maintain this
          entrypoint="flows/my_flow.py:my_flow",
      ).deploy(
          name="my-flow-prod",
          work_pool_name="my-pool",
      )
  ```
Workers will pull directly from that S3 path at runtime.

Notes and gotchas
- Make sure your worker image can read from S3:
  - If using RemoteFileSystem, install s3fs in the worker container and provide AWS creds via IAM role or env vars.
  - If you prefer an AWS-native block, you can use the S3Bucket block from prefect-aws; then install prefect-aws in the worker image.
- Use `prefect deploy` (not the removed 2.x commands) if you prefer CLI-based deployments.
- Docs: Store your flow code

Want me to tailor this to your exact setup? Let me know:
- Prefect version (assuming 3.x)
- The block you already created (RemoteFileSystem vs S3Bucket)
- Your work pool name

I'll give you a copy-pasteable snippet and the minimal worker image requirements.
g
@Marvin why use a `RemoteFileSystem` block when I already have an `S3 Bucket` block created for result storage. Can't I re-use this?
m
thought for 64.8 seconds
Short answer: yes, you can reuse your existing S3 Bucket block. In Prefect 3.x, both `RemoteFileSystem` and `prefect-aws`'s `S3Bucket` blocks work as deployment storage. If you already have an `S3Bucket` block for results, you can point the same block (or a clone of it) at a different prefix for code bundles.

Why you might still make a second block:
- Keeps code packages separate from results (cleaner listings, simpler lifecycle policies)
- Lets you tweak permissions independently (read-only for code, write for results)

How to use your S3 block for code uploads during deploy:
```python
from prefect import flow
from prefect_aws.s3 import S3Bucket  # requires prefect-aws installed

@flow
def my_flow():
    print("hello from s3-backed deployment")

if __name__ == "__main__":
    # Reuse your existing block; ideally ensure it points at a code-specific prefix
    s3 = S3Bucket.load("my-existing-s3-block")

    my_flow.from_source(
        source=".",                                # your local project
        entrypoint="flows/my_flow.py:my_flow",     # file.py:function
        storage=s3,                                # Prefect will upload the package here
    ).deploy(
        name="my-flow-prod",
        work_pool_name="my-pool",
    )
```
Worker/container requirements:
- If you use `S3Bucket`: install `prefect-aws` (and it brings `boto3`) in the worker image; provide AWS creds via IAM role/env vars.
- If you instead point `source="s3://..."` without a block: you'll need `s3fs` in the worker image.

Docs: Store your flow code

If you share your block name and the bucket/prefix you want for code vs results, I'll give you a copy-pasteable config. I'll try to contain my enthusiasm.
g
@Marvin I get this error:
```
ValueError: Unable to find block document named s3-bucket/minio-deployments for block type s3-bucket
```
But I can tell you with 100% certainty that the block is there; just 1 line above in the Dockerfile I run the following:
```
prefect block inspect s3-bucket/minio-deployments
```
and I get the following result:
```
worker-1  |                           s3-bucket/minio-deployments
worker-1  | ┌───────────────┬──────────────────────────────────────────────────────────────┐
worker-1  | │ Block Type    │ S3 Bucket                                                    │
worker-1  | │ Block id      │ 969eaf30-ab89-4eeb-8d32-727f8755089a                         │
worker-1  | ├───────────────┼──────────────────────────────────────────────────────────────┤
worker-1  | │ bucket_name   │ lead-companion-prefect                                       │
worker-1  | │ bucket_folder │ deployments                                                  │
worker-1  | │ credentials   │ {'minio_root_user': 'minioadmin', 'minio_root_password':     │
worker-1  | │               │ '********', 'region_name': 'us-east-1',                      │
worker-1  | │               │ 'aws_client_parameters': {'use_ssl': False, 'verify': False, │
worker-1  | │               │ 'endpoint_url': 'http://minio:9000'}}                        │
worker-1  | └───────────────┴──────────────────────────────────────────────────────────────┘
```
As you can see, the block is 100% there, so this error is misleading. The code that raises this error is the following:
```python
# Determine source/target (Cloud Object Storage or local) lazily
if use_cos:
    cos_repo = S3Bucket.load(settings.prefect_default_deployment_storage_block)

flow_function.from_source(
    source=str(PROJECT_ROOT),
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
    storage=cos_repo if use_cos else None,
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
m
thought for 182.2 seconds
You've (almost) got the right string, just in the wrong place. In Python, when you load a typed block like `S3Bucket`, `S3Bucket.load(...)` expects only the block's name. You're passing the CLI-style slug `s3-bucket/minio-deployments`, so Prefect is literally looking for a block named "s3-bucket/minio-deployments" of type "s3-bucket", which of course doesn't exist. Hence the very earnest error.

Fix: pass only the name.
```python
from prefect_aws.s3 import S3Bucket

# If your setting contains "s3-bucket/minio-deployments", strip the type prefix
block_id = settings.prefect_default_deployment_storage_block
name_only = block_id.split("/", 1)[-1]

cos_repo = S3Bucket.load(name_only)
```
Or store the name-only value in your setting to begin with:
```python
cos_repo = S3Bucket.load("minio-deployments")
```
If you really want to use the full `type/name` slug in code, use the base `Block` loader and then cast/check:
```python
from prefect.blocks.core import Block
from prefect_aws.s3 import S3Bucket

blk = Block.load("s3-bucket/minio-deployments")
assert isinstance(blk, S3Bucket)
cos_repo = blk
```
Also double-check you're importing from `prefect_aws.s3` and that `prefect-aws` is installed in your image.

Relevant docs:
- Blocks concept (3.x)
- prefect block CLI (inspect uses type/name)
- Block.load API

If stripping the prefix doesn't do it, share the exact value of `settings.prefect_default_deployment_storage_block` and your Prefect version, and we'll stare at it together until it behaves.
g
@Marvin I made the change and set the block to be called `minio-deployments` only, but now I get this:
```
worker-1  | 'minio-deployments' is not valid. Slug must contain a '/', e.g.
worker-1  | 'json/my-json-block'
```
m
thought for 168.1 seconds
g
@Marvin ignore my previous comment, this is the error I get now:
```
worker-1  | TypeError: Flow.from_source() got an unexpected keyword argument 'storage'
```
m
thought for 244.6 seconds
Right error, wrong place. That message appears when you pass a plain string to something that expects a block slug (`type/name`). In Prefect 3.x:
- If you pass a string to `from_source(..., storage=...)`, it must be a slug like `s3-bucket/minio-deployments`.
- If you load a typed block in Python, use the name only, and pass the object.

Pick one of these, not a hybrid:

Option A: Load the block and pass the object
```python
from prefect_aws.s3 import S3Bucket

if use_cos:
    storage = S3Bucket.load("minio-deployments")  # name only
else:
    storage = None

flow_function.from_source(
    source=str(PROJECT_ROOT),
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
    storage=storage,
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
Option B: Pass the slug string directly (no load)
```python
flow_function.from_source(
    source=str(PROJECT_ROOT),
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
    storage="s3-bucket/minio-deployments",  # slug with '/'
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
If you're still seeing the slug error, you're likely passing a string (e.g., `minio-deployments`) into `storage`. Print `type(storage)` just before `from_source` to be sure.

Docs, for when you inevitably need to prove it to your future self:
- Store flow code (storage)
- Blocks concept

If you share your updated snippet, I'll point at the exact line giving it ideas above its station.
g
@Marvin ignore my previous comment, this is the error I get now:
```
worker-1  | TypeError: Flow.from_source() got an unexpected keyword argument 'storage'
```
m
thought for 54.8 seconds
Yep, in Prefect 3.x there's no `storage=` kwarg on `Flow.from_source`. It only takes `source` and `entrypoint`. Also: `prefect_aws.s3.S3Bucket` is not deployment storage in 3.x. If you want to keep your code on MinIO/S3, use a filesystem block (e.g., `RemoteFileSystem`) and pass that as the `source`. Here's the corrected pattern:
```python
from prefect.filesystems import RemoteFileSystem
from prefect_aws.s3 import S3Bucket  # fine for data/results, not for code storage

# Option A: local source (no remote storage)
src = str(PROJECT_ROOT)

# Option B: MinIO/S3 for code storage via RemoteFileSystem
if use_cos:
    # load an existing block
    # remote = RemoteFileSystem.load("minio-deployments")
    # or construct inline if you haven't created the block yet:
    remote = RemoteFileSystem(
        basepath="s3://lead-companion-prefect/deployments",
        settings={
            "client_kwargs": {"endpoint_url": "http://minio:9000"},
            "key": "minioadmin",
            "secret": "minioadmin",
            "use_ssl": False,
        },
    )
    # remote.save("minio-deployments", overwrite=True)  # optional
    src = remote

flow_function.from_source(
    source=src,  # path, URL, or a filesystem block implementing ReadableDeploymentStorage
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
Notes:
- Install `s3fs` in your image so `RemoteFileSystem` can read/write S3/MinIO.
- Keep using `S3Bucket.load("minio-deployments")` for data/result ops if you need it, but not as the deployment source.

Docs: Store flow code (3.x)

If you'd rather point at a Git repo instead of S3, you can set `source` to a Git URL and be done with it. Much less drama.

Ah, there it is. In Prefect 3.x, `Flow.from_source` does not accept a `storage=` kwarg. The API changed; storage is specified via the `source` parameter. Use one of these patterns:

Option A: Pass the S3 block object as the source
```python
from prefect_aws.s3 import S3Bucket

source = S3Bucket.load("minio-deployments") if use_cos else str(PROJECT_ROOT)

flow_function.from_source(
    source=source,
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
Option B: Pass the slug string as the source (no `.load`)
```python
source = "s3-bucket/minio-deployments" if use_cos else str(PROJECT_ROOT)

flow_function.from_source(
    source=source,
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
Notes:
- If you pass a string slug, it must include a slash (`type/name`), e.g. `s3-bucket/minio-deployments`. If you pass a block object, use the name only in `S3Bucket.load("minio-deployments")`.
- Make sure `prefect-aws` is installed in the environment doing the deployment.

Docs, because someone moved the cheese:
- Store flow code (3.x)
- Blocks concept
g
@Marvin I don't want to use s3 as the SOURCE for the deployment. I want to deploy the code from the codebase, but package up the code to s3 (or any other common system for that matter) so that the worker can load this code when running the deployment
m
thought for 165.0 seconds
Got it: you want to build from local code but publish the bundle to remote storage so workers can fetch it. In Prefect 3.x that's exactly what "source" means for deployments. There isn't a separate "please upload my code somewhere" argument. Use a deployment storage that's writable (e.g., RemoteFileSystem pointing at S3/MinIO) as the source in from_source. Prefect will package your local project and upload it there; workers then pull from that location at run time. Two clean patterns:

1) Use a RemoteFileSystem block (recommended)
```python
from prefect.filesystems import RemoteFileSystem

# One-time setup (or bake into your image); create a block pointing to MinIO/S3
RemoteFileSystem(
    basepath="s3://lead-companion-prefect/deployments",
    settings={
        "key": "minioadmin",
        "secret": "minioadmin",
        "client_kwargs": {"endpoint_url": "http://minio:9000"},
    },
).save("minio-deployments", overwrite=True)

# Then, when deploying:
flow_function.from_source(
    source="remote-file-system/minio-deployments",  # slug with type/name
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
2) Pass a RemoteFileSystem instance directly
```python
from prefect.filesystems import RemoteFileSystem

remote = RemoteFileSystem(
    basepath="s3://lead-companion-prefect/deployments",
    settings={
        "key": "minioadmin",
        "secret": "minioadmin",
        "client_kwargs": {"endpoint_url": "http://minio:9000"},
    },
)

flow_function.from_source(
    source=remote,
    entrypoint=f"src/flows/{flow}/workflow.py:{flow_function.__name__}",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
Important notes:
- Install s3fs in both the build environment and all workers so fsspec can access S3/MinIO.
- S3Bucket (prefect-aws) is for data/results, not deployment storage in 3.x. Use RemoteFileSystem for code storage.
- If you pass a slug string, include the type: `remote-file-system/minio-deployments`. If you pass an object, just pass the object.

Docs: Store flow code (3.x)

If you prefer the CLI, `prefect deploy` can do the same via steps (push/pull to S3), but the Python examples above are the straight path.
g
@Marvin I get "ModuleNotFoundError: No module named 's3fs'"
m
thought for 22.7 seconds
That's expected. RemoteFileSystem uses fsspec's S3FileSystem under the hood, which lives in the s3fs package. Install it anywhere you build/deploy and anywhere your workers run. Pick your poison:
- pip:
  ```
  pip install s3fs
  # or pin
  pip install "s3fs>=2024.3.1"
  ```
- uv:
  ```
  uv add s3fs
  ```
- Poetry:
  ```
  poetry add s3fs
  ```
- Conda:
  ```
  conda install -c conda-forge s3fs
  ```
Dockerfile example (builder and worker images):
```
RUN pip install --no-cache-dir prefect prefect-aws s3fs
```
Quick verify:
```
python -c "import s3fs, fsspec; print('s3fs OK', s3fs.__version__)"
```
For MinIO, keep your RemoteFileSystem config as you had it:
```python
RemoteFileSystem(
    basepath="s3://lead-companion-prefect/deployments",
    settings={
        "key": "minioadmin",
        "secret": "minioadmin",
        "client_kwargs": {"endpoint_url": "http://minio:9000"},
        "use_ssl": False,
    },
)
```
Rebuild/redeploy your images after adding s3fs. Then your deployment will package from local code and upload to S3/MinIO; workers will be able to pull it without complaining about missing modules. Thrilling stuff.
g
@Marvin I get:
```
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpxogpe40t/remote-file-system-None/src/flows/audio_processing/workflow.py'
```
m
thought for 48.5 seconds
That error means Prefect couldn't find your entrypoint file in the build context it packaged. In plain terms: the file wasn't copied into the temp dir before upload. Common causes (and quick fixes):

1) Not running from your project root
- Prefect packages "the current directory" when you use a remote source.
- Ensure your script runs with CWD at the project root (where your src/ lives) or chdir before calling from_source:
  ```python
  import os
  os.chdir(PROJECT_ROOT)  # make sure this is the directory containing src/
  ```
- Sanity check:
  ```python
  from pathlib import Path
  print("CWD:", Path.cwd())
  p = Path("src/flows/audio_processing/workflow.py")
  print("Entrypoint exists:", p.resolve(), p.exists())
  ```

2) Your Dockerfile runs deploy before copying code
- Ensure you COPY the repo first and set WORKDIR, then run the deployment:
  ```
  WORKDIR /app
  COPY . /app
  RUN pip install prefect prefect-aws s3fs
  # now run your deploy script
  ```

3) .prefectignore / .gitignore excludes your code
- Prefect respects .prefectignore (and .gitignore if .prefectignore is absent).
- If you accidentally ignore `src/` or `src/flows/**`, Prefect will build an empty-ish package.
- Remove those patterns or create a .prefectignore that includes what you need.

4) Relative entrypoint mismatch
- Keep entrypoint relative to project root:
  ```
  entrypoint="src/flows/audio_processing/workflow.py:my_flow"
  ```
- If you must, try an absolute path to confirm the file exists, but best practice is relative to project root.

Reminder: s3fs must be installed in the environment doing the deploy AND on workers.

If it still fails, share:
- Your CWD at deploy time
- The exact entrypoint string
- Contents of .prefectignore/.gitignore around src/
- The Dockerfile snippet that copies code and runs the deploy

We'll make it behave, even if it needs gentle coercion.
g
@Marvin I have moved my `deploy.py` deployment script to the root of the project (same level as `src/` basically) and I am just running this in the Dockerfile:
```
# Copy source code and entrypoint script into container
COPY src/ src/
COPY entrypoint.sh ./
# Copy Poetry project and lock file (contains exact dependency versions)
COPY pyproject.toml poetry.lock ./
# Copy deployment script for Prefect flows.
COPY deploy.py ./
```
And then I just run `python deploy.py deploy-all ${DEPLOY_ARGS}` in a script. Yet, I still get the following error:
```
/tmp/tmpvqok3aw1/remote-file-system-None/src/flows/audio_processing/workflow.py' encountered an exception: FileNotFoundError(2, 'No such file or directory')
```
m
thought for 62.6 seconds
That path means the entrypoint file wasn't included in the package Prefect staged before uploading. In 3.x, when you pass a remote storage as source, Prefect zips up the current working directory and expects your entrypoint to exist inside that zip at the exact relative path you gave. Checklist to fix:

1) Ensure you're packaging from the project root
- Set a deterministic working directory in your Dockerfile and copy code there:
  ```
  WORKDIR /app
  COPY pyproject.toml poetry.lock ./
  COPY src/ src/
  COPY deploy.py ./
  # install deps...
  ```
- In deploy.py, force CWD to project root and sanity-check the entrypoint:
  ```python
  from pathlib import Path
  import os

  os.chdir(Path(__file__).resolve().parent)
  p = Path("src/flows/audio_processing/workflow.py")
  print("CWD:", Path.cwd())
  print("Entrypoint exists:", p.resolve(), p.exists())
  assert p.exists(), "Entrypoint path does not exist at build time"
  ```

2) Entrypoint must be relative to the packaging root
- Use exactly:
  ```
  entrypoint="src/flows/audio_processing/workflow.py:<your_flow_function_name>"
  ```
- Typos matter: check underscores vs hyphens, folder names, and the actual flow function name.

3) Don't accidentally exclude your code
- Prefect respects .prefectignore (and .gitignore if .prefectignore is missing).
- Make sure these do NOT exclude `src/` or `src/flows/**`.
- As a test, create a minimal .prefectignore (or remove it) and try again.

4) Install required extras in both build and worker images
- You already added s3fs; keep it in both images so workers can pull the package.

Concrete skeleton for the deploy call:
```python
from prefect.filesystems import RemoteFileSystem

remote = RemoteFileSystem(
    basepath="s3://lead-companion-prefect/deployments",
    settings={
        "key": "minioadmin",
        "secret": "minioadmin",
        "client_kwargs": {"endpoint_url": "http://minio:9000"},
        "use_ssl": False,
    },
)

flow_function.from_source(
    source=remote,  # remote storage -> Prefect packages CWD and uploads here
    entrypoint="src/flows/audio_processing/workflow.py:YOUR_FLOW_FUNC",
).deploy(
    name=name,
    work_pool_name=pool_name,
)
```
If it still fails, paste:
- The printouts of CWD and existence check from deploy.py
- The contents of your .prefectignore/.gitignore (relevant bits)
- The exact entrypoint string
- The relevant Dockerfile lines (COPY and WORKDIR)

We'll corner it together.
n
i wouldn't put too much stock in what marvin is saying here, it seems trapped in a bit of a silly eddy
are you using a containerized runtime here or not?
g
depends what you mean by your question. All of this runs inside a docker environment on IBM code engine, yes. But my goal is to push deployments to COS (IBM's s3) and have the worker pull code at runtime from there
n
is each flow run a separate container?
g
no, it's just a process worker
for my use case, process worker that pulls code from COS is better because less overhead
n
okay, so it seems like you want to override the `command` job variable to do `uv run --with git+your-repo -m prefect.engine`
g
I am trying to avoid using `.yaml` deployment scripts because it makes injecting ENV variables more difficult
n
well you can do the same thing with the `job_variables` dict on `.deploy`
g
ok, I will try, thanks for the tip
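For reference, a minimal sketch of the approach Nate suggests; the repo URL, entrypoint, flow name, and work pool name are placeholders, and it assumes the process work pool exposes a `command` job variable that can be overridden per deployment:
```python
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://bitbucket.org/<org>/<repo>.git",  # placeholder repo URL
        entrypoint="src/flows/audio_processing/workflow.py:audio_processing",  # placeholder entrypoint
    ).deploy(
        name="audio-processing",
        work_pool_name="code-engine-process-pool",  # placeholder pool name
        job_variables={
            # Nate's suggestion: install the repo into the run's environment,
            # then start the Prefect engine as the flow-run process
            "command": "uv run --with git+https://bitbucket.org/<org>/<repo>.git -m prefect.engine",
        },
    )
```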