Mitch
05/27/2024, 10:47 PM
Marvin
05/27/2024, 10:48 PM
run_deployment is the recommended method for running subflows that might need to be controlled (e.g., cancelled) independently from their parent flow. However, by default the subflows are not cancelled automatically when the parent flow is cancelled.
It appears Prefect does not natively support this behavior directly out of the box, but there are some workarounds you can consider:
1. Monitor and Cancel Manually: You can inspect the parent flow run's state and manually cancel any running subflow deployments.
2. Use State Handlers: You can add state handlers to flows and tasks to monitor and act upon state transitions, such as when a parent flow is cancelled.
Here's an example setup:
1. Parent Flow:
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment


@task
def start_subflows():
    # Trigger each deployment; run_deployment returns the resulting flow run.
    subflow_1 = run_deployment("subflow-deployment-1")
    subflow_2 = run_deployment("subflow-deployment-2")
    return [subflow_1, subflow_2]


@task
def monitor_subflows(subflow_runs):
    # Log any subflow run whose last known state is failed or cancelled.
    states = [subflow.state for subflow in subflow_runs]
    for state in states:
        if state.is_failed() or state.is_cancelled():
            get_run_logger().info(f"Subflow {state.name} has failed or was cancelled.")


@flow
def parent_flow():
    subflow_runs = start_subflows()
    while True:  # polls indefinitely; there is no exit condition
        monitor_subflows(subflow_runs)


if __name__ == "__main__":
    parent_flow()
2. Cancel all Subflows when Parent is Cancelled:
If a parent flow fails or gets cancelled, you can handle this in parent_flow by handling the state accordingly.
For more information, refer to the updated Prefect documentation discussing cancellations and state handling, and the Prefect tutorial on deployments.
Mitch
05/27/2024, 10:50 PM
Marvin
05/27/2024, 10:51 PM
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.states import Cancelled, CancelledState, Cancel


@task
def start_subflows():
    subflow_1 = run_deployment(name="subflow-1", timeout=0)
    subflow_2 = run_deployment(name="subflow-2", timeout=0)
    return [subflow_1, subflow_2]


def cancel_subflows(subflows):
    for subflow in subflows:
        try:
            subflow.cancel()
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Failed to cancel subflow {subflow.id}: {str(e)}")


@flow(state_handlers=[lambda flow, old_state, new_state: cancel_subflows_on_cancel(flow, old_state, new_state)])
def parent_flow():
    subflows = start_subflows()
    return subflows


def cancel_subflows_on_cancel(flow_run, old_state, new_state):
    if isinstance(new_state, CancelledState):
        # Cancel the running subflows
        cancel_subflows(flow_run.state.result())
        return Cancelled(name="Parent flow was cancelled, cancelling all subflows.")
    return new_state


if __name__ == "__main__":
    parent_flow()
### Explanation:
1. State Handler for Parent Flow Cancellation:
- The state handler cancel_subflows_on_cancel checks if the parent flow is transitioning to a CancelledState.
- If it is, it calls the cancel_subflows function to cancel the subflows.
2. Cancel Subflows Function:
- This function iterates over the subflow runs and attempts to cancel them using the cancel() method.
3. Flow with State Handler:
- The parent_flow is defined with the state_handlers parameter, pointing to the cancel_subflows_on_cancel handler.
For further reference, you can check out the Prefect documentation on state handlers.
Now, if the parent flow gets cancelled, all its running subflows will be cancelled as well.
Mitch
05/27/2024, 11:02 PM
TypeError: flow() got an unexpected keyword argument 'state_handlers'
Marvin
05/27/2024, 11:03 PM
flow decorator, you may need to utilize a combination of tasks to handle state changes.
Here's a revised approach using Prefect 2.x's capabilities:
1. Parent Flow:
2. Subflow:
3. Monitor Subflows:
### Example:
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment, Deployment


@task
def start_subflows():
    subflow_1 = run_deployment(name="subflow-1")
    subflow_2 = run_deployment(name="subflow-2")
    return [subflow_1, subflow_2]


@task
def cancel_subflows(subflow_runs):
    logger = get_run_logger()
    for subflow_run in subflow_runs:
        if not subflow_run.state.is_final():
            try:
                subflow_run.cancel()
                logger.info(f"Cancelled subflow {subflow_run.id}")
            except Exception as e:
                logger.error(f"Failed to cancel subflow {subflow_run.id}: {str(e)}")


@flow
def parent_flow():
    subflow_runs = start_subflows()
    # Simulate some processing and check for cancellation status
    while True:
        # Check if the parent flow is cancelled
        if get_current_flow_run().state.is_cancelled():
            cancel_subflows(subflow_runs)
            break
        # Simulate some delay or processing
        import time
        time.sleep(10)


if __name__ == "__main__":
    parent_flow()
### Explanation:
1. Start Subflows Task:
- This task uses run_deployment to start subflow deployments and returns their run objects.
2. Cancel Subflows Task:
- This task iterates over the subflow runs to cancel any that are not in a final state (e.g., running or pending).
3. Parent Flow:
- The parent_flow function initiates the subflows and then enters a loop to periodically check its own state.
- If the parent flow is cancelled, it calls cancel_subflows to cancel all active subflows.
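The polling loop described above can be sketched without Prefect to make its control flow explicit. This is only an illustration under stated assumptions: `watch_parent`, `read_state`, `on_cancelled` and the `FINAL_STATES` set are hypothetical names, and states are plain strings rather than Prefect state objects. Unlike the loop in the example, this one also returns when the parent reaches any other final state, so it cannot spin forever.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# State names treated as terminal here; an illustrative subset, not Prefect's own list.
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


async def watch_parent(
    read_state: Callable[[], Awaitable[str]],
    on_cancelled: Callable[[], Awaitable[None]],
    poll_interval: float = 10.0,
) -> str:
    """Poll read_state() until the parent reaches a final state.

    Calls on_cancelled() once if that final state is CANCELLED, then
    returns the final state name.
    """
    while True:
        state = await read_state()
        if state == "CANCELLED":
            await on_cancelled()
        if state in FINAL_STATES:
            return state
        await asyncio.sleep(poll_interval)


async def demo() -> Tuple[str, List[str]]:
    # Hypothetical sequence of states the parent run moves through.
    states = iter(["RUNNING", "RUNNING", "CANCELLED"])
    cancelled: List[str] = []

    async def read_state() -> str:
        return next(states)

    async def on_cancelled() -> None:
        # Stand-in for the real "cancel every subflow" step.
        cancelled.extend(["subflow-1", "subflow-2"])

    final = await watch_parent(read_state, on_cancelled, poll_interval=0)
    return final, cancelled


result = asyncio.run(demo())
```

In a real flow, `read_state` would wrap whatever call reads the parent run's state and `on_cancelled` would wrap the subflow cancellation step.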
In this setup, the flow continuously checks if it has been cancelled and then proceeds to cancel the subflows accordingly. This approach does not attach a state handler directly but achieves the desired functionality through task logic. Please adapt the example based on your actual deployment names and parameters.
Mitch
05/27/2024, 11:06 PM
Marvin
05/27/2024, 11:06 PM
get_current_flow_run function does not exist in the Prefect API. Instead, we'll need to retrieve the state of the current flow run using the Prefect client's API.
Here's a revised approach:
### Example:
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client import OrionClient
from prefect.orion.schemas.states import Cancelled


@task
def start_subflows():
    subflow_1 = run_deployment(name="subflow-1")
    subflow_2 = run_deployment(name="subflow-2")
    return [subflow_1, subflow_2]


@task
def cancel_subflows(subflow_runs):
    logger = get_run_logger()
    for subflow_run in subflow_runs:
        if not subflow_run.state.is_final():
            try:
                subflow_run.cancel()
                logger.info(f"Cancelled subflow {subflow_run.id}")
            except Exception as e:
                logger.error(f"Failed to cancel subflow {subflow_run.id}: {str(e)}")


@task
def monitor_parent_flow(flow_run_id, subflow_runs):
    client = OrionClient()
    while True:
        flow_run = client.read_flow_run(flow_run_id)
        if isinstance(flow_run.state.type, Cancelled):
            cancel_subflows(subflow_runs)
            break
        # Simulate some delay or processing
        import time
        time.sleep(10)


@flow
def parent_flow():
    subflow_runs = start_subflows()
    from prefect.context import FlowRunContext
    flow_run_context = FlowRunContext.get()
    flow_run_id = flow_run_context.flow_run.id
    monitor_parent_flow(flow_run_id, subflow_runs)


if __name__ == "__main__":
    parent_flow()
### Explanation:
1. Start Subflows Task:
- This task uses run_deployment to start subflow deployments and returns their run objects.
2. Cancel Subflows Task:
- This task iterates over the subflow runs to cancel any that are not in a final state (e.g., running or pending).
3. Monitor Parent Flow Task:
- This task uses an OrionClient to periodically check the state of the parent flow using its flow_run_id.
- If the state of the parent flow changes to Cancelled, it calls cancel_subflows to cancel all active subflows.
4. Parent Flow:
- The parent_flow function initiates the subflows and retrieves its own flow_run_id using the FlowRunContext.
- It then calls monitor_parent_flow to monitor its own state and cancel the subflows if necessary.
This should provide the desired behavior: cancelling all non-complete subflows if the parent flow is cancelled. Make sure to replace "subflow-1" and "subflow-2" with your actual deployment names. Adjust the import statements and other configurations according to your environment.
Mitch
05/27/2024, 11:14 PM
Marvin
05/27/2024, 11:15 PM
ImportError and ensure we use the correct client interactions for Prefect 2.x. Prefect's client functions have changed in the new version.
Replace the OrionClient with the get_client function from Prefect. Here’s the updated solution:
### Example:
python
from prefect import flow, task, get_run_logger, context
from prefect.deployments import run_deployment
from prefect.client import get_client  # Updated import for Prefect 2.x
from prefect.orion.schemas.states import Cancelled


@task
def start_subflows():
    subflow_1 = run_deployment(name="subflow-1")
    subflow_2 = run_deployment(name="subflow-2")
    return [subflow_1, subflow_2]


@task
def cancel_subflows(subflow_runs):
    logger = get_run_logger()
    for subflow_run in subflow_runs:
        if not subflow_run.state.is_final():
            try:
                subflow_run.cancel()
                logger.info(f"Cancelled subflow {subflow_run.id}")
            except Exception as e:
                logger.error(f"Failed to cancel subflow {subflow_run.id}: {str(e)}")


@task
async def monitor_parent_flow(flow_run_id, subflow_runs):
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state.is_cancelled():
                await cancel_subflows(subflow_runs)
                break
            # Simulate some delay or processing
            import asyncio
            await asyncio.sleep(10)


@flow
async def parent_flow():
    subflow_runs = await start_subflows()
    flow_run_id = context.get_run_context().flow_run.id
    await monitor_parent_flow(flow_run_id, subflow_runs)


if __name__ == "__main__":
    import asyncio
    asyncio.run(parent_flow())
### Explanation:
1. Start Subflows Task:
- This task uses run_deployment to start subflow deployments and returns their run objects.
2. Cancel Subflows Task:
- This task iterates over the subflow runs to cancel any that are not in a final state (e.g., running or pending).
3. Monitor Parent Flow Task:
- This task uses the get_client function to periodically check the state of the parent flow using its flow_run_id.
- If the state of the parent flow changes to Cancelled, it calls cancel_subflows to cancel all active subflows.
4. Parent Flow:
- The parent_flow function initiates the subflows and retrieves its own flow_run_id using context.get_run_context.
- It then calls monitor_parent_flow to monitor its own state and cancel the subflows if necessary.
This updated solution uses async functions and ensures the correct interactions with the Prefect API in 2.x. Make sure to replace "subflow-1" and "subflow-2" with your actual deployment names. Adjust the import statements and other configurations according to your environment.
Mitch
05/27/2024, 11:25 PM
Marvin
05/27/2024, 11:26 PM
as_subflow parameter. Additionally, we'll correct the way we wait for subflow runs.
Here's an updated and comprehensive example:
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client import get_client
from prefect.orion.schemas.states import Cancelled


@task
async def start_subflows():
    subflow_1 = await run_deployment(name="your-deployment-name-1", timeout=0, parameters={}, as_subflow=True)
    subflow_2 = await run_deployment(name="your-deployment-name-2", timeout=0, parameters={}, as_subflow=True)
    return [subflow_1, subflow_2]


@task
async def cancel_subflows(subflow_runs):
    logger = get_run_logger()
    async with get_client() as client:
        for subflow_run in subflow_runs:
            subflow_run_details = await client.read_flow_run(subflow_run.id)
            if not subflow_run_details.state.is_final():
                try:
                    await client.set_flow_run_state(subflow_run.id, Cancelled())
                    logger.info(f"Cancelled subflow {subflow_run.id}")
                except Exception as e:
                    logger.error(f"Failed to cancel subflow {subflow_run.id}: {str(e)}")


@task
async def monitor_parent_flow(parent_flow_run_id, subflow_runs):
    async with get_client() as client:
        while True:
            parent_flow_run = await client.read_flow_run(parent_flow_run_id)
            if parent_flow_run.state.type == "CANCELLED":
                await cancel_subflows(subflow_runs)
                break
            await asyncio.sleep(10)  # Simulate some delay or processing time


@flow
async def parent_flow():
    subflow_runs = await start_subflows()
    from prefect.context import FlowRunContext
    flow_run_context = FlowRunContext.get()
    flow_run_id = flow_run_context.flow_run.id
    await monitor_parent_flow(flow_run_id, subflow_runs)


if __name__ == "__main__":
    asyncio.run(parent_flow())
### Explanation:
1. Start Subflows Task:
- This uses run_deployment with the as_subflow parameter set to True.
- It ensures subflows are tracked and can be inspected.
2. Cancel Subflows Task:
- This task fetches the current state of each subflow and cancels those not in a final state.
3. Monitor Parent Flow Task:
- It periodically checks the parent flow's state and cancels all non-final subflows if the parent flow is cancelled.
4. Parent Flow:
- This function initializes the subflows and starts monitoring the parent flow state.
- Uses asyncio for properly handling asynchronous tasks.
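The "read the current state, then cancel only non-final runs" step from item 2 can be sketched in isolation. This is a rough illustration, not Prefect code: `FakeClient` is a hypothetical in-memory stand-in whose method names merely mirror the `read_flow_run` and `set_flow_run_state` calls used in the example, and states are plain strings.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List

# State names treated as terminal here; an illustrative subset.
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


@dataclass
class FakeClient:
    """In-memory stand-in for an API client; not a Prefect class."""

    states: Dict[str, str] = field(default_factory=dict)

    async def read_flow_run(self, run_id: str) -> str:
        return self.states[run_id]

    async def set_flow_run_state(self, run_id: str, state: str) -> None:
        self.states[run_id] = state


async def cancel_unfinished(client: FakeClient, run_ids: List[str]) -> List[str]:
    """Cancel every run that is not already final; return the ids cancelled."""
    cancelled = []
    for run_id in run_ids:
        current = await client.read_flow_run(run_id)
        if current in FINAL_STATES:
            continue  # finished runs are left untouched
        try:
            await client.set_flow_run_state(run_id, "CANCELLED")
            cancelled.append(run_id)
        except Exception as exc:
            # Keep going so one failure does not block the remaining runs.
            print(f"Failed to cancel {run_id}: {exc}")
    return cancelled


client = FakeClient({"a": "RUNNING", "b": "COMPLETED", "c": "PENDING"})
cancelled_ids = asyncio.run(cancel_unfinished(client, ["a", "b", "c"]))
```

Re-reading each run's state immediately before cancelling matters because the run objects returned when the subflows were started may be stale by the time the parent is cancelled.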
Replace your-deployment-name-1 and your-deployment-name-2 with the actual names of your deployments for the subflows. This implementation should now properly start, monitor, and terminate the subflows if the parent flow is cancelled.
Mitch
05/27/2024, 11:35 PM
Marvin
05/27/2024, 11:35 PM
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client import get_client
from prefect.context import FlowRunContext
from prefect.orion.schemas.states import Cancelled


@task
async def start_subflows():
    subflow_1 = await run_deployment(name="subflow-deployment-name-1", timeout=0, parameters={})
    subflow_2 = await run_deployment(name="subflow-deployment-name-2", timeout=0, parameters={})
    return [subflow_1, subflow_2]


@task
async def cancel_subflows(subflow_runs):
    logger = get_run_logger()
    async with get_client() as client:
        for subflow_run in subflow_runs:
            subflow_run_details = await client.read_flow_run(subflow_run.id)
            if not subflow_run_details.state.is_final():
                try:
                    await client.set_flow_run_state(subflow_run.id, Cancelled())
                    logger.info(f"Cancelled subflow {subflow_run.id}")
                except Exception as e:
                    logger.error(f"Failed to cancel subflow {subflow_run.id}: {str(e)}")


@task
async def monitor_parent_flow(parent_flow_run_id, subflow_runs):
    async with get_client() as client:
        while True:
            parent_flow_run = await client.read_flow_run(parent_flow_run_id)
            if parent_flow_run.state.is_cancelled():
                await cancel_subflows(subflow_runs)
                break
            # Simulate some delay or processing time
            import asyncio
            await asyncio.sleep(10)


@flow
async def parent_flow():
    subflow_runs = await start_subflows()
    flow_run_context = FlowRunContext.get()
    flow_run_id = flow_run_context.flow_run.id
    await monitor_parent_flow(flow_run_id, subflow_runs)


if __name__ == "__main__":
    import asyncio
    asyncio.run(parent_flow())
### Explanation and Key Adjustments:
1. Start Subflows Task:
- Starts subflows using run_deployment while ensuring the proper parameters so deployments run correctly.
2. Cancel Subflows Task:
- Uses set_flow_run_state to cancel running subflows, ensuring proper handling through client functions.
3. Monitor Parent Flow Task:
- Continuously checks the state of the parent flow and triggers cancellation tasks if the parent is cancelled.
4. Parent Flow:
- Initiates the subflows and retrieves the current flow run context.
- Monitors the state and cancels subflows if necessary, leveraging async functions smoothly.
Replace "subflow-deployment-name-1" and "subflow-deployment-name-2" with your actual deployments' names. This should ensure that your subflows are started, monitored, and cancelled properly in response to the parent flow's state.
Mitch
05/27/2024, 11:36 PM
Marvin
05/27/2024, 11:37 PM
python
import asyncio

from prefect import flow, task, get_client, get_run_logger
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.states import Cancelled  # prefect.orion was renamed in later 2.x releases


@task
async def start_subflows():
    # With the default timeout, each call waits for the triggered run to finish.
    subflow_1 = await run_deployment(name="your-deployment-name-1", parameters={}, as_subflow=True)
    subflow_2 = await run_deployment(name="your-deployment-name-2", parameters={}, as_subflow=True)
    return [subflow_1, subflow_2]


@task
async def cancel_subflows(subflow_runs):
    logger = get_run_logger()
    async with get_client() as client:
        for subflow_run in subflow_runs:
            # Re-read the run so the state check is not based on a stale object.
            subflow_run_details = await client.read_flow_run(subflow_run.id)
            if not subflow_run_details.state.is_final():
                try:
                    await client.set_flow_run_state(subflow_run.id, Cancelled())
                    logger.info(f"Cancelled subflow {subflow_run.id}")
                except Exception as e:
                    logger.error(f"Failed to cancel subflow {subflow_run.id}: {str(e)}")


@task
async def monitor_parent_flow(flow_run_id, subflow_runs):
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state.is_cancelled():
                # Call the undecorated function: Prefect 2.x does not allow
                # running a task from inside another task.
                await cancel_subflows.fn(subflow_runs)
                break
            # Simulate some delay or processing
            await asyncio.sleep(10)


@flow
async def parent_flow():
    subflow_runs = await start_subflows()
    flow_run_context = FlowRunContext.get()
    await monitor_parent_flow(flow_run_context.flow_run.id, subflow_runs)
    return subflow_runs  # Returning subflows for observability


if __name__ == "__main__":
    asyncio.run(parent_flow())
### Explanation and Key Adjustments:
1. Start Subflows Task:
- Uses run_deployment without a 0 timeout, which ensures that it will be tracked and observed correctly in the Prefect UI.
2. Cancel Subflows Task:
- Uses set_flow_run_state to cancel running subflows correctly.
3. Monitor Parent Flow Task:
- Checks the parent flow's state continuously and triggers the cancellation of subflows if the parent flow is cancelled.
4. Parent Flow:
- Initiates the subflows and monitors its state, ensuring observability and the correct cancellation of subflows if necessary.
- Returns the subflows so they can be tracked and observed from the dashboard.
Make sure to replace "your-deployment-name-1" and "your-deployment-name-2" with your actual deployment names. This revised example ensures that your subflows are started, monitored, and observed correctly, and properly handles cancellations.