# ask-community
s
Hi everyone, I'm using Prefect 3.1.15 as a self-managed service on my own infrastructure. I have a flow that runs 2-3 flows in parallel using asyncio; each of those runs 3-4 more flows in parallel, and those flows run a task. When an exception occurs or data isn't present, I want to cancel the run instead of raising an exception and failing it. Here's what I'm running inside the task:
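```python
from prefect import get_client, task
from prefect.runtime import flow_run
from prefect.states import Cancelled


async def cancel_flow_run(flow_run_id: str):
    async with get_client() as client:
        # Set the flow run's state to Cancelled
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Cancelled(message="User requested cancellation!"),
        )


@task
async def my_task(data=None):
    if not data:  # "data not found"; how data is fetched is not shown
        # Inside a task, flow_run.id is the ID of the parent flow run
        await cancel_flow_run(flow_run.id)
```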
When I run the complete flow and the data is not found, Prefect throws the following error:
```
Error in parallel execution of machine alerts: maximum recursion depth exceeded while calling a Python object')
```
Does anyone have an idea why I'm getting this error and how I should tackle it?
b
Hey Shahid! I thiiiink what's happening here is that the number of nested subflows you've configured is triggering this recursion error, although it's a bit hard to tell without a minimal reproducible example. Are you calling `run_deployment` at all, or is each parent flow function just calling its child flow directly, and so on down the chain?
s
No, I'm not using `run_deployment`. My flows are deployed via `prefect.yaml`, and when a flow starts, it just calls the other flows with asyncio. Here's the example code:
```python
import asyncio

from prefect import flow, get_client, task
from prefect.runtime import flow_run
from prefect.states import Cancelled


async def cancel_flow_run(flow_run_id: str):
    async with get_client() as client:
        # Set the flow run's state to Cancelled
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Cancelled(message="User requested cancellation!"),
        )


@task
async def main_task(params):
    data = None  # placeholder: how data is fetched is not shown
    if not data:  # data not found
        await cancel_flow_run(flow_run.id)


@flow(log_prints=True)
async def another_flow(params):
    more_ids = [...]  # actual IDs elided
    tasks = [main_task(params) for id in more_ids]

    # Parallel execution
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        raise Exception(f"Error: {str(e)}")


@flow(log_prints=True)
async def main_flow():
    ids = ["asad", "sdsd", ...]  # remaining IDs elided
    subflows = [another_flow(id) for id in ids]

    # Parallel execution
    try:
        await asyncio.gather(*subflows)
    except Exception as e:
        raise Exception(f"Error: {str(e)}")
```
Let me know if this isn't the right way to implement it and how I can improve it. Also, is there any way to increase the recursion limit for subflows? Thanks a lot for the quick response, I really appreciate it.
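On the recursion-limit question: the thread never establishes that raising the limit fixes this particular error, and if something recurses without bound, a higher limit only delays the `RecursionError`. At the interpreter level, though, Python exposes the limit through `sys.getrecursionlimit()` and `sys.setrecursionlimit()`. A minimal, stdlib-only sketch (the `depth` and `try_depth` helpers are illustrative, not part of Prefect):

```python
import sys


def depth(n: int) -> int:
    # Plain recursion: every call adds one Python stack frame
    return 0 if n == 0 else 1 + depth(n - 1)


def try_depth(n: int) -> str:
    try:
        depth(n)
        return "ok"
    except RecursionError:
        return "RecursionError"


original = sys.getrecursionlimit()
target = original + 100  # deeper than the current limit allows

before = try_depth(target)
sys.setrecursionlimit(original + 2000)  # raise the interpreter-wide limit
after = try_depth(target)
sys.setrecursionlimit(original)  # restore the previous limit afterwards

print(original, before, after)
```

The setting is interpreter-wide, so restoring it afterwards (as above) keeps the change from leaking into unrelated code.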
Hey @Bianca Hoch, any help on this?
b
Hey Shahid! Thanks for the ping. I got the following example to work for me without the recursion error. Let me know if it works for you!
```python
import asyncio

from prefect import flow, task, get_client, runtime
from prefect.states import Cancelled


async def cancel_flow_run(flow_run_id: str):
    async with get_client() as client:
        # Set the flow run's state to Cancelled
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Cancelled(message="User requested cancellation!"),
        )


@task(log_prints=True)
async def main_task(more_id: str, flow_run_id: str):
    print(f"Running task with id: {more_id}")
    await asyncio.sleep(3)  # non-blocking, so gathered tasks overlap

    try:
        # Simulated failure condition
        raise ValueError(f"Data not found, cancelling flow {flow_run_id}")
    except Exception:
        await cancel_flow_run(flow_run_id)


@flow(log_prints=True)
async def another_flow(id: str):
    print(f"Running subflow with this ID from the main flow: {id}")
    await asyncio.sleep(3)

    more_ids = ["id1", "id2", "id3"]  # Replace with actual list
    flow_run_id = runtime.flow_run.id  # ID of this subflow run
    tasks = [
        main_task(more_id, flow_run_id)  # Pass one id per task
        for more_id in more_ids
    ]

    # Parallel execution
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        raise Exception(f"Error in parallel execution of tasks: {str(e)}")


@flow(log_prints=True)
async def main_flow():
    ids = ["asad", "sdsd", "xyz"]  # Replace with actual list

    subflows = [
        another_flow(id)  # Pass the id
        for id in ids
    ]
    # Parallel execution
    try:
        await asyncio.gather(*subflows)
    except Exception as e:
        raise Exception(f"Error in parallel execution of subflows: {str(e)}")


if __name__ == "__main__":
    asyncio.run(main_flow())
```
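The examples in this thread simulate work with a three-second sleep, and in async code the choice of sleep matters: `time.sleep` blocks the whole event loop, so coroutines gathered with `asyncio.gather` run one after another rather than overlapping, while `await asyncio.sleep(...)` yields control to the other coroutines. A small sketch in plain asyncio (no Prefect involved; the worker names and timings are illustrative, and the thread itself doesn't measure this inside Prefect):

```python
import asyncio
import time


async def blocking_worker(delay: float) -> None:
    # time.sleep never yields to the event loop, so nothing else runs meanwhile
    time.sleep(delay)


async def cooperative_worker(delay: float) -> None:
    # asyncio.sleep suspends this coroutine and lets the others make progress
    await asyncio.sleep(delay)


async def timed_gather(worker, n: int = 3, delay: float = 0.2) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed_gather(blocking_worker))
cooperative_elapsed = asyncio.run(timed_gather(cooperative_worker))
print(f"time.sleep: {blocking_elapsed:.2f}s, asyncio.sleep: {cooperative_elapsed:.2f}s")
```

With three workers of 0.2 s each, the blocking version takes roughly the sum of the delays and the cooperative one roughly a single delay; the same applies to any other blocking call made inside an async function.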
s
Hi @Bianca Hoch, thanks for the response, I really appreciate it. This seems like a good solution, but in case of a task failure, wouldn't it cancel the parent flow (`another_flow` in this case) and, with it, all of its child tasks? I want to cancel just the single task. Is it possible to cancel only that one task?
b
Ohhh, you just want to cancel the task itself. You could do something like this then:
```python
import asyncio
from random import random

from prefect import flow, task, runtime
from prefect.states import Cancelled


@task(log_prints=True)
async def main_task(more_id: str):
    print(f"Running task with id: {more_id}")
    task_run_id = runtime.task_run.id
    print(f"Task run id: {task_run_id}")
    await asyncio.sleep(3)  # non-blocking, so gathered tasks overlap
    # Simulate sufficient and insufficient data scenarios. If insufficient data is available, cancel the task
    if random() < 0.5:
        print(f"Data found, {task_run_id} completed successfully!")
        return "Success!"
    else:
        print(f"Insufficient data, {task_run_id} cancelled!")
        # Returning a State makes it this task run's final state
        return Cancelled(message="User requested task cancellation!")


@flow(log_prints=True)
async def another_flow(id: str):
    print(f"Running subflow with this ID from the main flow: {id}")
    await asyncio.sleep(3)

    more_ids = ["id1", "id2", "id3"]  # Replace with actual list
    tasks = [
        main_task(more_id)  # Pass one id per task
        for more_id in more_ids
    ]
    # Parallel execution
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        raise Exception(f"Error in parallel execution of tasks: {str(e)}")


@flow(log_prints=True)
async def main_flow():
    ids = ["asad", "sdsd", "xyz"]  # Replace with actual list

    subflows = [
        another_flow(id)  # Pass the id
        for id in ids
    ]
    # Parallel execution
    try:
        await asyncio.gather(*subflows)
    except Exception as e:
        raise Exception(f"Error in parallel execution of subflows: {str(e)}")


if __name__ == "__main__":
    asyncio.run(main_flow())
```
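Bianca's version marks a task as cancelled by returning a `Cancelled` state instead of raising, so the surrounding `asyncio.gather(...)` never sees an exception for that item. If some code path does raise, the default `gather` propagates the first exception, and the `try`/`except` around it turns that into a failure of the whole flow. A plain-asyncio alternative (not a Prefect feature) is `return_exceptions=True`, which collects each item's error in the result list so the caller gets every item's outcome instead of only the first failure. A minimal sketch, with a hypothetical `process()` standing in for `main_task`:

```python
import asyncio


async def process(item_id: str) -> str:
    # Hypothetical per-item work: "id2" simulates the data-not-found case
    await asyncio.sleep(0.01)
    if item_id == "id2":
        raise LookupError(f"data not found for {item_id}")
    return f"{item_id} ok"


async def run_items(item_ids: list[str]) -> dict[str, object]:
    # return_exceptions=True puts failures into the result list
    # instead of propagating the first one out of gather()
    results = await asyncio.gather(
        *(process(i) for i in item_ids), return_exceptions=True
    )
    return dict(zip(item_ids, results))


outcome = asyncio.run(run_items(["id1", "id2", "id3"]))
for item_id, result in outcome.items():
    status = "skipped" if isinstance(result, Exception) else "done"
    print(item_id, status, result)
```

The caller then decides per item whether an error means "skip", "retry", or "fail the flow", rather than letting the first error decide for all of them.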
s
This works great, finally a working solution that's simple and to the point. Many thanks, @Bianca Hoch! I'm using this a lot; it makes for a much cleaner exit.
b
No problem, Shahid! Glad I could help 🙌