<@ULVA73B9P> Regarding parallel execution using `s...
# ask-marvin
o
@Marvin Regarding parallel execution using
submit_to_runner
, I would like to cancel a child flow when the parent flow is canceled. What kind of implementation is needed? The version I am using is
prefect = "^3.0.0rc10"
m
Cancellation strategies in Prefect can be tricky, especially when dealing with child and parent flows. Below are the most relevant points and some code snippets that might help you achieve the behavior you want, based on the information provided. From the Prefect GitHub issue #12714, there are a few notable points: 1. For normal subflows, when the parent flow is canceled, the subflow does not cancel but instead crashes. 2. For sub-deployments, the expected behavior is achieved, where the subflow enters a canceling state when the parent flow does. It seems using
run_deployment
in a task-wrapped manner might provide the cascading cancellation behavior you seek. Here's an example implementation:
Copy code
python
import time
from prefect import flow, task
from prefect.deployments import run_deployment

@flow
def my_subflow():
    try:
        time.sleep(30)
        return 42
    except Exception as e:
        print("Subflow canceled", e)
        raise

@task
def my_deployment_wrapping_task():
    a = run_deployment("subflow-deployment-name")
    return a.state.result()

@flow
def my_parent_flow():
    try:
        time.sleep(5)
        a = my_deployment_wrapping_task()
        time.sleep(10)
        return a
    except Exception as e:
        print("Parent flow canceled", e)
        raise

if __name__ == "__main__":
    my_subflow.deploy(
        name="subflow-deployment-name",
        work_pool_name="default-agent-pool"
    )
    
    my_parent_flow.deploy(
        name="parent-flow-deployment-name",
        work_pool_name="default-agent-pool"
    )
Steps for implementing the solution as suggested by the above implementations: 1. Define a subflow: Use a standard Prefect flow definition. 2. Create a task in the parent flow that uses `run_deployment`: This approach allows the parent flow to manage the state of the child flow. 3. Deploy both flows: This ensures that subflow is wrapped as a deployment and can be handled correctly when invoked from the parent flow. It seems that by using the
task_wrapped_deployment
, you can achieve the cascading cancellation behavior where the subflow cancels when the parent flow does. You may need to ensure you have enhanced cancellation enabled:
Copy code
bash
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
For further information, you can refer to relevant GitHub issues about cascading cancellation.
o
@Marvin Can you give me an example using
submit_to_runner
?
m
Here's how you can use
submit_to_runner
to achieve the behavior where a child flow is canceled when the parent flow is canceled. We'll enhance the existing example to align with the
submit_to_runner
method. First, note that
submit_to_runner
allows tasks to be executed concurrently. ### Example with
submit_to_runner
Copy code
python
import time
from prefect import flow, task

@task
def child_task():
    try:
        print("Child task started")
        time.sleep(30)
        print("Child task completed")
        return 42
    except Exception as e:
        print("Child task canceled", e)
        raise

@task
def parent_task():
    try:
        print("Parent task started")
        child_future = child_task.submit()
        time.sleep(10)
        print("Parent task completed")
        return child_future
    except Exception as e:
        print("Parent task canceled", e)
        raise

@flow
def parent_flow():
    try:
        future = parent_task.submit()
        future.wait()  # Wait for the parent task to complete
        return future.result()
    except Exception as e:
        print("Parent flow canceled", e)
        raise

if __name__ == "__main__":
    parent_flow()
### Explanation 1. Define `child_task`: This is the child task which runs for 30 seconds. 2. Define `parent_task`: This is the parent task which submits the child task via
submit
and waits for 10 seconds. 3. Parent Flow: The parent flow calls the parent task using
submit
to achieve concurrent execution. It waits for the parent task to complete using
wait()
. 4. Exception Handling: Both tasks and the flow have try-except blocks to catch cancellations or exceptions. This is important to ensure that when a parent flow or task is canceled, the appropriate handling, such as stopping the child task, takes place. Running this code will help you observe how cancellations propagate from the parent task ensuring that the child task respects the cancellation if properly configured with cancellation checks. For more detailed examples and information, you can refer to this code example on submitting tasks.