# ask-marvin
@Marvin when running a background task served by a worker container by calling `.delay()`, it uses the local_storage_path to store the data necessary to run the task. This means the worker must have access to the same file storage to get the required data and start the task. Is it possible to use something else to share that data, such as the database or Redis? What is the appropriate setup in a Kubernetes environment?
thought for 584.4 seconds
Great question — you’re exactly right about the default behavior.

Summary:
- By default, `task.delay(...)` writes a small “submission payload” (parameters + metadata) to local disk using Prefect’s results/scheduling storage settings. If your app and your task worker run in different pods, they will not see each other’s local files.
- You do not need a shared filesystem. The recommended approach on Kubernetes is to point “task scheduling storage” at remote object storage (S3/GCS/Azure or any fsspec-compatible filesystem), so both the producer and the worker read/write to the same location.
- Redis/databases are not supported for this storage today. Prefect uses file/object storage for task submission payloads and results.

How to configure storage for `.delay()`: set a default storage block for task scheduling so the submission payloads are written to shared object storage.

1) Create a storage block (examples):
```python
# S3
from prefect_aws.s3 import S3Bucket
S3Bucket(bucket_name="my-bucket", bucket_folder="prefect-tasks/").save("tasks-scheduling", overwrite=True)

# GCS
from prefect_gcp.cloud_storage import GcsBucket
GcsBucket(bucket="my-bucket", bucket_folder="prefect-tasks/").save("tasks-scheduling", overwrite=True)

# Azure
from prefect_azure.blob_storage import AzureBlobStorageContainer
AzureBlobStorageContainer(container_name="my-container", base_folder="prefect-tasks/").save("tasks-scheduling", overwrite=True)

# Generic remote filesystem via fsspec (e.g., s3://, gs://, abfs://, etc.)
from prefect.filesystems import RemoteFileSystem
RemoteFileSystem(basepath="s3://my-bucket/prefect-tasks/").save("tasks-scheduling", overwrite=True)
```
2) Point the task worker system to that block in both your app and worker containers:
```bash
# Use the block document slug you created above (check in the UI or via `prefect block ls`)
PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=s3-bucket/tasks-scheduling
```
- Optional, but recommended: set your default result storage as well (used when `persist_result=True`):
```bash
PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=s3-bucket/results
```
or if you insist on a shared PVC, set
```bash
PREFECT_RESULTS_LOCAL_STORAGE_PATH=/mnt/shared/prefect
```
mounted into both pods. On Kubernetes this is less portable than object storage.

Notes on Redis/DB:
- Redis and databases are not supported as backends for task scheduling storage or results. Prefect writes files/objects, not records, for these features. There is a Redis block in Prefect, but it is not used for `.delay()` payload storage or results.
- Stick to object storage; fall back to a shared filesystem (PVC) only if object storage is not an option.

Kubernetes-friendly setup (recommended architecture):
- Your web/API pod calls `my_task.delay(...)` (a minimal producer sketch appears at the end of this answer) and has:
  - `PREFECT_API_URL`/`PREFECT_API_KEY` set
  - `PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK` pointing to your bucket block
  - cloud credentials to read/write the bucket
- A separate Deployment runs your task worker process that imports your task code and serves it:
```python
# worker.py
from myapp.tasks import my_task, other_task
from prefect.task_worker import serve

if __name__ == "__main__":
    serve(my_task, other_task)
```
Container command:
```bash
python worker.py
```
Environment for the worker:
- `PREFECT_API_URL`/`PREFECT_API_KEY`
- `PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK`
- the same cloud credentials for the bucket

Package your task code into the worker image (or install it as a Python package) so the worker can import it; do not rely on shared filesystems for code.

When to use a PVC instead: if you cannot use object storage, use a ReadWriteMany PVC (e.g., NFS/Filestore/EFS), mount it at the same path in both pods, and set:
```bash
PREFECT_RESULTS_LOCAL_STORAGE_PATH=/mnt/shared/prefect
```
This works, but object storage is generally simpler and more scalable on K8s.

Docs:
- Background tasks: Prefect 3 background tasks
- Results and storage: Prefect 3 results and storage
- Kubernetes guidance: Prefect on Kubernetes

Would you like an example Kubernetes Deployment/Pod spec for the worker with the right env vars, or help creating the storage block for your cloud (S3/GCS/Azure)? Which storage do you prefer?
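As a companion to the worker example above, here is a minimal sketch of the producer side; it only needs to import the task and call `.delay()`. The module, function, and argument names are illustrative, and it assumes the storage block and env vars described above are configured in both pods:

```python
# app.py (producer side) - illustrative sketch
from myapp.tasks import my_task  # the same task definitions the worker serves


def handle_request(user_id: str) -> str:
    # Enqueue the background task: the submission payload is written to the
    # configured task-scheduling storage (e.g., your S3 block) and a task
    # worker pod picks it up and executes it.
    my_task.delay(user_id)
    return "submitted"
```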
@Marvin Is it required to persist the submission payload and result in block storage to use background tasks?
thought for 31.1 seconds
Short answer: no.

Details:
- Submission payload: Prefect must persist the task’s submission payload somewhere the worker can read it. That can be either the local filesystem (works only if your app and the worker share the same filesystem — e.g., same pod/container or a shared PVC) or remote/object storage via a storage block (recommended on Kubernetes). You do not have to use a “block” if you’re sharing a local volume, but on K8s it’s best to use object storage.
- Results: Persisting the returned Python value is optional. Background tasks will run and report state/logs without persisting results. You only need persisted results if you want to retrieve the return value later or you’ve set `persist_result=True` (see the sketch after the configuration below).

What to configure:
- If your app and worker are in different pods/nodes, configure shared storage for task submissions: create a storage block (S3/GCS/Azure/RemoteFileSystem) and set in both app and worker:
```bash
PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=<block-slug>
```
Result persistence (optional):
- Object storage:
```bash
PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=<block-slug>
```
- Or a shared volume:
```bash
PREFECT_RESULTS_LOCAL_STORAGE_PATH=/mnt/shared/prefect
```
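To make the “results are optional” point concrete, here is a small sketch (task names and bodies are hypothetical): the first task runs fine in the background with nothing persisted beyond the submission payload, while the second opts in to result persistence so its return value can be retrieved later.

```python
from prefect import task


@task
def send_welcome_email(address: str) -> None:
    # Runs via .delay() without persisting any result; only the submission
    # payload needs to be readable by the task worker.
    ...


@task(persist_result=True)
def build_report(month: str) -> dict:
    # Opt in to result persistence only if you want to fetch the return
    # value later; this uses the configured result storage.
    return {"month": month, "status": "ok"}
```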
Not supported: Redis/DB are not supported backends for submission payloads or results; use file/object storage.

Docs:
- Background tasks: Background tasks
- Results and storage: Results and storage

If you’re on Kubernetes and your app and worker are separate pods, do you prefer S3/GCS/Azure, or should I show a PVC-based example?
@Marvin How can we set a retention policy for the data that gets created?
thought for 105.9 seconds
Short answer: Prefect does not enforce a time-based retention policy itself. Use your storage’s lifecycle rules (recommended), plus a small Prefect setting for cleanup of failed submissions.

What you can control today:
- Underlying object storage lifecycle (recommended): configure lifecycle rules on S3/GCS/Azure to expire or transition objects under the prefixes you use for task scheduling payloads (e.g., s3://my-bucket/prefect-tasks/) and results (e.g., s3://my-bucket/prefect-results/). This is the most reliable way to enforce retention by age/size (see the sketch at the end of this answer).
- Delete failed submissions automatically: Prefect has a setting to clean up submission payloads that failed to enqueue/execute, the env var `PREFECT_TASKS_SCHEDULING_DELETE_FAILED_SUBMISSIONS` (defaults to true).
- Manual or periodic cleanup: if you’re using a shared PVC or local filesystem, add a cron job/Kubernetes CronJob to delete old files under the scheduling/results directories.

What Prefect does not do:
- No built-in TTL for submission payloads or results based on age.
- No Redis/DB retention, because those backends aren’t used for `.delay()` payloads or results.

Typical Kubernetes setup:
- Use object storage for both submission payloads and results.
- Set lifecycle rules (e.g., delete objects older than N days) on the prefixes you configured via `PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK` and `PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK`.

If you share which backend you’re using (S3, GCS, Azure, or PVC), I can suggest a concrete lifecycle/cleanup config and example policies.
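For S3, for example, a lifecycle configuration along these lines could be applied with boto3; the bucket name, prefixes, and retention periods below are placeholders to adapt to whatever you configured in your storage blocks:

```python
import boto3

s3 = boto3.client("s3")

# Expire task-scheduling payloads and results after a fixed number of days.
s3.put_bucket_lifecycle_configuration(
    Bucket="my-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-prefect-task-payloads",
                "Filter": {"Prefix": "prefect-tasks/"},
                "Status": "Enabled",
                "Expiration": {"Days": 7},
            },
            {
                "ID": "expire-prefect-results",
                "Filter": {"Prefix": "prefect-results/"},
                "Status": "Enabled",
                "Expiration": {"Days": 30},
            },
        ]
    },
)
```

The same policy can be defined once in the AWS console, Terraform, or CloudFormation; GCS and Azure Blob Storage offer equivalent lifecycle-management features.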
@Marvin how should I specify credentials to Prefect for connecting to S3?
thought for 165.3 seconds
Great question. In Prefect 3.x (you’re on 3.4.14), you have a few good options for supplying AWS credentials for S3. Pick the one that best matches your runtime environment and security requirements.

Recommended options (in order of preference):

1) Use AWS’s default credential chain (no Prefect-specific config)
- Works automatically if your runtime already has credentials:
  - an IAM role on the compute (EC2 instance profile, ECS task role, EKS IRSA, etc.)
  - environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, optional `AWS_SESSION_TOKEN`, and `AWS_REGION`/`AWS_DEFAULT_REGION`
  - an AWS profile via `AWS_PROFILE` (and `~/.aws/credentials`)
- Prefect’s S3 integrations (via prefect-aws) use boto3/botocore, so they follow this chain automatically.

2) Use an AwsCredentials block (store keys/assume-role config in Prefect)
- Create an AwsCredentials block in the UI or via code and reference it from S3 operations.
- Useful when you want Prefect to manage the secret and keep code/config cleaner.

Example: create credentials and an S3Bucket block, then use it in a flow:
```python
# install plugin if needed:
# pip install "prefect-aws"

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# One-time setup (run once to save blocks)
aws_creds = AwsCredentials(
    aws_access_key_id="YOUR_KEY_ID",
    aws_secret_access_key="YOUR_SECRET",
    region_name="us-east-1",
    # optional:
    # session_token="...",
    # profile_name="myprofile",
    # role_arn="arn:aws:iam::123456789012:role/my-role-to-assume",
    # session_name="prefect-session",
)
aws_creds.save("my-aws-creds", overwrite=True)

s3_block = S3Bucket(
    bucket_name="my-bucket",
    credentials=aws_creds,
)
s3_block.save("my-s3", overwrite=True)

# In your flow, just load and use
@flow
def example():
    s3 = S3Bucket.load("my-s3")
    # Example operations (pick what fits your use case):
    # s3.upload_from_path("local/path/file.csv", "incoming/file.csv")
    # s3.download_object_to_path("incoming/file.csv", "local/path/file.csv")
    # s3.write_path("folder/hello.txt", b"hello")
    # data = s3.read_path("folder/hello.txt")
    return "done"

example()
```
If you prefer to call helper functions rather than a block:
```python
from prefect_aws import AwsCredentials
from prefect_aws.s3 import upload_to_bucket, download_from_bucket

aws_creds = AwsCredentials.load("my-aws-creds")

# Upload a local file to s3://my-bucket/incoming/file.csv
upload_to_bucket(
    bucket="my-bucket",
    key="incoming/file.csv",
    filename="local/path/file.csv",
    credentials=aws_creds,
)

# Download s3://my-bucket/incoming/file.csv to local/path/file.csv
download_from_bucket(
    bucket="my-bucket",
    key="incoming/file.csv",
    filename="local/path/file.csv",
    credentials=aws_creds,
)
```
3) Build a boto3 client from your Prefect credentials
- Handy if you need custom S3 calls not covered by the helpers/blocks:
```python
from prefect_aws import AwsCredentials

aws_creds = AwsCredentials.load("my-aws-creds")
# Build a standard boto3 client from the stored credentials
s3_client = aws_creds.get_boto3_session().client("s3")
resp = s3_client.list_buckets()
```
Where to put credentials, depending on where your flows run:
- Local runs: export env vars or use an AWS profile; or use an AwsCredentials block.
- Docker/Kubernetes workers: mount env vars/secrets into the container/pod; or load an AwsCredentials block in your flow.
- ECS: prefer a Task Role for least privilege, or inject env vars; an AwsCredentials block also works.

Docs and references:
- Prefect AWS integration overview (including S3 and authentication): Prefect AWS Integration docs
- Storing secrets with Prefect: Store secrets in Prefect

If you can share where your flows/workers run (local, Docker, K8s, ECS) and whether you prefer IAM roles or static keys, I can give you a concrete, minimal setup for that environment.
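If you go with the default credential chain (option 1), a quick way to sanity-check that credentials are actually visible inside a given container is a plain boto3 call; this sketch is not Prefect-specific and assumes boto3 is installed and an STS call is permitted:

```python
import boto3

# Resolves credentials via the standard chain (env vars, profile, IAM role/IRSA).
session = boto3.Session()
print("credentials resolved:", session.get_credentials() is not None)

# Shows which identity S3 calls will be made as.
print("caller identity:", session.client("sts").get_caller_identity()["Arn"])
```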
@Marvin OK, I have AWS credentials already set up on my workers and task schedulers. Is setting the env variable like this: `PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=s3://<bucket-name>/<base-path>` and removing `PREFECT_LOCAL_STORAGE_PATH=/task-storage` sufficient for the system to use that automatically? Or should I call something in my code and flows to use it?
thought for 543.2 seconds
Great question. Short answer: don’t point that setting at a raw s3:// URL. In Prefect 3.4.14, the task-scheduling storage setting expects a Prefect file system block reference, not a URI. You do not need to change your flow/task code; just configure the block and the env var in the right processes.

What to do:
- Create a file system block that points to your S3 path (recommended: Remote File System). Via code:
```python
from prefect.filesystems import RemoteFileSystem

# Uses the instance role or AWS env vars; ensure s3fs is installed in this env
RemoteFileSystem(basepath="s3://<bucket-name>/<base-path>").save(
    "task-scheduling", overwrite=True
)
```
- Or via the UI: Blocks > File Systems > Remote File System > basepath = s3://<bucket>/<prefix> > save with a name.
- Set the env var to the block slug in every process that needs to read/write the task-scheduling store (the process calling `.delay()` and any task worker processes):
```bash
PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=remote-file-system/task-scheduling
```
If you use the S3 block from prefect-aws instead, the slug would look like `s3-bucket/<your-block-name>`.

- Remove the local path setting. Note: in Prefect 3.x the correct variable name is `PREFECT_RESULTS_LOCAL_STORAGE_PATH`, not `PREFECT_LOCAL_STORAGE_PATH`; if you were setting `PREFECT_LOCAL_STORAGE_PATH`, it’s ignored by 3.x anyway. With the S3 block set, you do not need a local path for task scheduling.
- No code changes required. Your `.delay()` and `.result_async()` calls will automatically use the configured task-scheduling storage as long as both the producer and consumer processes have the same setting.

Gotchas:
- Ensure `s3fs` is installed anywhere that needs to access S3 (flow runtime, worker, and task-scheduler environments). If you’re using the prefect-aws S3 block, ensure that dependency is installed too.
- Make sure your AWS credentials are available in those environments (sounds like you’ve got this covered).

Optional: default result storage for persisted task/flow results. If you also want default result persistence to S3 (beyond just the task-scheduling handoff), set:
```bash
PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=remote-file-system/task-scheduling
```
or another block slug of your choice.

Verify your settings. You can confirm what Prefect sees at runtime:
```bash
prefect config view --hide-defaults --show-sources
```
Look for `PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK` and confirm it points to your block slug.

Docs:
- Settings and profiles
- Results and result storage

If you’d prefer to use an S3 block instead of Remote File System, I can share the exact steps for that as well—just let me know your preference.
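To double-check from inside each container that the slug resolves to the block you expect, you can also load it by name in a Python shell; this sketch assumes the `remote-file-system/task-scheduling` block created above:

```python
from prefect.filesystems import RemoteFileSystem

# Fails loudly if the block does not exist or the Prefect API is unreachable.
storage = RemoteFileSystem.load("task-scheduling")
print(type(storage).__name__, storage.basepath)
```

This only confirms the block document is reachable; actual S3 access still depends on `s3fs` and the AWS credentials available in that environment.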