Syméon del Marmol
08/27/2025, 8:05 AM
[...] .delay(), it uses the local_storage_path to store the data needed to run the task.
This means the worker must have access to the same file storage to get the required data and start the task.
Is it possible to use something else to share that data, such as the database or Redis? What is the appropriate setup in a Kubernetes environment?
Marvin
08/27/2025, 8:05 AM
Marvin
08/27/2025, 8:15 AM
[...] task.delay(...) writes a small “submission payload” (parameters + metadata) to local disk using Prefect’s results/scheduling storage settings. If your app and your task worker run in different pods, they will not see each other’s local files.
- You do not need a shared filesystem. The recommended approach on Kubernetes is to point “task scheduling storage” to remote object storage (S3/GCS/Azure or any fsspec-compatible filesystem), so both the producer and the worker read/write to the same location.
- Redis/Databases are not supported for this storage today. Prefect uses file/object storage for task submission payloads and results.
How to configure storage for .delay
- Set a default storage block for task scheduling so the submission payloads are written to shared object storage:
1) Create a storage block (examples):
# S3 (prefect-aws): the fields are bucket_name and bucket_folder
from prefect_aws.s3 import S3Bucket
S3Bucket(bucket_name="my-bucket", bucket_folder="prefect-tasks/").save("tasks-scheduling", overwrite=True)
# GCS (prefect-gcp): the prefix field is bucket_folder
from prefect_gcp.cloud_storage import GcsBucket
GcsBucket(bucket="my-bucket", bucket_folder="prefect-tasks/").save("tasks-scheduling", overwrite=True)
# Azure (prefect-azure): the fields are container_name and base_folder; pass credentials as needed
from prefect_azure.blob_storage import AzureBlobStorageContainer
AzureBlobStorageContainer(container_name="my-container", base_folder="prefect-tasks/").save("tasks-scheduling", overwrite=True)
# Generic remote filesystem via fsspec (e.g., s3://, gs://, abfs://, etc.)
from prefect.filesystems import RemoteFileSystem
RemoteFileSystem(basepath="s3://my-bucket/prefect-tasks/").save("tasks-scheduling", overwrite=True)
2) Point the task worker system to that block in both your app and worker containers:
# Use the block document slug you created above (check in the UI or via `prefect block ls`)
PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=s3-bucket/tasks-scheduling
- Optional, but recommended: set your default result storage as well (used when persist_result=True):
PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=s3-bucket/results
or if you insist on a shared PVC, set
PREFECT_RESULTS_LOCAL_STORAGE_PATH=/mnt/shared/prefect
mounted into both pods. On Kubernetes this is less portable than object storage.
Notes on Redis/DB
- Redis and databases are not supported as backends for task scheduling storage or results. Prefect writes files/objects, not records, for these features. There is a Redis block in Prefect, but it is not used for .delay() payload storage or results.
- Stick to object storage or a shared filesystem (PVC) if object storage is not an option.
Kubernetes-friendly setup
- Recommended architecture:
- Your web/API pod calls my_task.delay(...) and has:
- `PREFECT_API_URL`/`PREFECT_API_KEY` set
- PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK pointing to your bucket block
- Cloud credentials to read/write the bucket
- A separate Deployment runs your task worker process that imports your task code and serves it:
# worker.py
from myapp.tasks import my_task, other_task
from prefect.task_worker import serve

if __name__ == "__main__":
    serve(my_task, other_task)
Container command:
python worker.py
Environment:
- `PREFECT_API_URL`/`PREFECT_API_KEY`
- PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK
- Same cloud credentials to the bucket
- Package your task code into the worker image (or install it as a Python package) so the worker can import it; do not rely on shared filesystems for code.
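Because both pods need the same settings, a startup check can catch a misconfigured Deployment before the first .delay() call. Here is a minimal sketch, assuming the settings are supplied as environment variables as in the lists above (Prefect can also read them from a profile, which this check would not see). The function names are hypothetical, not part of Prefect.

```python
import os

# Setting names taken from the lists above. PREFECT_API_KEY is left out on
# purpose: it is typically only needed when talking to Prefect Cloud.
REQUIRED_SETTINGS = (
    "PREFECT_API_URL",
    "PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK",
)


def missing_settings(environ=None):
    """Return the required setting names that are unset or blank."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_SETTINGS if not env.get(name, "").strip()]


def check_settings(environ=None):
    """Raise at startup so a misconfigured pod fails fast."""
    missing = missing_settings(environ)
    if missing:
        raise RuntimeError("Missing Prefect settings: " + ", ".join(missing))
```

Calling check_settings() at the top of both the web/API entrypoint and worker.py would surface a missing variable in the pod logs right away.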
When to use a PVC instead
- If you cannot use object storage, use a ReadWriteMany PVC (e.g., NFS/Filestore/EFS), mount it at the same path in both pods, and set:
PREFECT_RESULTS_LOCAL_STORAGE_PATH=/mnt/shared/prefect
This works, but object storage is generally simpler and more scalable on K8s.
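If you go the PVC route, it is worth confirming that both pods really see the same directory. The sketch below is a hypothetical helper pair (not a Prefect API): run write_probe in the app pod, then read_probe in the worker pod with the returned name.

```python
import uuid
from pathlib import Path


def write_probe(shared_dir):
    """Producer side: write a uniquely named marker file and return its name."""
    name = f"probe-{uuid.uuid4().hex}.txt"
    (Path(shared_dir) / name).write_text("ok")
    return name


def read_probe(shared_dir, name):
    """Worker side: True if the producer's marker file is visible here."""
    path = Path(shared_dir) / name
    return path.is_file() and path.read_text() == "ok"
```

With the example mount above, shared_dir would be /mnt/shared/prefect in both pods. A False result from the worker suggests the volume is not mounted ReadWriteMany or is mounted at a different path.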
Docs
Marvin
08/27/2025, 8:15 AM
Syméon del Marmol
08/27/2025, 8:24 AM
Marvin
08/27/2025, 8:25 AM
Marvin
08/27/2025, 8:26 AM
[...] persist_result=True.
What to configure
- If your app and worker are in different pods/nodes, configure shared storage for task submissions:
- Create a storage block (S3/GCS/Azure/RemoteFileSystem) and set in both app and worker:
PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=<block-slug>
- Result persistence (optional):
- Object storage:
PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=<block-slug>
- Or a shared volume:
PREFECT_RESULTS_LOCAL_STORAGE_PATH=/mnt/shared/prefect
Not supported
- Redis/DB are not supported backends for submission payloads or results; use file/object storage.
Docs
- Background tasks
- Results and storage
If you’re on Kubernetes and your app and worker are separate pods, do you prefer S3/GCS/Azure, or should I show a PVC-based example?
Syméon del Marmol
08/27/2025, 8:27 AM
Marvin
08/27/2025, 8:28 AM
Marvin
08/27/2025, 8:30 AM
[...] PREFECT_TASKS_SCHEDULING_DELETE_FAILED_SUBMISSIONS (defaults to true)
- Manual or periodic cleanup
- If you’re using a shared PVC or local filesystem, add a cron job/Kubernetes CronJob to delete old files under the scheduling/results directories.
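A cleanup job for the shared-volume case could look like the sketch below. It uses only the standard library, and the function name and age threshold are illustrative choices, not Prefect features. Pick a threshold longer than the longest time a submitted task may wait in the queue, since deleting the payload of a task that has not run yet would presumably break it.

```python
import time
from pathlib import Path


def delete_old_files(root, max_age_days, now=None):
    """Delete regular files under root whose mtime is older than max_age_days.

    Returns the list of deleted paths. Directories are left in place.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400  # seconds per day
    deleted = []
    for path in Path(root).rglob("*"):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            deleted.append(path)
    return deleted
```

A Kubernetes CronJob that mounts the same PVC could run, for example, delete_old_files("/mnt/shared/prefect", max_age_days=7) once a day.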
What Prefect does not do
- No built-in TTL for submission payloads or results based on age.
- No Redis/DB retention because those backends aren’t used for .delay() payloads or results.
Typical Kubernetes setup
- Use object storage for both submission payloads and results.
- Set lifecycle rules (e.g., delete objects older than N days) on the prefixes you configured via:
- PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK
- PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK
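For S3, an age-based lifecycle rule can be sketched as below. The rule builder is plain Python. apply_lifecycle wraps boto3's put_bucket_lifecycle_configuration, which replaces the bucket's whole lifecycle configuration, so include any rules you already have. The helper names and the example prefix and retention are assumptions for illustration.

```python
def expiration_rule(prefix, days, rule_id=None):
    """Build one S3 lifecycle rule that expires objects under prefix after days days."""
    if days < 1:
        raise ValueError("days must be at least 1")
    return {
        "ID": rule_id or "expire-" + prefix.strip("/").replace("/", "-"),
        "Filter": {"Prefix": prefix},
        "Status": "Enabled",
        "Expiration": {"Days": days},
    }


def apply_lifecycle(bucket, rules):
    """Replace the bucket's lifecycle configuration (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the rule builder works without boto3 installed

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": rules},
    )
```

For example, apply_lifecycle("my-bucket", [expiration_rule("prefect-tasks/", 14)]) would expire submission payloads under the prefect-tasks/ prefix after 14 days. A second rule can cover whatever prefix your results block writes to.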
If you share which backend you’re using (S3, GCS, Azure, or PVC), I can suggest a concrete lifecycle/cleanup config and example policies.
Syméon del Marmol
08/29/2025, 5:49 PM
Marvin
08/29/2025, 5:49 PM
Marvin
08/29/2025, 5:52 PM
[...] AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, optional AWS_SESSION_TOKEN, and `AWS_REGION`/`AWS_DEFAULT_REGION`
- AWS profile via AWS_PROFILE (and ~/.aws/credentials)
- Prefect’s S3 integrations (via prefect-aws) use boto3/botocore, so they follow this chain automatically.
2) Use an AwsCredentials block (store keys/assume-role in Prefect)
- Create an AwsCredentials block in the UI or via code and reference it from S3 operations.
- Useful when you want Prefect to manage the secret and keep code/config cleaner.
Example: create credentials and an S3Bucket block, then use it in a flow
# install plugin if needed:
# pip install "prefect-aws"
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# One-time setup (run once to save blocks)
aws_creds = AwsCredentials(
    aws_access_key_id="YOUR_KEY_ID",
    aws_secret_access_key="YOUR_SECRET",
    region_name="us-east-1",
    # optional:
    # aws_session_token="...",
    # profile_name="myprofile",
)
aws_creds.save("my-aws-creds", overwrite=True)
s3_block = S3Bucket(
    bucket_name="my-bucket",
    credentials=aws_creds,
)
s3_block.save("my-s3", overwrite=True)

# In your flow, just load and use
@flow
def example():
    s3 = S3Bucket.load("my-s3")
    # Example operations (pick what fits your use case):
    # s3.upload_from_path("local/path/file.csv", "incoming/file.csv")
    # s3.download_object_to_path("incoming/file.csv", "local/path/file.csv")
    # s3.write_path("folder/hello.txt", b"hello")
    # data = s3.read_path("folder/hello.txt")
    return "done"

example()
If you prefer to call helper functions rather than a block:
from prefect_aws import AwsCredentials
# These helpers are named s3_upload / s3_download in older prefect-aws releases
from prefect_aws.s3 import upload_to_bucket, download_from_bucket

aws_creds = AwsCredentials.load("my-aws-creds")

# Upload the bytes of a local file to s3://my-bucket/incoming/file.csv
with open("local/path/file.csv", "rb") as f:
    upload_to_bucket(
        data=f.read(),
        bucket="my-bucket",
        key="incoming/file.csv",
        aws_credentials=aws_creds,
    )

# Download s3://my-bucket/incoming/file.csv; the helper returns the object's bytes
data = download_from_bucket(
    bucket="my-bucket",
    key="incoming/file.csv",
    aws_credentials=aws_creds,
)
with open("local/path/file.csv", "wb") as f:
    f.write(data)
3) Build a boto3 client from your Prefect credentials
- Handy if you need custom S3 calls not covered by the helpers/blocks:
from prefect_aws import AwsCredentials

aws_creds = AwsCredentials.load("my-aws-creds")
# Build a boto3 session from the block, then create the S3 client from it
s3_client = aws_creds.get_boto3_session().client("s3")
resp = s3_client.list_buckets()
Where to put credentials depending on where your flows run
- Local runs: export env vars or use an AWS profile; or use an AwsCredentials block.
- Docker/Kubernetes workers: mount env vars/secrets into the container/pod; or load an AwsCredentials block in your flow.
- ECS: prefer Task Role for least-privilege, or inject env vars; AwsCredentials block also works.
Docs and references
- Prefect AWS integration overview (including S3 and authentication): Prefect AWS Integration docs
- Storing secrets with Prefect: Store secrets in Prefect
If you can share where your flows/workers run (local, Docker, K8s, ECS) and whether you prefer IAM roles or static keys, I can give you a concrete, minimal setup for that environment.
Syméon del Marmol
08/30/2025, 12:02 PM
[...] PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=s3://<bucket-name>/<base-path> and removing PREFECT_LOCAL_STORAGE_PATH=/task-storage is sufficient for the system to use that automatically?
Or should I call something in my code and flows to use it?
Marvin
08/30/2025, 12:03 PM
Marvin
08/30/2025, 12:12 PM
[...]
from prefect.filesystems import RemoteFileSystem
# Uses instance role or AWS env vars; ensure s3fs is installed in this env
RemoteFileSystem(basepath="s3://<bucket-name>/<base-path>").save(
"task-scheduling", overwrite=True
)
- Or via the UI: Blocks > File Systems > Remote file system > basepath = s3://<bucket>/<prefix> > save with a name
- Set the env var to the block slug in every process that needs to read/write the task-scheduling store (the process calling .delay(), any task worker processes):
PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK=remote-file-system/task-scheduling
If you use the S3 block from prefect-aws instead, the slug would look like s3-bucket/<your-block-name>.
- Remove the local path setting. Note: in Prefect 3.x the correct variable name is PREFECT_RESULTS_LOCAL_STORAGE_PATH, not PREFECT_LOCAL_STORAGE_PATH. If you were setting PREFECT_LOCAL_STORAGE_PATH, it’s ignored by 3.x anyway. With the S3 block set, you do not need a local path for task scheduling.
- No code changes required. Your .delay() and .result_async() calls will automatically use the configured task-scheduling storage as long as both the producer and consumer processes have the same setting.
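Since the setting expects a block slug rather than an s3:// URL, a small guard can catch the mix-up early. This is a sketch with a hypothetical helper name. The pattern assumes slugs have the form <block-type-slug>/<block-name> using lowercase letters, digits, and dashes, as in the examples above.

```python
import re

# Assumed slug shape: "<block-type-slug>/<block-name>",
# e.g. "remote-file-system/task-scheduling" or "s3-bucket/tasks-scheduling".
_SLUG_PATTERN = re.compile(r"^[a-z0-9-]+/[a-z0-9-]+$")


def looks_like_block_slug(value):
    """True for 'type-slug/block-name'; False for URLs such as 's3://bucket/path'."""
    return bool(_SLUG_PATTERN.match(value))
```

For instance, looks_like_block_slug("s3://my-bucket/prefect-tasks") is False, while looks_like_block_slug("remote-file-system/task-scheduling") is True.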
Gotchas
- Ensure s3fs is installed anywhere that needs to access S3 (flow runtime, worker, and task-scheduler environments). If you’re using the prefect-aws S3 block, ensure that dependency is installed too.
- Make sure your AWS credentials are available in those environments (sounds like you’ve got this covered).
Optional: default result storage for persisted task/flow results
- If you also want default result persistence to S3 (beyond just task-scheduling handoff), set:
PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK=remote-file-system/task-scheduling
or another block slug of your choice.
Verify your settings
- You can confirm what Prefect sees at runtime:
prefect config view --hide-defaults --show-sources
Look for PREFECT_TASKS_SCHEDULING_DEFAULT_STORAGE_BLOCK and confirm it points to your block slug.
Docs
- Settings and profiles
- Results and result storage
If you’d prefer to use an S3 block instead of Remote File System, I can share the exact steps for that as well—just let me know your preference.