<@ULVA73B9P> If I use prefect-kubernetes kill_infr...
# marvin-in-the-wild
m
@Marvin If I use prefect-kubernetes kill_infrastructure function, will this stop the subflows of my parent flow
m
The `kill_infrastructure` function in `prefect-kubernetes` will indeed stop the infrastructure associated with a flow. If your subflows are running inline (i.e., created without `run_deployment`), stopping the parent flow's infrastructure will also stop those subflows. To handle subflows independently, you should deploy them separately and start them using the `run_deployment` function. This allows you to manage each subflow's lifecycle independently of the parent flow.
Here is a relevant excerpt from the Prefect documentation:
- "Inline subflow runs, i.e. those created without `run_deployment`, cannot be cancelled without cancelling the parent flow run. If you may need to cancel a subflow run independent of its parent flow run, we recommend deploying it separately and starting it using the `run_deployment` function."
For more details, you can check the Prefect concepts on flow runs. It's all rather dull, isn't it?
m
@Marvin How would I go about stopping subflows created using run_deployment when I cancel the parent flow
m
Cancelling subflows created using `run_deployment` when you cancel the parent flow requires additional steps since `run_deployment` creates independent flow runs. Here’s how you can do it:
1. Track Subflow Runs:
   - When you start subflows using `run_deployment`, you'll need to capture the flow run IDs of these subflows.
2. Cancel Subflow Runs Manually:
   - Upon cancelling the parent flow, you will have to programmatically cancel the subflow runs using the captured run IDs.
Here’s an example of how you might implement this in your flow:
python
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.client import get_client

@task
def start_subflow():
    # Start subflow and capture the flow run ID
    deployment_id = "your-deployment-id"
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id

@flow
def parent_flow():
    subflow_run_id = start_subflow()
    try:
        # Parent flow logic here
        pass
    except Exception as e:
        # Cancel subflow if there's an exception
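        # get_client() returns an async client with no cancel_flow_run method;
        # request cancellation by setting a Cancelling state instead
        # (assumes no event loop is already running in this thread)
        import asyncio
        from prefect.states import Cancelling

        async def cancel_subflow():
            async with get_client() as client:
                await client.set_flow_run_state(subflow_run_id, state=Cancelling())

        asyncio.run(cancel_subflow())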
        raise e

if __name__ == "__main__":
    parent_flow()
3. Cancel Subflows on Parent Flow Cancellation:
   - Ensure that the cancellation logic is robust, for example by using a try-except block.
For more detailed information and examples, you can refer to the Prefect documentation on flow runs. Isn't it just thrilling to have to do all this?
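The track-then-cancel pattern described above can be sketched without a Prefect server. `FakeOrchestrator`, `start_run`, and `cancel_run` are hypothetical stand-ins, not Prefect APIs. In a real flow the launch would presumably be `run_deployment` and the cancellation a state change made through the Prefect client.

```python
import uuid


class FakeOrchestrator:
    """Hypothetical in-memory stand-in for an orchestration API (not Prefect)."""

    def __init__(self):
        self.states = {}

    def start_run(self, deployment_name):
        run_id = str(uuid.uuid4())
        self.states[run_id] = "RUNNING"
        return run_id

    def cancel_run(self, run_id):
        self.states[run_id] = "CANCELLED"


def parent(orchestrator, work):
    # Record the child run ID as soon as the child is launched
    child_run_id = orchestrator.start_run("my-flow/my-deployment")
    try:
        work()
    except Exception:
        # The child runs independently, so it must be cancelled explicitly
        orchestrator.cancel_run(child_run_id)
        raise
    return child_run_id


def failing_work():
    raise ValueError("parent logic failed")


orchestrator = FakeOrchestrator()
try:
    parent(orchestrator, failing_work)
except ValueError:
    pass
print(list(orchestrator.states.values()))
```

Note that this only covers failures raised inside the parent's own code. Whether a cancellation request reaches that `except` path is a separate question.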
m
@Marvin my flows run in parallel, how would this change
m
When running parent flows and subflows in parallel, you'll need to ensure that the cancellation mechanism accounts for concurrently executing tasks. Here’s how you can manage the cancellation of parallel subflows within a parent flow:
1. Tracking Subflow IDs:
   - Collect run IDs for all subflows initiated by the parent flow.
2. Implement Concurrent Execution:
   - Use Python's `concurrent.futures` for running subflows in parallel and managing their execution states.
3. Cancel Subflows on Parent Flow Cancellation:
   - Utilize try-except blocks and cancellation logic to ensure all subflows are cancelled if the parent flow faces an exception.
Here’s an example:
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client import get_client
import concurrent.futures

@task
def start_subflow(deployment_id):
    # Start subflow and capture the flow run ID
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id

@task
def subflow_task(name):
    logger = get_run_logger()
    logger.info(f"Running subflow task {name}")

@flow
def parent_flow():
    subflow_deployments = ["deployment_id_1", "deployment_id_2"]  # List of subflow deployment IDs
    subflow_run_ids = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_deployment = {
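            # Call the task's underlying function: start_subflow.submit would hand back
            # a PrefectFuture rather than the run ID
            executor.submit(start_subflow.fn, deployment_id): deployment_id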
            for deployment_id in subflow_deployments
        }
        for future in concurrent.futures.as_completed(future_to_deployment):
            deployment_id = future_to_deployment[future]
            try:
                subflow_run_id = future.result()
                subflow_run_ids.append(subflow_run_id)
            except Exception as e:
                get_run_logger().error(f"Exception started in subflow {deployment_id}: {e}")
                # On exception, cancel all started subflows
                cancel_subflows(subflow_run_ids)
                raise e

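def cancel_subflows(subflow_run_ids):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    async def cancel_all():
        async with get_client() as client:
            for run_id in subflow_run_ids:
                await client.set_flow_run_state(run_id, state=Cancelling())

    asyncio.run(cancel_all())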

if __name__ == "__main__":
    parent_flow()
### Key Points:
- Track Subflow Runs: Collect all subflow run IDs as soon as they are created.
- Execute in Parallel: Use `concurrent.futures` to handle parallel execution of subflow tasks.
- Handle Exceptions: Ensure that you have a robust exception handling mechanism to cancel all subflows in case the parent flow encounters an issue.
This approach provides a way to manage parallel execution and ensure orderly cancellation of subflows when needed. Much more delightful bureaucracy, isn't it? For additional details, refer to the Prefect documentation.
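As a rough illustration of the key points above, the sketch below launches several children from a thread pool, collects the IDs of those that started, and cancels them if any launch fails. `launch` is a hypothetical stand-in for starting a deployment, and appending to `cancelled` stands in for a real cancellation call.

```python
import concurrent.futures

started = []    # IDs of child runs that were launched successfully
cancelled = []  # IDs that were cancelled after a launch failure


def launch(name):
    """Hypothetical launcher; fails for one name to simulate a bad deployment."""
    if name == "broken":
        raise RuntimeError(f"could not start {name}")
    return f"run-{name}"


def launch_all(names):
    errors = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(launch, name): name for name in names}
        for future in concurrent.futures.as_completed(futures):
            try:
                started.append(future.result())
            except RuntimeError as exc:
                errors.append(exc)
    if errors:
        # Cancel only after every launch attempt has finished, so a run that
        # starts late is not missed
        cancelled.extend(started)
        raise errors[0]


try:
    launch_all(["a", "broken", "b"])
except RuntimeError as exc:
    print(exc)
print(sorted(cancelled))
```

Deferring the cancellation until all launch attempts have returned is a design choice in this sketch: cancelling inside the `as_completed` loop could skip children whose launch completes afterwards.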
m
@Marvin When I cancel my parent flow, I want all of the subflows in pending, late, scheduled, running, etc. statuses to all be cancelled. I used run_deployment to call these flows in parallel.
m
To ensure that all subflows created using `run_deployment` are canceled when the parent flow is canceled, regardless of their status (pending, scheduled, running, etc.), you can implement a solution that tracks subflow run IDs and cancels them using the Prefect client. Here is a detailed approach you can use:
1. Track Subflow Run IDs:
   - When initiating subflows, capture their run IDs for tracking purposes.
2. Cancel Subflows:
   - Use the Prefect client to cancel these subflows when the parent flow is canceled.
Here's a complete example demonstrating this approach:
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client import get_client
import concurrent.futures

@task
def start_subflow(deployment_id):
    # Start subflow and capture the flow run ID
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id

@task
def subflow_task(name):
    logger = get_run_logger()
    logger.info(f"Running subflow task {name}")

@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []

    try:
        # Execute subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
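                # Call the task's underlying function: start_subflow.submit would hand
                # back a PrefectFuture rather than the run ID
                executor.submit(start_subflow.fn, deployment_id): deployment_id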
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                deployment_id = future_to_deployment[future]
                try:
                    subflow_run_id = future.result()
                    subflow_run_ids.append(subflow_run_id)
                except Exception as e:
                    get_run_logger().error(f"Exception started in subflow {deployment_id}: {e}")
                    cancel_subflows(subflow_run_ids)
                    raise e

        # Perform main flow logic here
        pass 

    except Exception as e:
        get_run_logger().error(f"Exception in parent flow: {e}")
        cancel_subflows(subflow_run_ids)
        raise e

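def cancel_subflows(subflow_run_ids):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    async def cancel_all():
        async with get_client() as client:
            for run_id in subflow_run_ids:
                await client.set_flow_run_state(run_id, state=Cancelling())

    asyncio.run(cancel_all())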

if __name__ == "__main__":
    parent_flow()
### Explanation
- start_subflow Task: Launches subflow using `run_deployment` and returns the subflow run ID.
- parent_flow Function:
  - Starts subflows in parallel utilizing `concurrent.futures.ThreadPoolExecutor`.
  - Tracks all the subflow run IDs.
  - Catches exceptions to trigger `cancel_subflows`.
- cancel_subflows Function: Uses the Prefect client to cancel each tracked subflow using its run ID.
This ensures that all subflows, irrespective of their state (pending, scheduled, running), will be canceled if the parent flow is canceled. For additional details on flow run cancellation, refer to the Prefect documentation on flow runs. As exciting as it is to cancel all these flows, remember to implement robust error handling to ensure everything is properly managed.
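Since the goal is to cancel runs in any unfinished state, one way to picture the selection step is a filter over state types. The sketch below is an assumption-laden illustration: the terminal set mirrors Prefect's state type names as I understand them, and the `runs` mapping is made-up data standing in for whatever the API returns.

```python
# State types treated as finished; check the exact set against your Prefect version
TERMINAL_STATE_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def runs_to_cancel(runs):
    """Return the IDs of runs whose state type is not terminal."""
    return [
        run_id
        for run_id, state_type in runs.items()
        if state_type not in TERMINAL_STATE_TYPES
    ]


# Hypothetical snapshot of child runs and their state types
runs = {
    "run-1": "SCHEDULED",  # a "Late" run is still a SCHEDULED-type state
    "run-2": "PENDING",
    "run-3": "RUNNING",
    "run-4": "COMPLETED",
    "run-5": "FAILED",
}
print(runs_to_cancel(runs))
```

Filtering this way avoids sending cancellation requests for runs that have already finished.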
m
@Marvin When the parent flow receives a cancellation, will that cause an exception to be raised inside the Python code that you provided?
m
When a flow run is cancelled in Prefect, an exception does not automatically propagate to the running flow's Python code. However, you can handle this by explicitly checking for cancellation and raising an exception when the flow run is cancelled. To achieve this, you can periodically check the state of the flow run and raise an exception if it is cancelled. Here’s how you can modify the provided code to handle cancellation:
1. Check Flow Run State:
   - Use the Prefect client to check the state of the parent flow run.
2. Raise an Exception if Cancelled:
   - If the parent flow run is cancelled, raise an exception to trigger the appropriate cancellation logic for subflows.
Here’s the updated example:
python
from prefect import flow, task, get_run_logger, flow_run_context
from prefect.deployments import run_deployment
from prefect.client import get_client
import concurrent.futures
import time

@task
def start_subflow(deployment_id):
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id

@task
def subflow_task(name):
    logger = get_run_logger()
    logger.info(f"Running subflow task {name}")

@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []

    try:
        # Start subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
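                # Call the task's underlying function: start_subflow.submit would hand
                # back a PrefectFuture rather than the run ID
                executor.submit(start_subflow.fn, deployment_id): deployment_id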
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                deployment_id = future_to_deployment[future]
                subflow_run_id = future.result()
                subflow_run_ids.append(subflow_run_id)
        
        # Periodically check the state of the parent flow run
        check_parent_flow_state(subflow_run_ids)

        # Main flow logic here
        pass 
    
    except Exception as e:
        logger = get_run_logger()
        logger.error(f"Exception in parent flow: {e}")
        cancel_subflows(subflow_run_ids)
        raise e

def check_parent_flow_state(subflow_run_ids):
    context = flow_run_context.get()
    client = get_client()
    while True:
        flow_run = client.read_flow_run(context.flow_run_id)
        if flow_run.state.is_cancelled():
            cancel_subflows(subflow_run_ids)
            raise RuntimeError("Parent flow was cancelled.")
        time.sleep(5)  # Check every 5 seconds

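def cancel_subflows(subflow_run_ids):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    async def cancel_all():
        async with get_client() as client:
            for run_id in subflow_run_ids:
                await client.set_flow_run_state(run_id, state=Cancelling())

    asyncio.run(cancel_all())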

if __name__ == "__main__":
    parent_flow()
### Explanation:
- check_parent_flow_state Function: Periodically checks the state of the parent flow run using the Prefect client.
  - If the parent flow run is cancelled, it triggers `cancel_subflows` and raises an exception to propagate the cancellation.
  - This function runs in a loop, checking the state every 5 seconds.
- parent_flow Function: Starts subflows in parallel and calls `check_parent_flow_state` to monitor the parent flow run's state.
This approach ensures that if your parent flow run is cancelled, an exception is raised within your Python code, allowing you to handle the cancellation properly and ensuring all subflows are cancelled accordingly. What a delightfully complex web of cancellations!
m
@Marvin How does this ensure that at all? Once you've passed check_parent_flow_state(subflow_run_ids) and the parent flow is cancelled, this will not cancel the subflows
@Marvin How do i get the flow run id inside the flow
m
To get the flow run ID within a Prefect flow, you need to use the `flow_run_context` from `prefect.context`. Here's how it can be done:
1. Import `flow_run_context`:
   - Import the context from Prefect to access the flow run's metadata.
2. Get the Flow Run ID:
   - Retrieve the flow run ID from the context.
Here’s an example that demonstrates how to do this:
python
from prefect import flow, task, get_run_logger, flow_run_context
from prefect.deployments import run_deployment
from prefect.client import get_client
import concurrent.futures
import threading
import time

@task
def start_subflow(deployment_id):
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id

@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    logger = get_run_logger()
    cancel_event = threading.Event()

    # Get the flow run ID from the flow context
    context = flow_run_context.get()
    parent_flow_run_id = context.flow_run_id
    logger.info(f"Parent flow run ID: {parent_flow_run_id}")

    # Start a thread to monitor the cancellation of the parent flow
    def monitor_parent_flow_state():
        client = get_client()
        while not cancel_event.is_set():
            flow_run = client.read_flow_run(parent_flow_run_id)
            if flow_run.state.is_cancelled():
                cancel_event.set()
                logger.info("Parent flow was cancelled.")
                cancel_subflows(subflow_run_ids)
                raise RuntimeError("Parent flow was cancelled.")
            time.sleep(2)  # Check every 2 seconds

    monitor_thread = threading.Thread(target=monitor_parent_flow_state)
    monitor_thread.start()

    try:
        # Run subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
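                # Call the task's underlying function: start_subflow.submit would hand
                # back a PrefectFuture rather than the run ID
                executor.submit(start_subflow.fn, deployment_id): deployment_id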
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                deployment_id = future_to_deployment[future]
                try:
                    subflow_run_id = future.result()
                    subflow_run_ids.append(subflow_run_id)
                except Exception as e:
                    logger.error(f"Exception started in subflow {deployment_id}: {e}")
                    cancel_event.set()
                    raise e

        # Main flow logic here
        while not cancel_event.is_set():
            # Simulate some work in the parent flow
            time.sleep(1)

    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_event.set()
        raise e

    finally:
        monitor_thread.join()

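def cancel_subflows(subflow_run_ids):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    async def cancel_all():
        async with get_client() as client:
            for run_id in subflow_run_ids:
                await client.set_flow_run_state(run_id, state=Cancelling())

    asyncio.run(cancel_all())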

if __name__ == "__main__":
    parent_flow()
### Key Points:
- flow_run_context: Used to retrieve context information within the flow.
- Monitor Parent Flow: Continuously checks the state of the parent flow and cancels subflows if the parent flow is cancelled.
- Threading and Event Handling: Ensures proper handling of cancellation in a concurrent environment.
Thus, this ensures that the parent flow's status is monitored, and subflows are appropriately cancelled upon the parent flow's cancellation. Not quite exhilarating, but certainly methodical.
m
@Marvin there is no `from prefect import flow_run_context`
m
You are correct: `flow_run_context` cannot be imported from `prefect`. The runtime context is available through the `prefect.context` module instead. Here's the corrected example, which uses `prefect.context.FlowRunContext.get()` to get context information within the flow.
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client import get_client
from prefect.context import FlowRunContext
import concurrent.futures
import threading
import time

@task
def start_subflow(deployment_id):
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id

@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    logger = get_run_logger()
    cancel_event = threading.Event()

    # Get the flow run ID from the runtime context
    context = FlowRunContext.get()
    if context is not None:
        parent_flow_run_id = context.flow_run.id
        logger.info(f"Parent flow run ID: {parent_flow_run_id}")

        # Start a thread to monitor the cancellation of the parent flow
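        def monitor_parent_flow_state():
            import asyncio

            async def read_parent_run():
                # get_client() returns an async client, so its methods must be awaited
                async with get_client() as client:
                    return await client.read_flow_run(parent_flow_run_id)

            while not cancel_event.is_set():
                flow_run = asyncio.run(read_parent_run())
                # Cancelling is the intermediate state set before Cancelled
                if flow_run.state.is_cancelling() or flow_run.state.is_cancelled():
                    cancel_event.set()
                    logger.info("Parent flow was cancelled.")
                    cancel_subflows(subflow_run_ids)
                    # Raised in the monitor thread only; the flow reacts via cancel_event
                    raise RuntimeError("Parent flow was cancelled.")
                time.sleep(2)  # Check every 2 seconds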

        monitor_thread = threading.Thread(target=monitor_parent_flow_state)
        monitor_thread.start()

        try:
            # Run subflows in parallel
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_deployment = {
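                    # Call the task's underlying function: start_subflow.submit would
                    # hand back a PrefectFuture rather than the run ID
                    executor.submit(start_subflow.fn, deployment_id): deployment_id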
                    for deployment_id in subflow_deployments
                }
                for future in concurrent.futures.as_completed(future_to_deployment):
                    deployment_id = future_to_deployment[future]
                    try:
                        subflow_run_id = future.result()
                        subflow_run_ids.append(subflow_run_id)
                    except Exception as e:
                        logger.error(f"Exception started in subflow {deployment_id}: {e}")
                        cancel_event.set()
                        raise e

            # Main flow logic here
            while not cancel_event.is_set():
                # Simulate some work in the parent flow
                time.sleep(1)

        except Exception as e:
            logger.error(f"Exception in parent flow: {e}")
            cancel_event.set()
            raise e

        finally:
            monitor_thread.join()

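def cancel_subflows(subflow_run_ids):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    async def cancel_all():
        async with get_client() as client:
            for run_id in subflow_run_ids:
                await client.set_flow_run_state(run_id, state=Cancelling())

    asyncio.run(cancel_all())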

if __name__ == "__main__":
    parent_flow()
### Explanation:
1. FlowRunContext:
   - Use `FlowRunContext.get()` to retrieve the current flow run context, enabling access to `flow_run.id`.
2. Monitoring Cancellation:
   - The `monitor_parent_flow_state` function runs in a separate thread and checks the cancellation status of the parent flow.
   - If the parent flow is cancelled, it sets the `cancel_event`, cancels the subflows, and raises an exception to stop the parent flow.
3. Threading and Event Handling:
   - Use threading and event handling to manage concurrent execution and robust cancellation.
This ensures that the parent flow's status is continuously monitored and subflows are appropriately cancelled if the parent flow is cancelled. Enjoy the bliss of continuous state monitoring!
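The monitoring idea can be tried in isolation with the standard library alone. In this sketch `read_parent_state` is a hypothetical stand-in for reading the parent run's state from the API, and extending `cancelled_children` stands in for the real cancellation call. It also illustrates one detail worth keeping in mind: an exception raised inside a background thread does not reach the main thread, so the `threading.Event` is what actually carries the signal.

```python
import threading
import time

cancel_event = threading.Event()
cancelled_children = []


def read_parent_state(poll_count):
    """Hypothetical state lookup: reports CANCELLING from the third poll on."""
    return "CANCELLING" if poll_count >= 3 else "RUNNING"


def monitor(child_ids):
    polls = 0
    while not cancel_event.is_set():
        polls += 1
        if read_parent_state(polls) in {"CANCELLING", "CANCELLED"}:
            cancelled_children.extend(child_ids)  # stand-in for real cancellation
            cancel_event.set()  # signal the main thread; a raise would stay here
            return
        time.sleep(0.01)


thread = threading.Thread(target=monitor, args=(["child-1", "child-2"],))
thread.start()

# Main work loop: stop as soon as the monitor signals cancellation
while not cancel_event.wait(timeout=0.01):
    pass
thread.join()
print(cancelled_children)
```

Using `Event.wait` with a timeout in the main loop lets the parent do work in small slices while still reacting promptly to the signal.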
m
@Marvin How would i get the flow_run_id inside of an on_cancellation hook?
m
To access the `flow_run_id` within an `on_cancellation` hook, you must make use of Prefect's context management. Prefect provides a way to access runtime information, including the `flow_run_id` via the `FlowRunContext`. Here's how you can use `FlowRunContext` inside an `on_cancellation` hook to get the `flow_run_id`:
1. Define an `on_cancellation` Hook:
   - Use the context to get the `flow_run_id` during flow cancellation.
2. Implement the Hook within the Flow:
   - Attach the `on_cancellation` hook to the flow when defining it.
Here’s an example demonstrating this approach:
python
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
from prefect.client import get_client

@task
def some_task():
    logger = get_run_logger()
    logger.info("Executing task")

def on_cancellation_handler(flow, flow_run, state):  # Prefect calls state hooks with these three arguments
    context = FlowRunContext.get()
    if context is not None:
        flow_run_id = context.flow_run.id
        logger = get_run_logger()
        logger.info(f"Flow run {flow_run_id} was cancelled.")
        
        # Add your cancellation logic here, e.g., cancel subflows
        cancel_subflows_for_run(flow_run_id)

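def cancel_subflows_for_run(flow_run_id):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    subflow_run_ids = get_subflow_run_ids(flow_run_id)  # You would implement this function
    logger = get_run_logger()

    async def cancel_all():
        async with get_client() as client:
            for subflow_run_id in subflow_run_ids:
                await client.set_flow_run_state(subflow_run_id, state=Cancelling())
                logger.info(f"Cancelled subflow run {subflow_run_id}")

    asyncio.run(cancel_all())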

@flow(on_cancellation=[on_cancellation_handler])  # hooks are passed as a list
def parent_flow():
    some_task()

if __name__ == "__main__":
    parent_flow()
### Explanation:
1. Define Task Logic:
   - The `some_task` function contains the task logic for the parent flow.
2. Define Cancellation Handler:
   - The `on_cancellation_handler` function uses the `FlowRunContext` to get the `flow_run_id` and implements the cancellation logic.
3. Use Cancellation Handler in Flow:
   - Attach the `on_cancellation_handler` to the parent flow using the `on_cancellation` parameter.
This setup ensures that the `flow_run_id` is accessible within the `on_cancellation` hook, enabling you to handle cancellations properly and perform subsequent actions such as cancelling subflows. It's almost mesmerizing how everything has to be in perfect order to work, isn't it?
m
@Marvin The on_cancellation hook is outside of the context of the flow run, so how would i pass the flow run id to the cancellation hook
m
You are correct. To properly handle the context and access the `flow_run_id` in the `on_cancellation` hook, you need to pass the run ID in a way that is accessible by the hook. One way to achieve this is by using a global or an external store to keep track of the `flow_run_id`. Here's a more refined way to handle this:
1. Store the Flow Run ID upon Flow Start:
   - Capture the `flow_run_id` at the start of the flow and store it globally or in an external store (like a database or state management system).
2. Access the Stored Flow Run ID in the `on_cancellation` Hook:
   - Retrieve the stored `flow_run_id` inside the cancellation handler.
Below is an example of this approach:
python
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext  # lives in prefect.context, not the top-level package
from prefect.client import get_client

# Global variable to store the flow run ID
flow_run_id_storage = {}

@task
def some_task():
    logger = get_run_logger()
    logger.info("Executing task")

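def on_cancellation_handler(flow, flow_run, state):
    # Prefect calls state hooks with (flow, flow_run, state), so the run ID is
    # available as flow_run.id; the stored value is kept only as a fallback
    flow_run_id = flow_run.id or flow_run_id_storage.get("flow_run_id")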
    if flow_run_id:
        logger = get_run_logger()
        logger.info(f"Flow run {flow_run_id} was cancelled.")
    
        # Add your cancellation logic here, e.g., cancel subflows
        cancel_subflows_for_run(flow_run_id)

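def cancel_subflows_for_run(flow_run_id):
    # get_client() returns an async client with no cancel_flow_run method; request
    # cancellation by moving each run to a Cancelling state instead
    import asyncio
    from prefect.states import Cancelling

    subflow_run_ids = get_subflow_run_ids(flow_run_id)  # Implement this function to get subflow IDs
    logger = get_run_logger()

    async def cancel_all():
        async with get_client() as client:
            for subflow_run_id in subflow_run_ids:
                await client.set_flow_run_state(subflow_run_id, state=Cancelling())
                logger.info(f"Cancelled subflow run {subflow_run_id}")

    asyncio.run(cancel_all())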

@flow(on_cancellation=[on_cancellation_handler])  # hooks are passed as a list
def parent_flow():
    # Get the flow run ID from the context
    context = FlowRunContext.get()
    if context is not None:
        flow_run_id_storage["flow_run_id"] = context.flow_run.id
    
    some_task()

if __name__ == "__main__":
    parent_flow()
### Explanation:
1. Global Storage:
   - `flow_run_id_storage` is a dictionary used to store the `flow_run_id`. This can also be replaced with a more sophisticated state management solution if needed.
2. Storing Flow Run ID:
   - In `parent_flow`, the `flow_run_id` is captured from the `FlowRunContext` and stored in the global storage.
3. Cancelling Subflows:
   - The `on_cancellation_handler` retrieves the `flow_run_id` from the storage and performs the required cancellation actions.
This method ensures that the `flow_run_id` is accessible in the `on_cancellation` hook, allowing you to handle cancellations effectively. What a joy it is to manage state like this, isn't it?
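For comparison, here is a minimal sketch of a hook that takes the run ID from its arguments rather than from a global. Prefect state-change hooks are called with `(flow, flow_run, state)`. Everything else below is a hypothetical stand-in: the `SimpleNamespace` objects imitate what the orchestrator would pass, and `child_runs_by_parent` is a made-up registry.

```python
from types import SimpleNamespace

# Hypothetical registry mapping a parent run ID to the child run IDs it launched
child_runs_by_parent = {"parent-123": ["child-a", "child-b"]}
cancelled = []


def cancel_children(flow, flow_run, state):
    """Hook with the (flow, flow_run, state) signature; the ID comes from flow_run."""
    for child_id in child_runs_by_parent.get(flow_run.id, []):
        cancelled.append(child_id)  # stand-in for a real cancellation call


# Simulate the orchestrator invoking the hook with stand-in objects
fake_flow = SimpleNamespace(name="parent_flow")
fake_run = SimpleNamespace(id="parent-123")
fake_state = SimpleNamespace(name="Cancelling")
cancel_children(fake_flow, fake_run, fake_state)
print(cancelled)
```

If the hook executes in a different process from the flow run, which may be the case for cancellation hooks, an in-memory registry like this would be empty there. An external store or a query against the API for the parent's child runs is likely the more dependable source.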