orangekame3
07/11/2024, 4:05 AM
`submit_to_runner`, I would like to cancel a child flow when the parent flow is canceled.
What kind of implementation is needed?
The version I am using is
prefect = "^3.0.0rc10"
Marvin
07/11/2024, 4:05 AM
`run_deployment` in a task-wrapped manner might provide the cascading cancellation behavior you seek.
Here's an example implementation:
```python
import time

from prefect import flow, task
from prefect.deployments import run_deployment


@flow
def my_subflow():
    try:
        time.sleep(30)
        return 42
    except Exception as e:
        print("Subflow canceled", e)
        raise


@task
def my_deployment_wrapping_task():
    # run_deployment expects "<flow-name>/<deployment-name>";
    # the flow name defaults to the function name with hyphens.
    a = run_deployment("my-subflow/subflow-deployment-name")
    return a.state.result()


@flow
def my_parent_flow():
    try:
        time.sleep(5)
        a = my_deployment_wrapping_task()
        time.sleep(10)
        return a
    except Exception as e:
        print("Parent flow canceled", e)
        raise


if __name__ == "__main__":
    my_subflow.deploy(
        name="subflow-deployment-name",
        work_pool_name="default-agent-pool",
    )
    my_parent_flow.deploy(
        name="parent-flow-deployment-name",
        work_pool_name="default-agent-pool",
    )
```
Steps for implementing this approach:
1. Define a subflow: Use a standard Prefect flow definition.
2. Create a task in the parent flow that uses `run_deployment`: This approach allows the parent flow to manage the state of the child flow.
3. Deploy both flows: This ensures that the subflow runs as its own deployment and can be handled correctly when invoked from the parent flow.
It seems that by using the `task_wrapped_deployment`, you can achieve the cascading cancellation behavior where the subflow cancels when the parent flow does. You may need to ensure you have enhanced cancellation enabled:
```bash
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
```
For further information, you can refer to relevant GitHub issues about cascading cancellation.
orangekame3
07/11/2024, 4:08 AM
`submit_to_runner`?
Marvin
07/11/2024, 4:08 AM
`submit_to_runner` to achieve the behavior where a child flow is canceled when the parent flow is canceled. We'll enhance the existing example to align with the `submit_to_runner` method.
First, note that `submit_to_runner` allows tasks to be executed concurrently.
### Example with submit_to_runner
```python
import time

from prefect import flow, task


@task
def child_task():
    try:
        print("Child task started")
        time.sleep(30)
        print("Child task completed")
        return 42
    except Exception as e:
        print("Child task canceled", e)
        raise


@task
def parent_task():
    try:
        print("Parent task started")
        child_future = child_task.submit()
        time.sleep(10)
        print("Parent task completed")
        return child_future
    except Exception as e:
        print("Parent task canceled", e)
        raise


@flow
def parent_flow():
    try:
        future = parent_task.submit()
        future.wait()  # Wait for the parent task to complete
        return future.result()
    except Exception as e:
        print("Parent flow canceled", e)
        raise


if __name__ == "__main__":
    parent_flow()
```
### Explanation
1. Define `child_task`: This is the child task which runs for 30 seconds.
2. Define `parent_task`: This is the parent task, which submits the child task via `submit` and waits for 10 seconds.
3. Parent Flow: The parent flow calls the parent task using `submit` to achieve concurrent execution. It waits for the parent task to complete using `wait()`.
4. Exception Handling: Both tasks and the flow have try-except blocks to catch cancellations or exceptions. This is important to ensure that when a parent flow or task is canceled, the appropriate handling, such as stopping the child task, takes place.
Running this code lets you observe how a cancellation propagates from the parent task. The child task only respects the cancellation if it is properly configured with cancellation checks.
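To illustrate what "cancellation checks" can mean, here is a minimal plain-Python sketch. It does not use any Prefect API: `child_work`, `run_parent`, and the `threading.Event` flag are hypothetical names chosen for illustration, and the assumption is simply that the child splits one long sleep into short waits and checks a shared flag between them.

```python
import threading
import time


def child_work(cancel_event: threading.Event,
               total_seconds: float = 3.0,
               step: float = 0.05) -> str:
    """Do 'work' in short steps, checking the cancel flag between steps."""
    deadline = time.monotonic() + total_seconds
    while time.monotonic() < deadline:
        # Event.wait returns True as soon as the flag is set,
        # otherwise False once the step times out.
        if cancel_event.wait(step):
            return "cancelled"
    return "completed"


def run_parent(cancel_after: float) -> str:
    """Start the child in a thread, then signal cancellation after a delay."""
    cancel_event = threading.Event()
    result = {}

    def target() -> None:
        result["child"] = child_work(cancel_event)

    worker = threading.Thread(target=target)
    worker.start()
    time.sleep(cancel_after)   # the parent does its own work here
    cancel_event.set()         # the parent is cancelled: tell the child
    worker.join()              # wait for the child to stop cleanly
    return result["child"]


if __name__ == "__main__":
    print(run_parent(cancel_after=0.1))
```

A single uninterrupted `time.sleep(30)` gives the child no point at which it could notice such a flag, which is why the work is split into short steps in this sketch.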
For more detailed examples and information, you can refer to this code example on submitting tasks.