# ask-community
  • li li

    07/06/2025, 5:22 PM
    Could you provide a way to download the Prefect documentation, for example as a PDF? The documentation website goes down every once in a while, which affects my normal reading of the docs.
  • Kiran

    07/06/2025, 8:11 PM
    hey @Marvin
    Copy code
    from prefect import flow, task
    from datetime import datetime
    from prefect.cache_policies import DEFAULT


    @task(
        # cache_key_fn=lambda _, __: datetime.utcnow().strftime("%Y-%m-%d"),
        # cache_key_fn=lambda ctx, _: f"{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
        # cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
        # cache_key_fn=lambda ctx, _: f"{ctx.flow_run.name}-{ctx.task_run.name}-{datetime.utcnow().strftime('%Y-%m-%d')}",
        cache_policy=DEFAULT,
        persist_result=True,
        retry_delay_seconds=60,  # Wait 60 seconds before retrying on failure
        retries=3,  # Allow 3 retries if the task fails
        log_prints=True,
    )
    def my_daily_task_1():
        print("Executing task_1")
        # Your task logic here
        # raise ValueError("custom value error")
        return "Daily task result_1, custom result"


    @task(
        cache_policy=DEFAULT,
        persist_result=True,
        retry_delay_seconds=60,
        retries=3,
        log_prints=True,
    )
    def my_daily_task_2():
        print("Executing task_2")
        # Your task logic here
        return "Daily task result_2, custom result"


    @flow(log_prints=True, persist_result=True)
    def caching_test_flow():
        task_1_result = my_daily_task_1()
        print(f"task 1 result is {task_1_result}")
        task_2_result = my_daily_task_2()
        print(f"task 2 result is {task_2_result}")
    Why is the code running again instead of caching on the retry?
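    A minimal sketch of the date-scoped cache_key_fn that the commented-out lines are reaching for. In recent Prefect 3 releases the DEFAULT cache policy includes the run ID in the computed key, so a new flow run produces a new key and the task executes again; a custom key built only from the task name and the UTC date stays stable for the whole day. The function below is illustrative, not the one true answer:
    Copy code
    from datetime import datetime, timezone

    from prefect import task


    def daily_cache_key(ctx, parameters) -> str:
        # ctx.task.name is stable; ctx.task_run.name can carry a per-run suffix
        return f"{ctx.task.name}-{datetime.now(timezone.utc):%Y-%m-%d}"


    @task(
        cache_key_fn=daily_cache_key,  # used instead of cache_policy=DEFAULT
        persist_result=True,
        retries=3,
        retry_delay_seconds=60,
        log_prints=True,
    )
    def my_daily_task_1():
        print("Executing task_1")
        return "Daily task result_1, custom result"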
  • Akash N

    07/07/2025, 5:09 AM
    Hello Team, I have a scheduled deployment which runs every 5 mins on weekdays and every 15 mins on weekends. For the past 12 hours all of the scheduled runs have been getting cancelled. I have a Docker-based work pool; the collision strategy is set to collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW and concurrency is set to 1. I have also checked whether there are any running flows but could not find any, and the worker logs are not helpful either.
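    For reference, a sketch of how that collision strategy is usually attached to a deployment in recent Prefect 3 releases (deployment, pool, and image names are made up). Note that with CANCEL_NEW and a limit of 1, a single run stuck in a non-terminal state keeps the slot occupied and causes every newly scheduled run to be cancelled:
    Copy code
    from prefect import flow
    from prefect.client.schemas.objects import (
        ConcurrencyLimitConfig,
        ConcurrencyLimitStrategy,
    )


    @flow
    def my_flow():
        ...


    my_flow.deploy(
        name="every-5-min",                   # hypothetical deployment name
        work_pool_name="docker-pool",         # hypothetical Docker work pool
        image="my-registry/my-image:latest",  # hypothetical image
        concurrency_limit=ConcurrencyLimitConfig(
            limit=1,
            collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
        ),
    )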
  • Akash N

    07/07/2025, 5:10 AM
    BTW, the worker logs do not include the date along with the timestamp; only the time is available, which is not helpful.
  • Cormac

    07/07/2025, 8:39 AM
    Hello Prefect Team, overnight a scheduled job failed to fire; it remained stuck in Pending. There are no logs or any indication of why this happened. After cancelling the job and re-running a new instance, it works just fine. Did something break overnight?
  • gee

    07/07/2025, 3:14 PM
    Hi there, has anyone tried to set up a work pool in RunPod and managed to set up some sort of autoscaling?
  • Aveek Duttagupta

    07/07/2025, 4:45 PM
    Hi team! I'm currently seeing the following error on one of my flow runs for a custom work pool on the Teams Plan:
    Copy code
    Container 'prefect' exited with non-zero exit code 126.
    This is the only error that I'm seeing from the flow run within my ECS task definition. Is there any other way to see more logs or more information on which specific command may be causing this? I don't see any other information in the logs, either in the UI or in CloudWatch, other than the task being deprovisioned followed by this error.
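    Exit code 126 conventionally means the container's command was found but could not be executed (for example a missing execute bit or a wrong ENTRYPOINT), which points at the image rather than the flow code. ECS keeps stopped-task details that the Prefect UI may not surface; a sketch for pulling them with boto3, with the cluster name and task ARN made up:
    Copy code
    import boto3

    ecs = boto3.client("ecs")

    resp = ecs.describe_tasks(
        cluster="my-cluster",                   # hypothetical cluster name
        tasks=["arn:aws:ecs:...:task/abc123"],  # hypothetical task ARN
    )
    task = resp["tasks"][0]
    print(task.get("stoppedReason"))
    for container in task["containers"]:
        # 'exitCode' and 'reason' narrow down which command failed
        print(container["name"], container.get("exitCode"), container.get("reason"))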
  • Eric

    07/07/2025, 4:55 PM
    Hello everyone, I am new to Prefect and I am trying to make it fit what I want to do, but I find it very hard to find documentation or a video. I have two Docker images in a private repo:
    • project-data: the project that stores the Python scripts to extract and load data. It has a Docker image.
    • project-orchestrator: the project containing the tasks and flows. It runs commands on Docker images (like the Docker image of project-data).
    Does anyone have this kind of setup? Can this kind of setup work?
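    This pattern can work. One common shape is for the orchestrator flow to run one-off containers from the project-data image; a minimal sketch using only the standard library, with the image name and commands assumed:
    Copy code
    import subprocess

    from prefect import flow, task


    @task(log_prints=True)
    def run_in_data_image(command: list[str]) -> None:
        # Run a one-off container from the project-data image;
        # check=True fails the task (and the flow) on a non-zero exit code.
        subprocess.run(
            ["docker", "run", "--rm",
             "registry.example.com/project-data:latest", *command],
            check=True,
        )


    @flow
    def nightly_etl():
        run_in_data_image(["python", "extract.py"])
        run_in_data_image(["python", "load.py"])
    A Docker work pool plus the prefect-docker integration is the more managed route, but the subprocess form shows the mechanics.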
  • Patrick Mitchell

    07/07/2025, 5:58 PM
    Hi folks, I might just be looking in all the wrong places, but I cannot seem to find any valid documentation about handling failures of Prefect futures. When building a list of Prefect futures and waiting to handle their results, if any future fails, the parent flow will hang forever and never terminate, seemingly treating failed or crashed tasks as incomplete. Is there any documentation or example that shows how to correctly handle a failed/crashed task so that the flow may terminate (or continue, while handling failed tasks)? EDIT: not FAILED, only CRASHED
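    A sketch of one way to keep the parent flow from hanging, assuming Prefect 3's prefect.futures.wait and result(raise_on_failure=False); the timeout bounds the wait so even a crashed task cannot block the flow forever:
    Copy code
    from prefect import flow, task
    from prefect.futures import wait


    @task
    def work(i: int) -> int:
        return 10 // i  # fails for i == 0


    @flow
    def parent():
        futures = work.map([0, 1, 2, 5])
        done, not_done = wait(futures, timeout=600)  # give up after 10 minutes
        outcomes = []
        for f in done:
            # raise_on_failure=False hands back the exception instead of raising,
            # so failed tasks can be inspected without aborting the flow
            outcomes.append(f.result(raise_on_failure=False))
        return outcomes, len(not_done)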
  • Jonah Duffin

    07/07/2025, 8:40 PM
    Thanks @Nate, I work with Tom and tried this. I'm running with the "dry run" switch just fine. We usually see two kinds of mistakes in the prefect.yaml file that we would want to catch with this flag:
    1. The entrypoint to a flow function is incorrect, perhaps after a file is moved
    2. The parameters fail validation, because a string does not match an allowed list of enums, for example
    Your switch seems to catch case #1 and an exception is raised, but case #2 passes with your dry run flag and fails for an actual deployment, because it looks like this exception happens server-side rather than client-side:
    Copy code
    Traceback (most recent call last):
      File "/Users/jduffin/dev/git/eval/fastpass/.venv/lib/python3.10/site-packages/prefect/deployments/runner.py", line 388, in _create
        deployment_id = await client.create_deployment(**create_payload)
      File "/Users/jduffin/dev/git/eval/fastpass/.venv/lib/python3.10/site-packages/prefect/client/orchestration/_deployments/client.py", line 823, in create_deployment
        response = await self.request("POST", "/deployments/", json=payload)
      File "/Users/jduffin/dev/git/eval/fastpass/.venv/lib/python3.10/site-packages/prefect/client/orchestration/base.py", line 53, in request
        return await self._client.send(request)
      File "/Users/jduffin/dev/git/eval/fastpass/.venv/lib/python3.10/site-packages/prefect/client/base.py", line 361, in send
        response.raise_for_status()
      File "/Users/jduffin/dev/git/eval/fastpass/.venv/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://localhost:4200/api/deployments/'
    Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'type': 'value_error', 'loc': ['body'], 'msg': "Value error, Validation failed for field 'harvest_dataset_identifier'. Failure reason: 'example_enum_value' is not valid under any of the given schemas", ....
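    Until the dry-run switch validates parameters client-side, a rough workaround is to build the same OpenAPI-style schema the server checks and validate locally; parameter_schema is Prefect's helper, while the flow import and the parameter values below are assumptions:
    Copy code
    import jsonschema
    from prefect.utilities.callables import parameter_schema

    from myproject.flows import harvest_flow  # hypothetical flow module

    schema = parameter_schema(harvest_flow.fn).model_dump()
    params = {"harvest_dataset_identifier": "example_enum_value"}

    # Raises jsonschema.ValidationError locally, before any API call is made
    jsonschema.validate(instance=params, schema=schema)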
  • Ben T

    07/07/2025, 9:01 PM
    Bumping this as it was sent on a holiday weekend 🙂
  • José Agretti

    07/07/2025, 10:31 PM
    Hey team, I'm having this exact issue https://github.com/PrefectHQ/prefect/issues/17913 any way to fix this or avoid it? Thank you!
  • Martin

    07/08/2025, 7:28 AM
    Hey folks! To my knowledge, task results are stored to ~/.prefect/storage by default. Is it possible to do something like:
    Copy code
    @task(
        persist_result=True,
        cache_policy=INPUTS,
        result_storage="~/storage/{flow.name}/{uuid}/",
    )
    def fetch_tolkeregning_details(uuid: str) -> dict:
        ...
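    The path-templating half of this is usually done with result_storage_key rather than result_storage; a sketch assuming Prefect's template variables for flow_run and parameters:
    Copy code
    from prefect import task
    from prefect.cache_policies import INPUTS
    from prefect.filesystems import LocalFileSystem


    @task(
        persist_result=True,
        cache_policy=INPUTS,
        # result_storage picks the filesystem, result_storage_key the path in it
        result_storage=LocalFileSystem(basepath="~/storage"),
        result_storage_key="{flow_run.flow_name}/{parameters[uuid]}.json",
    )
    def fetch_tolkeregning_details(uuid: str) -> dict:
        ...
    In practice result_storage wants a saved filesystem block (or its slug) so the API can resolve it across runs.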
  • Kiran

    07/08/2025, 9:07 AM
    hey @Marvin give me the documentation on Prefect state manipulation, version 3
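    While waiting on Marvin, a minimal sketch of what the Prefect 3 docs describe for state manipulation: tasks and flows can return State objects to set their final state, and return_state=True exposes the state to the caller (names below are illustrative):
    Copy code
    from prefect import flow, task
    from prefect.states import Completed, Failed


    @task
    def check(x: int):
        # Returning a State sets this task's final state explicitly
        if x >= 0:
            return Completed(message=f"{x} looks fine")
        return Failed(message=f"{x} is negative")


    @flow
    def stateful_flow():
        state = check(-1, return_state=True)  # capture the State, don't raise
        print(state.type, state.message)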
  • Omar Khudeira

    07/08/2025, 8:18 PM
    @Martin how can I ensure that a concurrency control slot is released if a Prefect flow crashes while it's running in our k8s environment?
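    For framing: with the context-manager form below, the slot is released when the block exits, including on exceptions raised inside the process; the case that leaks slots is the process being killed outright (e.g. an OOMKilled pod), after which the slot has to be cleared via the API/UI or a timeout. A sketch assuming Prefect 3's sync concurrency API and a made-up limit name:
    Copy code
    from prefect import flow
    from prefect.concurrency.sync import concurrency


    def run_expensive_query():
        ...


    @flow
    def guarded_flow():
        # Occupy one slot of the "database" global concurrency limit;
        # released on normal exit and on exceptions inside the block
        with concurrency("database", occupy=1):
            run_expensive_query()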
  • Samuel Hinton

    07/09/2025, 3:19 AM
    Hi all! I'm trying to create my own custom worker that executes on HPC via Slurm, and I've been trying to base this off the Docker worker found in https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-docker/prefect_docker/worker.py One thing that has confused me is how cancellations are handled. I'd have assumed that if a user cancels a flow run, that would make its way to the worker and the worker would terminate the container. However, I can't seem to find any code which handles cancellation of flow runs. I assume I'm missing something obvious; any chance someone can point me in the right direction?
  • Srinivas Kandukuri

    07/09/2025, 7:27 AM
    @Marvin we are currently dealing with a scenario where I have a deployment with a parameter called cfg that contains a string value such as dev1, which automatically connects to a /server/cfg/setup/dev1.cfg file; inside that file there is a configuration containing the ADS server hostname that the task connects to when it executes. Now I want to move this part from the deployment level to the work pool level, because I have 6 ADSs and I want to have these 6 ADSs as 6 work pools, so that I can connect to whichever pool I want while calling run_deployment() and share the load. For example, if I have thousands of runs but an ADS can handle only 200 connections at a time, I would add a work pool concurrency limit so that this is handled. Is there any way to fulfil my scenario?
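    Since a deployment is bound to a single work pool, the usual shape for this is six deployments of the same flow, one per ADS work pool, each pool carrying its own concurrency limit (e.g. 200), and picking the deployment at submission time. A sketch with made-up names:
    Copy code
    from prefect.deployments import run_deployment


    def submit_job(cfg: str, ads: int):
        # One deployment per ADS work pool, e.g. "etl-flow/ads-1" .. "etl-flow/ads-6"
        return run_deployment(
            name=f"etl-flow/ads-{ads}",  # hypothetical flow/deployment names
            parameters={"cfg": cfg},     # e.g. "dev1" -> /server/cfg/setup/dev1.cfg
            timeout=0,                   # fire and forget; don't block on the run
        )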
  • František

    07/09/2025, 7:57 AM
    [Secrets management "as code" 🔑🕵️‍♂️] I want to:
    • keep secrets in Azure Key Vault, or as code in a secured GitLab repository separate from my Prefect project
      ◦ have CI/CD deploy the secrets
    • have Prefect only use secrets from there
      ◦ at best, disable any manual interaction with secret blocks: no possibility to overwrite a password in the UI, etc.
    Does anyone have something like this implemented? Using: self-hosted Prefect 3, k8s worker
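    Part of this is doable today: a CI/CD job can own the secret blocks by upserting them from Key Vault on every deploy, so any manual edit in the UI is overwritten by the next pipeline run (truly read-only blocks are not an OSS feature as far as I know). A sketch with made-up names, assuming the azure-keyvault-secrets client:
    Copy code
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient
    from prefect.blocks.system import Secret

    vault = SecretClient(
        vault_url="https://my-vault.vault.azure.net",  # hypothetical vault
        credential=DefaultAzureCredential(),
    )

    for name in ["db-password", "api-token"]:  # hypothetical secret names
        value = vault.get_secret(name).value
        # overwrite=True makes the pipeline the source of truth
        Secret(value=value).save(name, overwrite=True)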
  • Michael Michael

    07/09/2025, 12:42 PM
    Aloha Prefect Team ✌️ We are running a self-hosted Prefect instance backed by a Postgres database. Over time, we've noticed that our database is gradually filling up since, by default, there is no automatic cleanup of logs or historical run data. In our case, we only need to retain data for a maximum of 14 days and would like to delete anything older, of course without affecting the most recent and critical metadata (such as deployments, flow definitions, etc.). From what we understand, Prefect Cloud handles this automatically. Is there a recommended way to achieve similar cleanup in a self-hosted or open-source setup? Has anyone here already implemented such a strategy, or have suggestions on best practices? Additionally, which tables in the Prefect database schema are most relevant for safely cleaning up old run-related data? Thanks 🙂
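    One common approach is a scheduled cleanup flow (or cron job) that deletes old rows directly; the run-related bulk usually lives in flow_run, task_run, log, artifact, and events, with most child rows cascading from flow_run, while flows and deployments stay untouched. Table and column names below are assumptions to verify against your schema version, and the whole thing should be rehearsed on a copy of the database first:
    Copy code
    import psycopg2

    RETENTION = "14 days"

    QUERIES = [
        f"DELETE FROM log WHERE timestamp < now() - interval '{RETENTION}'",
        f"DELETE FROM events WHERE occurred < now() - interval '{RETENTION}'",
        f"""DELETE FROM flow_run
            WHERE created < now() - interval '{RETENTION}'
            AND state_type NOT IN ('RUNNING', 'SCHEDULED', 'PENDING')""",
    ]

    # hypothetical DSN; task_run etc. should follow via ON DELETE CASCADE
    with psycopg2.connect("postgresql://prefect:...@db/prefect") as conn:
        with conn.cursor() as cur:
            for q in QUERIES:
                cur.execute(q)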
  • Arthur Ren

    07/09/2025, 3:51 PM
    Good morning Prefect Team! My use case is: on the upstream side, we have multiple jobs that can run in parallel (each with a unique job id). Each job has a fixed number of sub-tasks that all run async, and I have configured the tasks to emit a Prefect event with the job id as payload on completion. I want to run a "collection" Prefect deployment with the job id as a parameter once all sub-tasks of that job have completed. Searching around, I think CompoundTrigger is the closest tool for this kind of pattern, but I couldn't find any examples. Has anyone used such a pattern before? Thank you!
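    CompoundTrigger composes triggers of different types, but for "N identical events sharing a job id" a single EventTrigger with threshold and for_each may be the closer fit. A hedged sketch, with the event name and payload shape assumed rather than taken from your setup:
    Copy code
    from datetime import timedelta

    from prefect.events.schemas.automations import EventTrigger, Posture

    # Fires once 8 "subtask.completed" events (hypothetical event name) have
    # been seen within 6 hours, counted separately per emitting resource, so
    # emitting events with the job id as the resource id groups them per job.
    collection_trigger = EventTrigger(
        expect={"subtask.completed"},
        for_each={"prefect.resource.id"},
        posture=Posture.Reactive,
        threshold=8,  # the fixed number of sub-tasks per job
        within=timedelta(hours=6),
    )
    An automation can then pair this trigger with a RunDeployment action that templates the job id out of the triggering event.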
  • Kiran

    07/10/2025, 5:10 AM
    Hey everyone and @Nate, I have triggered around 180 runs from EC2 with a configuration of 16 vCPUs and 32 GB memory, and I am using a PostgreSQL database; my Prefect flows are running in ECS. My Prefect UI stopped showing any data at all; is this because of the number of runs submitted at once? Also, when a higher number of flow runs are triggered, I see that some runs are getting submitted late and the response of the UI is also a bit slow. Is this the usual behavior?
  • Kiran

    07/10/2025, 5:11 AM
    Hey @Marvin, I have triggered around 180 runs from EC2 with a configuration of 16 vCPUs and 32 GB memory, and I am using a PostgreSQL database; my Prefect flows are running in ECS. My Prefect UI stopped showing any data at all; is this because of the number of runs submitted at once? Also, when a higher number of flow runs are triggered, I see that some runs are getting submitted late and the response of the UI is also a bit slow. Is this the usual behavior?
  • Srinivas Kandukuri

    07/10/2025, 10:12 AM
    @Marvin, hi, I'm using Prefect with a PostgreSQL Aurora Serverless v2 database. I have hosted the Prefect service as an ECS task with 1 vCPU and 2 GB memory. Now I'm trying to trigger 700 flows at a time, and during that time I'm observing slowness in the UI: late responses, and flows submitting a bit late, by around 2 minutes. How do I overcome this? I want to submit any number of flows at once without slowness; in real usage I'll be running thousands of flows at a time.
  • Ankit

    07/10/2025, 11:02 AM
    Hi Prefect team, which channel should I reach out in for some billing-related questions?
  • Russell Brooks

    07/10/2025, 12:16 PM
    Self-hosted questions: We've got a version of Prefect server up in our Kubernetes namespace. That's the good news. The bad news is that there are no Blocks on the Catalog page when we go to add one, e.g. a Secret; it's empty. Also, the Settings page is mangled with raw HTML, and I am pretty sure there was a "Can't connect to Server API" warning on the Dashboard as well. Any tips on how to fix these?
  • Srinivas Kandukuri

    07/10/2025, 1:17 PM
    @Marvin
    Copy code
    Worker 'ECSWorker 03568b81-9dfc-4f67-bca9-9cbda7485525' submitting flow run '0e3e16a7-06e2-41d2-8db4-ec9189dfbf87'
    03:11:22 PM prefect.flow_runs.worker
    Failed to submit flow run '0e3e16a7-06e2-41d2-8db4-ec9189dfbf87' to infrastructure.
    Traceback (most recent call last):
      File "/home/ec2-user/.local/lib/python3.9/site-packages/prefect/workers/base.py", line 1008, in _submit_run_and_capture_errors
        submitted_event = self._emit_flow_run_submitted_event(configuration)
      File "/home/ec2-user/.local/lib/python3.9/site-packages/prefect/workers/base.py", line 1306, in _emit_flow_run_submitted_event
        return emit_event(
      File "/home/ec2-user/.local/lib/python3.9/site-packages/prefect/events/utilities.py", line 87, in emit_event
        worker_instance.send(event_obj)
      File "/home/ec2-user/.local/lib/python3.9/site-packages/prefect/_internal/concurrency/services.py", line 293, in send
        raise RuntimeError("Cannot put items in a stopped service instance.")
    RuntimeError: Cannot put items in a stopped service instance.
    03:11:29 PM prefect.flow_runs.worker
    Reported flow run '0e3e16a7-06e2-41d2-8db4-ec9189dfbf87' as crashed: Flow run could not be submitted to infrastructure: RuntimeError('Cannot put items in a stopped service instance.')
  • Kevin Hu

    07/10/2025, 2:52 PM
    any experience running Prefect flows with uv hanging? It seems to start the uv run and then hangs at uv run python -m prefect.engine. I know it's trying to run the flow, because if the script is not there it throws an error, but it never actually executes my flow.
  • Joe Blauer

    07/10/2025, 8:25 PM
    @Marvin is there any way to pass a variable to a @materialize decorator?
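    If the value is known at import time, a plain f-string works, because the decorator argument is just an expression evaluated when the module loads; a sketch assuming prefect.assets.materialize (Prefect 3.4+) and a made-up asset key:
    Copy code
    from prefect.assets import materialize

    TABLE = "daily_sales"  # hypothetical; resolved at import time


    @materialize(f"postgres://warehouse/{TABLE}")
    def build_daily_sales():
        ...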
  • David Martin Calalang

    07/10/2025, 8:42 PM
    Has anyone had any issues with pickle/serialization? I'm getting the following error from a flow failure (note that square is the name of the task that I'm passing to my flow):
    Copy code
    Traceback (most recent call last):
      File "/usr/local/lib/python3.13/site-packages/distributed/protocol/pickle.py", line 60, in dumps
        result = pickle.dumps(x, **dump_kwargs)
    _pickle.PicklingError: Can't pickle <function square at 0x7b68d25cd1c0>: it's not the same object as __main__.square
    For more context, I'm using the Dask task runner with a pre-existing Fargate cluster I have on AWS ECS. From my understanding, Prefect serializes tasks prior to sending them to the Dask scheduler, and that might be where my issue is. I'm not quite sure where to look or what to fix in my code regarding this, though. Thanks!
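    The usual fix for "it's not the same object as __main__.square" is to define the task in an importable module rather than in the entry script, so the flow process and the Dask workers resolve the same qualified name; a sketch with assumed file names and scheduler address:
    Copy code
    # tasks.py -- importable under the same module path on the Dask workers
    from prefect import task


    @task
    def square(x: int) -> int:
        return x * x


    # flow.py -- import the task instead of defining it next to __main__
    from prefect import flow
    from prefect_dask import DaskTaskRunner
    from tasks import square


    @flow(task_runner=DaskTaskRunner(address="tcp://my-scheduler:8786"))
    def main():
        return [f.result() for f in square.map(range(10))]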
  • Samuel Hinton

    07/11/2025, 4:37 AM
    Hi team! I'm trying to help a group of academics adopt Prefect for workflow management (because cron is not the answer, lol). As part of that, I assume they will want a custom worker coded up so they and their students can run things on in-house compute. My question is whether this is available on the Starter tier, or whether custom workers are locked behind a higher tier? I was hoping to demonstrate how it could work just using the Hobby tier, but it seems like this is definitely not possible.