Shahid Khan
02/06/2025, 7:27 PM
from prefect.runtime import flow_run
async def cancel_flow_run(self, flow_run_id: str):
    """Set the flow run identified by ``flow_run_id`` to a ``Cancelled`` state.

    NOTE(review): the leading ``self`` suggests this was lifted from a class,
    but the call in this snippet is ``cancel_flow_run(flow_run.id)``; that
    single positional argument would bind to ``self`` and leave
    ``flow_run_id`` unfilled.
    """
    client = get_client()
    # The state written here is Cancelled, although the original comment
    # described it as "Cancelling".
    await client.set_flow_run_state(
        flow_run_id=flow_run_id,
        state=Cancelled(message="User requested cancellation!"),
    )
@task
async def my_task():
    """Cancel the current flow run when the required data is missing."""
    # Placeholder condition: the original message only said "data not found".
    if data_not_found:
        await cancel_flow_run(flow_run.id)
When I run this complete flow and the data is not found, Prefect throws the following error:
Error in parallel execution of machine alerts: maximum recursion depth exceeded while calling a Python object')
Does anyone have any idea why I am getting this error, and how I should tackle it?
Bianca Hoch
02/06/2025, 10:04 PM
[…] `run_deployment` at all? Or is the parent flow function just calling the child flow, and so on and so forth?
Shahid Khan
02/06/2025, 10:28 PM
[…] `run_deployment`; my flows are deployed via prefect.yaml, and when a flow starts, it just calls the other flow with asyncio. Here's the example code:
async def cancel_flow_run(self, flow_run_id: str):
    """Set the flow run identified by ``flow_run_id`` to a ``Cancelled`` state.

    NOTE(review): the leading ``self`` suggests this was lifted from a class,
    but the call in this snippet is ``cancel_flow_run(flow_run.id)``; that
    single positional argument would bind to ``self`` and leave
    ``flow_run_id`` unfilled.
    """
    client = get_client()
    # The state written here is Cancelled, although the original comment
    # described it as "Cancelling".
    await client.set_flow_run_state(
        flow_run_id=flow_run_id,
        state=Cancelled(message="User requested cancellation!"),
    )
@task
async def main_task():
    """Cancel the current flow run when the required data is missing."""
    # Placeholder condition: the original message only said "data not found".
    if data_not_found:
        await cancel_flow_run(flow_run.id)
@flow(log_prints=True)
async def another_flow(params):
    """Fan out one ``main_task`` call per entry in ``more_ids`` and await them all.

    Raises:
        Exception: wraps any exception raised while gathering the tasks.
    """
    more_ids = [...]  # placeholder list from the original message
    # NOTE(review): ``main_task`` is declared without parameters in this
    # snippet but is called with ``params`` here.
    tasks = [
        main_task(
            params
        )
        for id in more_ids
    ]
    # parallel execution
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Error: {str(e)}") from e
@flow(log_prints=True)
async def main_flow():
    """Fan out one ``another_flow`` subflow per entry in ``ids`` and await them all.

    Raises:
        Exception: wraps any exception raised while gathering the subflows.
    """
    ids = ["asad", "sdsd", ...]
    # NOTE(review): ``params`` is not defined anywhere in this snippet; it is
    # kept as written in the original message.
    subflows = [
        another_flow(
            params
        )
        for id in ids
    ]
    # parallel execution
    try:
        await asyncio.gather(*subflows)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Error: {str(e)}") from e
Let me know if this is not the right way to implement this, and how I can improve it.
And is there any way to increase the recursion limit for subflows?
Thanks a lot for the quick response, much appreciated.
Shahid Khan
02/12/2025, 7:48 PM
Bianca Hoch
02/14/2025, 7:09 PM
from prefect import flow, task, get_client, runtime
from prefect.states import Cancelled
import asyncio
from time import sleep
async def cancel_flow_run(flow_run_id: str):
    """Set the flow run identified by ``flow_run_id`` to a ``Cancelled`` state.

    Args:
        flow_run_id: ID of the flow run whose state should be replaced.
    """
    # Enter the client as an async context manager so it is closed again once
    # the request has been sent.
    async with get_client() as client:
        # NOTE(review): the state written is Cancelled; the original comment
        # said "Cancelling" -- confirm which state is intended.
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Cancelled(message="User requested cancellation!"),
        )
@task(log_prints=True)
async def main_task(more_ids, flow_run_id: str):
    """Simulate a task that finds no data and cancels its parent flow run.

    Args:
        more_ids: Value printed for illustration; not otherwise used.
        flow_run_id: ID of the flow run to cancel when the failure occurs.
    """
    print(f"Running task with ids: {more_ids}")
    # Use the asyncio sleep: a blocking time.sleep() here would stall the
    # event loop and make the gathered tasks run one after another.
    await asyncio.sleep(3)
    try:
        # Simulated failure condition
        raise ValueError(f"Data not found, cancelling flow {flow_run_id}")
    except ValueError:
        await cancel_flow_run(flow_run_id)
