https://prefect.io logo
Join Slack
Powered by
# ask-community
  • k

    Kiran

    09/24/2025, 6:41 AM
    hi @Marvin from prefect import flow, task from prefect.task_runners import ThreadPoolTaskRunner import asyncio @task(log_prints=True) async def ps_task(name, delay=5): print(f"Running {name}") if name == "PS_TPLSweep_General_PSwaps": raise Exception("custom failure") if name == "PS_Extend_General_PSwaps": # extra delay for this specific task, non-blocking await asyncio.sleep(10) pass await asyncio.sleep(delay) return f"{name} done" @task(log_prints=True) def skipping_task(task_name): # Marker/log task for a skipped group return f"{task_name} group skipped" async def execute_group(group): """ Runs tasks in a group sequentially. Returns True if the entire group completed, False if it halted early due to a failure. """ previous_future = None for task_name in group: fut = ps_task.with_options(task_run_name=task_name).submit( task_name, wait_for=[previous_future] if previous_future else None, return_state=True, ) # Non-blocking wait for this task to reach a final state #state = await fut.wait() if fut.is_completed(): print(f"{task_name} completed") previous_future = fut else: print(f"{task_name} failed → skipping rest of group") skipping_task.submit(task_name) return False return True async def main(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], ["PS_LoanAccountSweeper"], ["PS_Reporting"], # final group ] # Run all but last group concurrently non_final_groups = client_tasks[:-1] tasks=[execute_group(g) for g in non_final_groups] results = await asyncio.gather(*tasks) # Run final group only if all others completed if all(results): await execute_group(client_tasks[-1]) else: print("At least one group failed; skipping final group") @flow(task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True) async def client_flow(): # async def main(): # client_tasks = [ # ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], # ["PS_Generate", "PS_MTM", "PS_Sweeping"], # ["PS_LoanAccountSweeper"], # ["PS_Reporting"], # final group # ] # # Run all but last group concurrently # non_final_groups = client_tasks[:-1] # tasks=[execute_group(g) for g in non_final_groups] # results = await asyncio.gather(*tasks) # # Run final group only if all others completed # if all(results): # await execute_group(client_tasks[-1]) # else: # print("At least one group failed; skipping final group") asyncio.run(main()) if name == "__main__": # Call the flow directly (do not use asyncio.run for Prefect flows) client_flow() i am unable to trigger all groups cocurrently, how can i do it in this code
    m
    • 2
    • 37
  • k

    Kiran

    09/24/2025, 10:18 AM
    @Marvin how to check if a future is completed succesfully or not
    m
    • 2
    • 2
  • k

    Kiran

    09/24/2025, 2:02 PM
    @Marvin from prefect import task,flow from prefect.states import Cancelled @task(log_prints=True) def skipping_task(): return Cancelled(message=f"skipping", name="SKIPPING") @flow(log_prints=True) def my_flow(): skipping_task() my_flow() skipping is working here, but not working in this code from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner import time from prefect.utilities.annotations import allow_failure #from prefect.states import State, Cancelled, get_state_result from prefect.states import Cancelled @task(log_prints=True) def skipping_task(task_name): return Cancelled(message=f'skipping task {task_name}' , name="SKIPPING") # ------------------------- # Define Tasks # ------------------------- @task(log_prints=True) def ps_task(name, previous_task=None,previous_state=None, delay=20): # # Handle upstream info defensively: it might be a State, a resolved value, or even an exception # if previous_state is not None: # print(f"[{name}] previous_state: {previous_state!r} (type={type(previous_state)})") # if isinstance(previous_state, State): # # We received the upstream State (expected when using allow_failure on the param) # if not previous_state.is_completed(): # return Cancelled(message=f"skipping {name}: upstream not completed") # # If you need the upstream result value: # try: # upstream_value = get_state_result(previous_state) # print(f"[{name}] upstream result: {upstream_value!r}") # except Exception as exc: # # If the upstream failed or its result cannot be read, decide what to do # return Cancelled(message=f"skipping {name}: could not read upstream result: {exc!r}") # else: # # We did not receive a State. Prefect resolved the value before passing it. # # At this point, it's already "successful" from Prefect's perspective. # # If it's an exception instance, treat it as failure; if it's a normal value, continue. # if isinstance(previous_state, BaseException): # return Cancelled(message=f"skipping {name}: upstream exception {previous_state!r}") # else: proceed as success print(f"previous task is {previous_task}") if previous_state is not None: if previous_task is not None: #if not "done" in previous_state: if previous_state!=f"{previous_task} done": print("reached") # return Cancelled(message=f"skipping {name}: upstream not completed", name="SKIPPING") #return Cancelled(message=f'skipping task {name}' , name="SKIPPING") #skipping_task(name) return Cancelled(message=f"skipping", name="SKIPPING") print(f"Running {name}") if name == "PS_MTM": raise Exception("custom failure") time.sleep(delay) return f"{name} done" # ------------------------- # Flow Definition # ------------------------- @flow(task_runner=ConcurrentTaskRunner(), log_prints=True) def client_flow(): client_tasks = [ ["PS_Extend_General_PSwaps", "PS_TPLSweep_General_PSwaps"], ["PS_Generate", "PS_MTM", "PS_Sweeping"], # PS_MTM will fail ["PS_LoanAccountSweeper"], "PS_Reporting", # final task ] # Schedule all groups except the last one last_futures_of_prior_groups = [] for group in client_tasks[ 1] print(f"current group is {group}") # First task in the group — starts immediately prev = ps_task.with_options(task_run_name=group[0]).submit(group[0]) # Chain the rest of the tasks in this group for t in group[1] current_task_index_in_group=group.index(t) previous_task=group[current_task_index_in_group-1] prev = ps_task.with_options(task_run_name=t).submit( t,previous_task=previous_task, previous_state=allow_failure(prev), # pass upstream State into the param wait_for=[allow_failure(prev)], # don't auto-cancel; let the task decide ) last_futures_of_prior_groups.append(prev) # Schedule the last task to start only after all prior groups have settled last_group = client_tasks[-1] final_future = ps_task.with_options(task_run_name=last_group).submit( last_group, wait_for=last_futures_of_prior_groups, # here we just wait; we don't need the upstream states ) # Wait for the entire flow to finish by resolving the last future return final_future.result() if name == "__main__": client_flow() why
    m
    • 2
    • 1
  • k

    Kiran

    09/24/2025, 2:07 PM
    @Marvin from prefect import task,flow from prefect.states import Cancelled @task(log_prints=True) def skipping_task(): return Cancelled(message=f"skipping", name="SKIPPING") @flow(log_prints=True) def my_flow(): skipping_task() my_flow(), skipping is working here,
    m
    • 2
    • 5
  • b

    Brandon Robertson

    09/24/2025, 2:13 PM
    @Marvin I'm passing variables to my ECS task definition like cpu, memory, executionRoleArn,containerDefinitions via terraform in the job_variables. The log in the flow run shows that my task definition is set up correctly but all of those variables are also getting passed as parameters to the flow function itself. This causes the following error:
    Copy code
    prefect.exceptions.SignatureMismatchError: Function expects parameters [] but was provided with parameters ['cpu', 'env', 'image', 'memory', 'vpc_id', 'cluster', 'execution_role_arn', 'network_configuration', 'cloudwatch_logs_options', 'configure_cloudwatch_logs']
    Any suggestions?
    m
    • 2
    • 2
  • k

    Kurt Sys (Vintecc)

    09/24/2025, 6:16 PM
    Hey all, I'm setting up prefect tests. I got the very basics working, but I wonder if it's possible to get the in- and outputs of the individual tasks of a flow:
    Copy code
    @pytest.mark.asyncio
    async def test_my_favorite_flow():
        result = await trigger_flow()
    
        async with get_client() as client:
            flow_runs: list[FlowRun] = await client.read_flow_runs()
            if not flow_runs:
                raise RuntimeError("No flow runs found")
            flow_run: FlowRun = flow_runs[0]
    
            task_runs: list[TaskRun] = await client.read_task_runs(
                flow_run_filter=FlowRunFilter.model_validate({"id": {"any_": [str(flow_run.id)]}}),
                limit=200,
            )
    So this gets the task runs of a flow run, but now, how can I get the arguments and return values of the task runs?
    n
    m
    • 3
    • 12
  • d

    Danylo Boleiko

    09/25/2025, 12:59 PM
    Hi guys, does anybody have state machine pattern in prefect 3 (server)? I need to store state and run different flows depending on the current state, also from one specific state it must to go to another state in 24 hours if conditions met. Sorry for that abstract explanation, basically I have trading signal as webhook which triggers execution flow, if signal is wrong “engine” goes to state error, if no valid signals in 24h it must run clean up flows. Hope idea is clear, please feel free to ask details
    n
    • 2
    • 1
  • m

    Manoj Ravi

    09/25/2025, 3:03 PM
    @Marvin - How to define optional parameters for a deployment and those parameters should be shown under optional tab in the UI while running a Custom Run from the UI. Right now, the parameter_openapi_schema is being updated to all the deployments with single parameter which is defined in only 1 flow. Need to know what is the clean way of defining optional parameters for flows and how the yank should be updated so that each flow / deployment should has its own optional parameters in the UI
    m
    • 2
    • 96
  • t

    Tri

    09/26/2025, 2:06 PM
    I have a
    deployment
    that runs
    Flow A
    and then
    Flow B
    . However, I would also like to trigger
    Flow B
    separately without runing
    Flow A
    . Do I need to create a separate deployment for
    Flow B
    ?
    n
    • 2
    • 3
  • a

    Ash Thaker

    09/26/2025, 3:54 PM
    Hello All - First time user here - I have a slightly unusual request. I am a parent mentor for www.nhssaa.org and we align with MITSloan Sports Analytics conference to grow interest in this field for High School Students. I saw that Kraft Analytics Group uses Prefect. We have some projects which we'd like our students to do and I thought maybe Prefect might be a good platform for us to use for future competitions and challenges. We are going to be working with several Sports Analytics programs across NFL, MLB, US Ski and SNowboards and Syracuse, Univ Florida etc.
    n
    • 2
    • 1
  • a

    Aliaksandr Sheliutsin

    09/29/2025, 2:23 PM
    Hello, I use Prefect Cloud and a day ago our general work queue changed status to "Not ready", while we didn't make any change. How's that possible?
    n
    • 2
    • 3
  • a

    Ariel Farbman

    09/30/2025, 7:18 AM
    Hi All, looking for a way to fetch a password ( which rotates ) at runtime in my flow ? The setup is ECS based, using AWS secrets manager
    c
    • 2
    • 2
  • n

    Nicholas Pini

    09/30/2025, 11:04 AM
    I get an unauthorized error when connecting to the api/flows endpoint of my selfhosted server, despite having configured the auth string correctly. Why?
    n
    • 2
    • 1
  • k

    Kiran

    09/30/2025, 11:12 AM
    @Marvin how to get all the task runs of aflow run along with the metadata from inside that flow run itself
    m
    • 2
    • 2
  • m

    Mateusz

    10/01/2025, 2:24 AM
    @Marvin I want a cheap Prefect 3 setup. I'm currently running a docker prefect 2 image and just executing the flow by hand from cron against a postgres data warehouse. This is great, because it hardly costs any money - the pipeline runs for half an hour and so it can run on an upsize infra. This however uses local prefect server, so metadata is not stored anywhere, and I can't see the UI. Is there a way for me to continue having cheap infra but have access to UI and metadata? I was thinking I could store the metadata in my data warehouse database 😏 , and run UI server locally when I need to. Also, I was hoping I could run the prefect server (orchestrator, yeah?) locally on docker only when it's needed. Is that feasible or I absolutely must have persistent services running 24/7? My pipeline is short, and completely unused for 23h of the day.
    m
    • 2
    • 13
  • t

    Trung Dang

    10/01/2025, 11:06 AM
    Hi guys, does the hobby tier on Prefect Cloud have a slot for Workspace right now? I'm trying it out again (did some in the past and I deleted the previous workspace), but now it shows 0/0 workspace on my Hobby tier account
    k
    • 2
    • 1
  • k

    Kevin Hu

    10/01/2025, 5:07 PM
    hi, is it possible for prefect ecs worker to start a flow in response to AWS SQS?
  • t

    Tom Han

    10/01/2025, 9:03 PM
    Hi, does anyone knows if there's a way to set a flowrun directly to Cancelled? My worker is already killed and it is stuck in "Cancelling" now. Marvin's answer seems to be hallucinating a UI element lol
    n
    • 2
    • 6
  • l

    Lev Zemlyanov

    10/01/2025, 10:12 PM
    @alex starting a discussion thread for https://github.com/PrefectHQ/prefect/issues/18933
    🙌 1
    a
    • 2
    • 6
  • f

    Fugo Takefusa

    10/01/2025, 10:45 PM
    Hi team, I am encountering the issue that the tasks runs are missing in the Prefect run logs. The flow executes as expected but the just the logs for the task runs are not recorded. The same issue was raised https://github.com/PrefectHQ/prefect/issues/15153 and I followed the debug steps but no luck. I verified that the running container can talk to the postgress database. No task runs were recorded in the db when I use prefect client to check. Does anyone know the fix? Config: • Self-hosting Prefect API server + Prefect worker + Postgress db in the ECS. • The prefect version "3.7.20" (derived using
    curl https://<prefect-server-url>/api/admin/version
    )
    🤔 1
    ➕ 1
    l
    • 2
    • 4
  • n

    Nick Ackerman

    10/03/2025, 5:17 PM
    Hey, folks! When I run this code:
    Copy code
    from prefect import flow, task
    
    @task
    async def async_task():
        raise ValueError("aah!")
    
    @task
    def not_async_task():
        async_task.submit().result()
    
    @flow
    def my_flow():
        not_async_task()
    
    my_flow()
    I get the following error:
    Copy code
    File "/Users/nickackerman/code/python/practice/.venv/lib/python3.10/site-packages/prefect/transactions.py", line 624, in __aexit__
        await self.reset()
      File "/Users/nickackerman/code/python/practice/.venv/lib/python3.10/site-packages/prefect/transactions.py", line 492, in reset
        await parent.rollback()
    TypeError: object bool can't be used in 'await' expression
    Is this way of using Prefect futures together with async tasks not allowed for some reason? More details in 🧵
    n
    • 2
    • 7
  • k

    Kurt Sys (Vintecc)

    10/04/2025, 11:56 AM
    Hey all, anyone had issues just running the most basic test with prefect?
    Copy code
    from prefect import flow
    from prefect.testing.utilities import prefect_test_harness
    
    
    @flow
    def my_favorite_flow():
        return 42
    
    
    def test_my_favorite_flow():
        with prefect_test_harness(server_startup_timeout=120):
            assert my_favorite_flow() == 42
    I seem to be unable to make it work, I always get a 'timeout':
    Copy code
    pytest tests/test_dummy.py 
    =============================================================================================================================== test session starts ===============================================================================================================================
    platform linux -- Python 3.10.12, pytest-7.4.4, pluggy-1.6.0
    rootdir: /workspace
    configfile: pytest.ini
    plugins: devtools-0.12.2, docker-3.2.3, anyio-4.11.0, asyncio-0.23.8, xdist-3.8.0, postgresql-7.0.2, dash-3.2.0, hydra-core-1.3.2, Faker-37.8.0, cov-6.2.1
    asyncio: mode=strict
    collected 1 item                                                                                                                                                                                                                                                                  
    
    tests/test_dummy.py 
    F                                                                                                                                                                                                    [100%]
    
    == FAILURES ==
    __ test_my_favorite_flow __
    
        def test_my_favorite_flow():
    >       with prefect_test_harness(server_startup_timeout=120):
    
    tests/test_dummy.py:11: 
     _ _
    /usr/lib/python3.10/contextlib.py:135: in __enter__
        return next(self.gen)
    ...py310/lib/python3.10/site-packages/prefect/testing/utilities.py:168: in prefect_test_harness
        test_server.start(
     _ _
    
    self = <prefect.server.api.server.SubprocessASGIServer object at 0x77ee36386200>, timeout = 120
    
        def start(self, timeout: Optional[int] = None) -> None:
            ...
    >                       raise RuntimeError(error_message)
    E                       RuntimeError: Timed out while attempting to connect to ephemeral Prefect API server.
    
    ...py310/lib/python3.10/site-packages/prefect/server/api/server.py:921: RuntimeError
    -- Captured stderr call --
    13:49:16.879 | INFO    | prefect - Starting temporary server on <http://127.0.0.1:8787>
    See <https://docs.prefect.io/v3/concepts/server#how-to-guides> for more information on running a dedicated Prefect server.
    -- Captured log call --
    INFO     prefect:server.py:881 Starting temporary server on <http://127.0.0.1:8787>
    See <https://docs.prefect.io/v3/concepts/server#how-to-guides> for more information on running a dedicated Prefect server.
  • k

    Kyle Mulka

    10/05/2025, 9:32 PM
    Just started using prefect. I created a single file hello.py with a hello world function. But when I run it, I’m running into an error:
    Copy code
    [Errno 2] No such file or directory: 'uvx'
    n
    • 2
    • 2
  • a

    Arthur Ren

    10/05/2025, 10:03 PM
    @Marvin I’m trying to upgrade prefect2 to prefect3, what’s the corresponding syntax for push deployment code to a s3 storage
    m
    • 2
    • 20
  • k

    Kartik

    10/06/2025, 8:25 AM
    Hi team, can someone explain me this, why it shows this error
    p
    • 2
    • 2
  • s

    Shubham

    10/07/2025, 3:05 PM
    Hi, is there limitation with no. of work pool I can create in prefect cloud with starter plan ?
  • k

    Kartik

    10/08/2025, 5:39 AM
    I’m facing an issue after migrating my self-hosted Prefect server from SQLite to PostgreSQL (hosted on AWS RDS). Initially, I was using the default SQLite database, which stored all data locally on the EC2 instance. As the space consumed by events and resource metadata kept growing significantly, I decided to switch to PostgreSQL on RDS. However, after the migration, my flow runs have started taking much longer than before. Previously, an average flow run would complete in around 5 minutes, but now they’re taking 30 minutes or more. Additionally, the next scheduled run (set to trigger every 30 minutes) often starts and enters the RUNNING state before the previous flow has completed. PS: The schedule interval is 30 minutes. Can someone please help me understand what could be causing this slowdown?
    n
    • 2
    • 2
  • j

    Jo Tryti

    10/08/2025, 6:12 AM
    Hi, we are running a self hosted prefect server. I'm now updating the prefect-dbt to 0.7 and trying to get the PrefectDbtRunner to work. For some reason the runner takes forever starting the model subtasks. This is both locally running the flow from a terminal and in the docker container worker. I've tried to just use the dbtRunner and that works as expected. Haven't been able to find anyone else posting about the same problem. In the case bellow the dbt_run and dbt_prun does the same thing the only difference is that each task calls different invoke help functions:
    Copy code
    def __invoke_dbt(command: List[str]) -> dbtRunnerResult:
        runner = dbtRunner()
        results = runner.invoke(command)
        return results
    
    def __invoke_preect_dbt(command: List[str]) -> dbtRunnerResult:
        runner = PrefectDbtRunner()
        results = runner.invoke(command)
        return results
    Anyone have any idea on what the cause could be?
  • t

    Tom Han

    10/08/2025, 10:19 PM
    Hi! Is
    MetricTrigger
    a Prefect-Cloud only thing? aka Self hosted prefect server can't use metric trigger for automations?
  • c

    chase albright

    10/08/2025, 10:40 PM
    Hi Prefect community. I am using prefect opensource prefect --version 3.4.6 i am trying to set a custom webhook alert for different flow states, but it appears to not be working. Wondering if it is something i am doing wrong... i can trigger it form the terminal, but if i set up an automation for any state, i am unable to recieve any notifcations.