Mitch
06/07/2024, 10:34 PM
Marvin
06/07/2024, 10:34 PM
The `kill_infrastructure` function in prefect-kubernetes will indeed stop the infrastructure associated with a flow. If your subflows are running inline (i.e., created without `run_deployment`), stopping the parent flow's infrastructure will also stop those subflows.
To handle subflows independently, you should deploy them separately and start them using the `run_deployment` function. This allows you to manage each subflow's lifecycle independently of the parent flow.
Here is a relevant excerpt from the Prefect documentation:
- "Inline subflow runs, i.e. those created without `run_deployment`, cannot be cancelled without cancelling the parent flow run. If you may need to cancel a subflow run independent of its parent flow run, we recommend deploying it separately and starting it using the `run_deployment` function."
For more details, you can check the Prefect concepts on flow runs.
It's all rather dull, isn't it?
Mitch
06/07/2024, 10:35 PM
Marvin
06/07/2024, 10:35 PM
Cancelling subflows started with `run_deployment` when you cancel the parent flow requires additional steps, since `run_deployment` creates independent flow runs. Here’s how you can do it:
1. Track Subflow Runs:
- When you start subflows using `run_deployment`, you'll need to capture the flow run IDs of these subflows.
2. Cancel Subflow Runs Manually:
- Upon cancelling the parent flow, you will have to programmatically cancel the subflow runs using the captured run IDs.
Here’s an example of how you might implement this in your flow:
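```python
# Assumes Prefect 2.x
import asyncio

from prefect import flow, get_client, task
from prefect.deployments import run_deployment
from prefect.states import Cancelling


@task
def start_subflow():
    # Start subflow and capture the flow run ID.
    # timeout=0 returns immediately instead of waiting for the run to finish.
    deployment_id = "your-deployment-id"
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id


async def cancel_subflow(flow_run_id):
    # Request cancellation by moving the run to a Cancelling state
    async with get_client() as client:
        await client.set_flow_run_state(flow_run_id, state=Cancelling())


@flow
def parent_flow():
    subflow_run_id = start_subflow()
    try:
        # Parent flow logic here
        pass
    except Exception:
        # Cancel subflow if there's an exception
        asyncio.run(cancel_subflow(subflow_run_id))
        raise


if __name__ == "__main__":
    parent_flow()
```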
3. Cancel Subflows on Parent Flow Cancellation:
- Ensure that the cancellation logic is robust, for example by using a try-except block.
For more detailed information and examples, you can refer to the Prefect documentation on flow runs.
Isn't it just thrilling to have to do all this?
Mitch
06/07/2024, 10:36 PM
Marvin
06/07/2024, 10:37 PM
`concurrent.futures` for running subflows in parallel and managing their execution states.
3. Cancel Subflows on Parent Flow Cancellation:
- Utilize try-except blocks and cancellation logic to ensure all subflows are cancelled if the parent flow faces an exception.
Here’s an example:
```python
# Assumes Prefect 2.x
import asyncio
import concurrent.futures

from prefect import flow, get_client, get_run_logger, task
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_id):
    # Start subflow and capture the flow run ID.
    # Kept as a plain function so it can run in executor threads, which do
    # not carry the flow run context that task submission needs.
    # timeout=0 returns without waiting for the run to finish.
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id


@task
def subflow_task(name):
    logger = get_run_logger()
    logger.info(f"Running subflow task {name}")


@flow
def parent_flow():
    logger = get_run_logger()
    subflow_deployments = ["deployment_id_1", "deployment_id_2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_deployment = {
            executor.submit(start_subflow, deployment_id): deployment_id
            for deployment_id in subflow_deployments
        }
        for future in concurrent.futures.as_completed(future_to_deployment):
            deployment_id = future_to_deployment[future]
            try:
                subflow_run_ids.append(future.result())
            except Exception as e:
                logger.error(f"Exception starting subflow {deployment_id}: {e}")
                # On exception, cancel all started subflows
                asyncio.run(cancel_subflows(subflow_run_ids))
                raise


async def cancel_subflows(subflow_run_ids):
    # Request cancellation by moving each run to a Cancelling state
    async with get_client() as client:
        for run_id in subflow_run_ids:
            await client.set_flow_run_state(run_id, state=Cancelling())


if __name__ == "__main__":
    parent_flow()
```
### Key Points:
- Track Subflow Runs: Collect all subflow run IDs as soon as they are created.
- Execute in Parallel: Use `concurrent.futures` to handle parallel execution of subflow tasks.
- Handle Exceptions: Ensure that you have a robust exception handling mechanism to cancel all subflows in case the parent flow encounters an issue.
This approach provides a way to manage parallel execution and ensure orderly cancellation of subflows when needed. Much more delightful bureaucracy, isn't it?
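The launch-then-clean-up pattern described in the key points can be tried without a Prefect server. The following is a minimal sketch using only the standard library; `launch_all`, `start_run`, and `cancel_run` are hypothetical names, and the two callables stand in for whatever starts and cancels a flow run in your setup. Unlike cancelling inside the `as_completed` loop, it waits for every launch attempt first, so a run that started late is not missed.

```python
import concurrent.futures


def launch_all(deployment_names, start_run, cancel_run):
    """Start one run per deployment in parallel.

    If any launch fails, cancel every run that did start, then re-raise.
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(start_run, name) for name in deployment_names]
    # Leaving the with-block waits for every launch attempt to finish.
    started = [f.result() for f in futures if f.exception() is None]
    errors = [f.exception() for f in futures if f.exception() is not None]
    if errors:
        for run_id in started:
            cancel_run(run_id)
        raise errors[0]
    return started
```

In a flow, `start_run` would wrap the deployment launch and `cancel_run` would wrap the client call that requests cancellation.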
For additional details, refer to the Prefect documentation.
Mitch
06/07/2024, 10:40 PM
Marvin
06/07/2024, 10:40 PM
To ensure that subflows started with `run_deployment` are canceled when the parent flow is canceled, regardless of their status (pending, scheduled, running, etc.), you can implement a solution that tracks subflow run IDs and cancels them using the Prefect client.
Here is a detailed approach you can use:
1. Track Subflow Run IDs:
- When initiating subflows, capture their run IDs for tracking purposes.
2. Cancel Subflows:
- Use the Prefect client to cancel these subflows when the parent flow is canceled.
Here's a complete example demonstrating this approach:
```python
# Assumes Prefect 2.x
import asyncio
import concurrent.futures

from prefect import flow, get_client, get_run_logger, task
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_id):
    # Start subflow and capture the flow run ID.
    # Kept as a plain function so it can run in executor threads;
    # timeout=0 returns without waiting for the run to finish.
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id


@task
def subflow_task(name):
    logger = get_run_logger()
    logger.info(f"Running subflow task {name}")


@flow
def parent_flow():
    logger = get_run_logger()
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    try:
        # Execute subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
                executor.submit(start_subflow, deployment_id): deployment_id
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                deployment_id = future_to_deployment[future]
                try:
                    subflow_run_ids.append(future.result())
                except Exception as e:
                    logger.error(f"Exception starting subflow {deployment_id}: {e}")
                    raise
        # Perform main flow logic here
    except Exception as e:
        # Any failure, including a failed launch above, cancels the tracked subflows
        logger.error(f"Exception in parent flow: {e}")
        asyncio.run(cancel_subflows(subflow_run_ids))
        raise


async def cancel_subflows(subflow_run_ids):
    # Request cancellation by moving each run to a Cancelling state
    async with get_client() as client:
        for run_id in subflow_run_ids:
            await client.set_flow_run_state(run_id, state=Cancelling())


if __name__ == "__main__":
    parent_flow()
```
### Explanation
- `start_subflow`: Launches a subflow using `run_deployment` and returns the subflow run ID.
- `parent_flow` Function:
  - Starts subflows in parallel utilizing `concurrent.futures.ThreadPoolExecutor`.
  - Tracks all the subflow run IDs.
  - Catches exceptions to trigger `cancel_subflows`.
- cancel_subflows Function: Uses the Prefect client to cancel each tracked subflow using its run ID.
This ensures that all subflows, irrespective of their state (pending, scheduled, running), will be canceled if the parent flow is canceled.
For additional details on flow run cancellation, refer to the Prefect documentation on flow runs.
As exciting as it is to cancel all these flows, remember to implement robust error handling to ensure everything is properly managed.
Mitch
06/07/2024, 10:44 PM
Marvin
06/07/2024, 10:45 PM
```python
import asyncio
import concurrent.futures

from prefect import flow, task, get_client, get_run_logger, flow_run_context
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_id):
    # Plain function so it can run in executor threads;
    # timeout=0 returns without waiting for the run to finish.
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id


@task
def subflow_task(name):
    logger = get_run_logger()
    logger.info(f"Running subflow task {name}")


@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    try:
        # Start subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
                executor.submit(start_subflow, deployment_id): deployment_id
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                subflow_run_ids.append(future.result())
        # Periodically check the state of the parent flow run
        asyncio.run(check_parent_flow_state(subflow_run_ids))
        # Main flow logic here
    except Exception as e:
        logger = get_run_logger()
        logger.error(f"Exception in parent flow: {e}")
        asyncio.run(cancel_subflows(subflow_run_ids))
        raise


async def check_parent_flow_state(subflow_run_ids):
    context = flow_run_context.get()
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(context.flow_run_id)
            if flow_run.state.is_cancelled():
                await cancel_subflows(subflow_run_ids)
                raise RuntimeError("Parent flow was cancelled.")
            await asyncio.sleep(5)  # Check every 5 seconds


async def cancel_subflows(subflow_run_ids):
    # Request cancellation by moving each run to a Cancelling state
    async with get_client() as client:
        for run_id in subflow_run_ids:
            await client.set_flow_run_state(run_id, state=Cancelling())


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
- `check_parent_flow_state` Function: Periodically checks the state of the parent flow run using the Prefect client.
  - If the parent flow run is cancelled, it triggers `cancel_subflows` and raises an exception to propagate the cancellation.
  - This function runs in a loop, checking the state every 5 seconds.
- `parent_flow` Function: Starts subflows in parallel and calls `check_parent_flow_state` to monitor the parent flow run's state.
This approach ensures that if your parent flow run is cancelled, an exception is raised within your Python code, allowing you to handle the cancellation properly and ensuring all subflows are cancelled accordingly.
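One thing to watch with a polling loop is that an unbounded `while True` never hands control back to the rest of the flow. As a minimal sketch of a bounded variant, using only the standard library: `poll_until_cancelled` and `read_state_type` are hypothetical names, and the strings `"CANCELLING"` and `"CANCELLED"` are stand-ins for however your client reports the run's state.

```python
import time


def poll_until_cancelled(read_state_type, interval=0.01, max_checks=100):
    """Return True as soon as a cancellation state is seen, else False after max_checks polls."""
    for _ in range(max_checks):
        # read_state_type is a caller-supplied callable returning the current state name
        if read_state_type() in ("CANCELLING", "CANCELLED"):
            return True
        time.sleep(interval)
    return False
```

The caller decides what to do with the result, for example cancelling the tracked subflows and raising when it returns `True`, or continuing with the main logic when it returns `False`.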
What a delightfully complex web of cancellations!
Mitch
06/07/2024, 10:47 PM
Mitch
06/08/2024, 1:04 AM
Marvin
06/08/2024, 1:04 AM
`flow_run_context` from `prefect.context`. Here's how it can be done:
1. Import `flow_run_context`:
- Import the context from Prefect to access the flow run's metadata.
2. Get the Flow Run ID:
- Retrieve the flow run ID from the context.
Here’s an example that demonstrates how to do this:
```python
import asyncio
import concurrent.futures
import threading
import time

from prefect import flow, get_client, get_run_logger, flow_run_context
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_id):
    # Plain function so it can run in executor threads;
    # timeout=0 returns without waiting for the run to finish.
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id


@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    logger = get_run_logger()
    cancel_event = threading.Event()
    # Get the flow run ID from the flow context
    context = flow_run_context.get()
    parent_flow_run_id = context.flow_run_id
    logger.info(f"Parent flow run ID: {parent_flow_run_id}")

    # Poll the parent flow run's state until it is cancelled or the flow stops
    async def poll_parent_flow_state():
        async with get_client() as client:
            while not cancel_event.is_set():
                flow_run = await client.read_flow_run(parent_flow_run_id)
                if flow_run.state.is_cancelled():
                    cancel_event.set()
                    logger.info("Parent flow was cancelled.")
                    await cancel_subflows(subflow_run_ids)
                    return
                await asyncio.sleep(2)  # Check every 2 seconds

    # Start a thread to monitor the cancellation of the parent flow
    monitor_thread = threading.Thread(target=lambda: asyncio.run(poll_parent_flow_state()))
    monitor_thread.start()
    try:
        # Run subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
                executor.submit(start_subflow, deployment_id): deployment_id
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                deployment_id = future_to_deployment[future]
                try:
                    subflow_run_ids.append(future.result())
                except Exception as e:
                    logger.error(f"Exception starting subflow {deployment_id}: {e}")
                    raise
        # Main flow logic here
        while not cancel_event.is_set():
            # Simulate some work in the parent flow
            time.sleep(1)
        # An exception raised inside the monitor thread would not reach this
        # thread, so raise here once the event is set
        raise RuntimeError("Parent flow was cancelled.")
    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_event.set()
        raise
    finally:
        monitor_thread.join()


async def cancel_subflows(subflow_run_ids):
    # Request cancellation by moving each run to a Cancelling state
    async with get_client() as client:
        for run_id in subflow_run_ids:
            await client.set_flow_run_state(run_id, state=Cancelling())


if __name__ == "__main__":
    parent_flow()
```
### Key Points:
- flow_run_context: Used to retrieve context information within the flow.
- Monitor Parent Flow: Continuously checks the state of the parent flow and cancels subflows if the parent flow is cancelled.
- Threading and Event Handling: Ensures proper handling of cancellation in a concurrent environment.
Thus, this ensures that the parent flow's status is monitored, and subflows are appropriately cancelled upon the parent flow's cancellation. Not quite exhilarating, but certainly methodical.
Mitch
06/08/2024, 1:07 AM
`from prefect import flow_run_context`
Marvin
06/08/2024, 1:07 AM
`prefect.context` module without needing to import it explicitly like other components.
Here's the corrected example:
Use `prefect.context.FlowRunContext.get()` to get context information within the flow.
```python
# Assumes Prefect 2.x
import asyncio
import concurrent.futures
import threading
import time

from prefect import flow, get_client, get_run_logger
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_id):
    # Plain function so it can run in executor threads;
    # timeout=0 returns without waiting for the run to finish.
    subflow_run = run_deployment(deployment_id, timeout=0)
    return subflow_run.id


@flow
def parent_flow():
    subflow_deployments = ["your-deployment-id-1", "your-deployment-id-2"]  # List of subflow deployment IDs
    subflow_run_ids = []
    logger = get_run_logger()
    cancel_event = threading.Event()
    # Get the flow run ID from the runtime context
    context = FlowRunContext.get()
    if context is None:
        raise RuntimeError("No active flow run context.")
    parent_flow_run_id = context.flow_run.id
    logger.info(f"Parent flow run ID: {parent_flow_run_id}")

    # Poll the parent flow run's state until it is cancelled or the flow stops
    async def poll_parent_flow_state():
        async with get_client() as client:
            while not cancel_event.is_set():
                flow_run = await client.read_flow_run(parent_flow_run_id)
                if flow_run.state.is_cancelled():
                    cancel_event.set()
                    logger.info("Parent flow was cancelled.")
                    await cancel_subflows(subflow_run_ids)
                    return
                await asyncio.sleep(2)  # Check every 2 seconds

    # Start a thread to monitor the cancellation of the parent flow
    monitor_thread = threading.Thread(target=lambda: asyncio.run(poll_parent_flow_state()))
    monitor_thread.start()
    try:
        # Run subflows in parallel
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_deployment = {
                executor.submit(start_subflow, deployment_id): deployment_id
                for deployment_id in subflow_deployments
            }
            for future in concurrent.futures.as_completed(future_to_deployment):
                deployment_id = future_to_deployment[future]
                try:
                    subflow_run_ids.append(future.result())
                except Exception as e:
                    logger.error(f"Exception starting subflow {deployment_id}: {e}")
                    raise
        # Main flow logic here
        while not cancel_event.is_set():
            # Simulate some work in the parent flow
            time.sleep(1)
        # An exception raised inside the monitor thread would not reach this
        # thread, so raise here once the event is set
        raise RuntimeError("Parent flow was cancelled.")
    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_event.set()
        raise
    finally:
        monitor_thread.join()


async def cancel_subflows(subflow_run_ids):
    # Request cancellation by moving each run to a Cancelling state
    async with get_client() as client:
        for run_id in subflow_run_ids:
            await client.set_flow_run_state(run_id, state=Cancelling())


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
1. FlowRunContext:
- Use `FlowRunContext.get()` to retrieve the current flow run context, enabling access to `flow_run.id`.
2. Monitoring Cancellation:
- The `monitor_parent_flow_state` function runs in a separate thread and checks the cancellation status of the parent flow.
- If the parent flow is cancelled, it sets the `cancel_event` and cancels the subflows, and an exception is raised to stop the parent flow.
3. Threading and Event Handling:
- Use threading and event handling to manage concurrent execution and robust cancellation.
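A detail worth spelling out for the threading approach: an exception raised inside a `threading.Thread` target does not propagate to the thread that started it, so the monitor has to signal through the `Event` and the main thread has to do the raising. Here is a minimal standard-library sketch of that hand-off; `monitor`, `run_with_monitor`, `is_cancelled`, `on_cancel`, and `work_steps` are hypothetical names, with the callables standing in for the client calls.

```python
import threading


def monitor(is_cancelled, cancel_event, on_cancel, interval=0.01):
    # Runs in a background thread: signal first, then clean up.
    while not cancel_event.is_set():
        if is_cancelled():
            cancel_event.set()
            on_cancel()
            return
        cancel_event.wait(interval)


def run_with_monitor(is_cancelled, on_cancel, work_steps):
    cancel_event = threading.Event()
    thread = threading.Thread(
        target=monitor, args=(is_cancelled, cancel_event, on_cancel), daemon=True
    )
    thread.start()
    try:
        for step in work_steps:
            # The main thread raises; the monitor thread only sets the event.
            if cancel_event.is_set():
                raise RuntimeError("Parent flow was cancelled.")
            step()
    finally:
        cancel_event.set()  # stop the monitor on every exit path
        thread.join()
```

Checking the event between units of work keeps the cancellation latency bounded by the length of one step plus the polling interval.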
This ensures that the parent flow's status is continuously monitored and subflows are appropriately cancelled if the parent flow is cancelled. Enjoy the bliss of continuous state monitoring!
Mitch
06/08/2024, 1:11 AM
Marvin
06/08/2024, 1:11 AM
To access the `flow_run_id` within an `on_cancellation` hook, you must make use of Prefect's context management. Prefect provides a way to access runtime information, including the `flow_run_id`, via the `FlowRunContext`.
Here's how you can use `FlowRunContext` inside an `on_cancellation` hook to get the `flow_run_id`:
1. Define an `on_cancellation` Hook:
- Use the context to get the `flow_run_id` during flow cancellation.
2. Implement the Hook within the Flow:
- Attach the `on_cancellation` hook to the flow when defining it.
Here’s an example demonstrating this approach:
```python
# Assumes Prefect 2.x
import asyncio
import logging

from prefect import flow, get_client, get_run_logger, task
from prefect.context import FlowRunContext
from prefect.states import Cancelling

# Hooks may run without an active run context, so they use a standard logger
hook_logger = logging.getLogger(__name__)


@task
def some_task():
    logger = get_run_logger()
    logger.info("Executing task")


def on_cancellation_handler(flow, flow_run, state):
    # State change hooks are called with the flow, the flow run, and the new state
    context = FlowRunContext.get()
    # Fall back to the flow_run argument when no run context is active
    flow_run_id = context.flow_run.id if context is not None else flow_run.id
    hook_logger.info(f"Flow run {flow_run_id} was cancelled.")
    # Add your cancellation logic here, e.g., cancel subflows
    asyncio.run(cancel_subflows_for_run(flow_run_id))


async def cancel_subflows_for_run(flow_run_id):
    subflow_run_ids = get_subflow_run_ids(flow_run_id)  # Hypothetical helper; you would implement this function
    async with get_client() as client:
        for subflow_run_id in subflow_run_ids:
            # Request cancellation by moving the run to a Cancelling state
            await client.set_flow_run_state(subflow_run_id, state=Cancelling())
            hook_logger.info(f"Cancelled subflow run {subflow_run_id}")


@flow(on_cancellation=[on_cancellation_handler])
def parent_flow():
    some_task()


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
1. Define Task Logic:
- The `some_task` function contains the task logic for the parent flow.
2. Define Cancellation Handler:
- The `on_cancellation_handler` function uses the `FlowRunContext` to get the `flow_run_id` and implements the cancellation logic.
3. Use Cancellation Handler in Flow:
- Attach the `on_cancellation_handler` to the parent flow using the `on_cancellation` parameter.
This setup ensures that the `flow_run_id` is accessible within the `on_cancellation` hook, enabling you to handle cancellations properly and perform subsequent actions such as cancelling subflows.
It's almost mesmerizing how everything has to be in perfect order to work, isn't it?
Mitch
06/08/2024, 1:12 AM
Marvin
06/08/2024, 1:13 AM
To access the `flow_run_id` in the `on_cancellation` hook, you need to pass the run ID in a way that is accessible by the hook. One way to achieve this is by using a global or an external store to keep track of the `flow_run_id`.
Here's a more refined way to handle this:
1. Store the Flow Run ID upon Flow Start:
- Capture the `flow_run_id` at the start of the flow and store it globally or in an external store (like a database or state management system).
2. Access the Stored Flow Run ID in the `on_cancellation` Hook:
- Retrieve the stored `flow_run_id` inside the cancellation handler.
Below is an example of this approach:
```python
# Assumes Prefect 2.x
import asyncio
import logging

from prefect import flow, get_client, get_run_logger, task
from prefect.context import FlowRunContext
from prefect.states import Cancelling

# Global variable to store the flow run ID
flow_run_id_storage = {}

# Hooks may run without an active run context, so they use a standard logger
hook_logger = logging.getLogger(__name__)


@task
def some_task():
    logger = get_run_logger()
    logger.info("Executing task")


def on_cancellation_handler(flow, flow_run, state):
    # Retrieve the stored flow run ID, falling back to the run passed to the hook
    flow_run_id = flow_run_id_storage.get("flow_run_id", flow_run.id)
    hook_logger.info(f"Flow run {flow_run_id} was cancelled.")
    # Add your cancellation logic here, e.g., cancel subflows
    asyncio.run(cancel_subflows_for_run(flow_run_id))


async def cancel_subflows_for_run(flow_run_id):
    subflow_run_ids = get_subflow_run_ids(flow_run_id)  # Hypothetical helper; implement this function to get subflow IDs
    async with get_client() as client:
        for subflow_run_id in subflow_run_ids:
            # Request cancellation by moving the run to a Cancelling state
            await client.set_flow_run_state(subflow_run_id, state=Cancelling())
            hook_logger.info(f"Cancelled subflow run {subflow_run_id}")


@flow(on_cancellation=[on_cancellation_handler])
def parent_flow():
    # Get the flow run ID from the context
    context = FlowRunContext.get()
    if context is not None:
        flow_run_id_storage["flow_run_id"] = context.flow_run.id
    some_task()


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
1. Global Storage:
- `flow_run_id_storage` is a dictionary used to store the `flow_run_id`. This can also be replaced with a more sophisticated state management solution if needed.
2. Storing Flow Run ID:
- In `parent_flow`, the `flow_run_id` is captured from the `FlowRunContext` and stored in the global storage.
3. Cancelling Subflows:
- The `on_cancellation_handler` retrieves the `flow_run_id` from the storage and performs the required cancellation actions.
This method ensures that the `flow_run_id` is accessible in the `on_cancellation` hook, allowing you to handle cancellations effectively.
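If you go the global-store route, it may help to store the subflow run IDs themselves, keyed by the parent run ID, rather than only the parent's ID. A minimal in-memory sketch follows; `SubflowRegistry` is a hypothetical helper, not part of Prefect, and it assumes the hook runs in the same process as the flow body. If that does not hold in your deployment, the same interface would need to sit on top of an external store.

```python
import threading
from collections import defaultdict


class SubflowRegistry:
    """Thread-safe, in-process map from a parent run ID to its subflow run IDs."""

    def __init__(self):
        self._lock = threading.Lock()
        self._children = defaultdict(list)

    def register(self, parent_run_id, child_run_id):
        # Call this right after each subflow run is created
        with self._lock:
            self._children[parent_run_id].append(child_run_id)

    def pop_children(self, parent_run_id):
        # Returns the tracked IDs and forgets them, so a second call cancels nothing twice
        with self._lock:
            return self._children.pop(parent_run_id, [])
```

A cancellation handler could then call `pop_children` with the parent's run ID and request cancellation for each ID it gets back.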
What a joy it is to manage state like this, isn't it?