@flow(log_prints=True)
async def another_flow(id: str):
    """Run one ``main_task`` per entry in ``more_ids`` concurrently.

    Args:
        id: Identifier handed down from the main flow; only printed here.

    Raises:
        Exception: wraps any exception raised while gathering the tasks.
    """
    print(f"Running subflow with this ID from the main flow: {id}")
    # Non-blocking pause so sibling subflows can make progress meanwhile.
    await asyncio.sleep(3)
    more_ids = ["id1", "id2", "id3"]  # Replace with actual list
    flow_run_id = runtime.flow_run.id
    # One task per entry; each call receives the whole list plus this flow
    # run's id. The throwaway loop variable avoids shadowing ``id``.
    tasks = [main_task(more_ids, flow_run_id) for _ in more_ids]
    # Parallel execution
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Error in parallel execution of tasks: {e}") from e
@flow(log_prints=True)
async def main_flow():
    """Launch one ``another_flow`` subflow per id and await them concurrently.

    Raises:
        Exception: wraps any exception raised while gathering the subflows.
    """
    ids = ["asad", "sdsd", "xyz"]  # Replace with actual list
    # Named loop variable instead of ``id`` to avoid shadowing the builtin.
    subflows = [another_flow(subflow_id) for subflow_id in ids]
    # Parallel execution
    try:
        await asyncio.gather(*subflows)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Error in parallel execution of subflows: {e}") from e
if __name__ == "__main__":
    # Script entry point: drive the top-level flow on a fresh event loop.
    asyncio.run(main_flow())
Shahid Khan
02/15/2025, 3:21 PM
[…] `another_flow` in this case, and hence cancelling all the child tasks? Whereas I wanted to cancel just a single task. Is it possible to kill just a single task?
Bianca Hoch
02/18/2025, 7:16 PM
from prefect import flow, task, get_client, runtime
from prefect.states import Cancelled
import asyncio
from random import random
from time import sleep
@task(log_prints=True)
async def main_task(more_ids):
    """Simulate a task that either succeeds or cancels only itself.

    Args:
        more_ids: Value printed for illustration; not otherwise used.

    Returns:
        ``"Success!"`` when the simulated data check passes, otherwise a
        ``Cancelled`` state object carrying the cancellation message.
    """
    print(f"Running task with ids: {more_ids}")
    task_run_id = runtime.task_run.id
    print(f"Task run id: {task_run_id}")
    # Use the asyncio sleep: a blocking time.sleep() here would stall the
    # event loop and make the gathered tasks run one after another.
    await asyncio.sleep(3)
    # Simulate sufficient and insufficient data scenarios. If insufficient
    # data is available, cancel the task.
    if random() < 0.5:
        print(f"Data found, {task_run_id} completed successfully!")
        return "Success!"
    print(f"Insufficient data, {task_run_id} cancelled!")
    return Cancelled(message="User requested task cancellation!")
@flow(log_prints=True)
async def another_flow(id: str):
    """Run one ``main_task`` per entry in ``more_ids`` concurrently.

    Args:
        id: Identifier handed down from the main flow; only printed here.

    Raises:
        Exception: wraps any exception raised while gathering the tasks.
    """
    print(f"Running subflow with this ID from the main flow: {id}")
    # Non-blocking pause so sibling subflows can make progress meanwhile.
    await asyncio.sleep(3)
    more_ids = ["id1", "id2", "id3"]  # Replace with actual list
    # One task per entry; each call receives the whole list. The throwaway
    # loop variable avoids shadowing the ``id`` parameter.
    tasks = [main_task(more_ids) for _ in more_ids]
    # Parallel execution
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Error in parallel execution of tasks: {e}") from e
@flow(log_prints=True)
async def main_flow():
    """Launch one ``another_flow`` subflow per id and await them concurrently.

    Raises:
        Exception: wraps any exception raised while gathering the subflows.
    """
    ids = ["asad", "sdsd", "xyz"]  # Replace with actual list
    # Named loop variable instead of ``id`` to avoid shadowing the builtin.
    subflows = [another_flow(subflow_id) for subflow_id in ids]
    # Parallel execution
    try:
        await asyncio.gather(*subflows)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Error in parallel execution of subflows: {e}") from e
if __name__ == "__main__":
    # Script entry point: drive the top-level flow on a fresh event loop.
    asyncio.run(main_flow())
Shahid Khan
02/20/2025, 9:48 PM
Bianca Hoch
02/20/2025, 10:29 PM