Mitch
06/07/2024, 10:34 PM

Marvin
06/07/2024, 10:34 PM
kill_infrastructure function in prefect-kubernetes will indeed stop the infrastructure associated with a flow. If your subflows are running inline (i.e., created without run_deployment), stopping the parent flow's infrastructure will also stop those subflows.
To handle subflows independently, you should deploy them separately and start them using the run_deployment function. This allows you to manage each subflow's lifecycle independently of the parent flow.
Here is a relevant excerpt from the Prefect documentation:
- "Inline subflow runs, i.e. those created without run_deployment, cannot be cancelled without cancelling the parent flow run. If you may need to cancel a subflow run independent of its parent flow run, we recommend deploying it separately and starting it using the run_deployment function."
For more details, you can check the Prefect concepts on flow runs.
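To make the inline-versus-deployed distinction concrete, here is a toy, Prefect-free model. `ToyOrchestrator`, `start_run`, and `kill_infrastructure` are hypothetical names for illustration, not Prefect APIs. An inline subflow shares its parent's infrastructure, so killing that infrastructure stops it too, while a deployment-style run gets its own infrastructure and keeps going.

```python
from dataclasses import dataclass, field


@dataclass
class Run:
    name: str
    infra_id: int  # which (hypothetical) infrastructure the run executes on
    state: str = "RUNNING"


@dataclass
class ToyOrchestrator:
    runs: list = field(default_factory=list)
    _next_infra: int = 0

    def _new_infra(self) -> int:
        self._next_infra += 1
        return self._next_infra

    def start_run(self, name, parent=None, inline=False):
        # Inline subflows reuse the parent's infrastructure;
        # deployment-style runs always get their own
        infra = parent.infra_id if (inline and parent) else self._new_infra()
        run = Run(name, infra)
        self.runs.append(run)
        return run

    def kill_infrastructure(self, run):
        # Stopping infrastructure stops every run hosted on it
        for r in self.runs:
            if r.infra_id == run.infra_id:
                r.state = "CANCELLED"


orch = ToyOrchestrator()
parent = orch.start_run("parent")
inline_child = orch.start_run("inline-child", parent=parent, inline=True)
deployed_child = orch.start_run("deployed-child", parent=parent)
orch.kill_infrastructure(parent)
print(parent.state, inline_child.state, deployed_child.state)  # CANCELLED CANCELLED RUNNING
```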
It's all rather dull, isn't it?

Mitch
06/07/2024, 10:35 PM

Marvin
06/07/2024, 10:35 PM
run_deployment when you cancel the parent flow requires additional steps since run_deployment creates independent flow runs. Here’s how you can do it:
1. Track Subflow Runs:
- When you start subflows using run_deployment, you'll need to capture the flow run IDs of these subflows.
2. Cancel Subflow Runs Manually:
- Upon cancelling the parent flow, you will have to programmatically cancel the subflow runs using the captured run IDs.
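The track-then-cancel steps can be packaged so that the cancellation cannot be forgotten. This sketch is Prefect-free: `FakeClient` stands in for a real client and its `cancel` method is hypothetical, so only the bookkeeping pattern carries over. A context manager collects each run ID as it is started and cancels all of them if the block exits with an exception.

```python
from contextlib import contextmanager


class FakeClient:
    """Hypothetical stand-in for an orchestration client."""

    def __init__(self):
        self.cancelled = []

    def cancel(self, run_id):
        self.cancelled.append(run_id)


@contextmanager
def track_subflows(client):
    run_ids = []  # the caller appends IDs as subflows are started
    try:
        yield run_ids
    except BaseException:
        # Cancel everything started so far, then let the error propagate
        for run_id in run_ids:
            client.cancel(run_id)
        raise


client = FakeClient()
try:
    with track_subflows(client) as run_ids:
        run_ids.append("run-1")
        run_ids.append("run-2")
        raise RuntimeError("parent failed")
except RuntimeError:
    pass
print(client.cancelled)  # ['run-1', 'run-2']
```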
Here’s an example of how you might implement this in your flow:
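```python
from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment
from prefect.states import Cancelling


@task
def start_subflow(deployment_name: str):
    # Start the subflow and capture its flow run ID. timeout=0 returns as soon
    # as the run is created instead of waiting for the subflow to finish.
    subflow_run = run_deployment(deployment_name, timeout=0)
    return subflow_run.id


@flow
def parent_flow():
    # "<flow-name>/<deployment-name>" or a deployment UUID
    subflow_run_id = start_subflow("your-flow/your-deployment")
    try:
        # Parent flow logic here
        pass
    except Exception:
        # Cancel the subflow if the parent flow raises. A Cancelling state
        # lets the worker stop the subflow's infrastructure.
        with get_client(sync_client=True) as client:
            client.set_flow_run_state(subflow_run_id, Cancelling())
        raise


if __name__ == "__main__":
    parent_flow()
```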
3. Cancel Subflows on Parent Flow Cancellation:
- Ensure that the cancellation logic is robust, for example by using a try-except block.
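One detail behind "robust": `except Exception` does not catch `BaseException` subclasses such as `KeyboardInterrupt` or `SystemExit`, so cleanup placed only there can be skipped. A `try`/`finally` with a success flag runs the cleanup on every unsuccessful exit. This is plain Python; `run_parent` and `cancel_all` are hypothetical names.

```python
def run_parent(body, cancel_all):
    finished = False
    try:
        body()
        finished = True
    finally:
        # Runs for Exception *and* BaseException (KeyboardInterrupt, SystemExit)
        if not finished:
            cancel_all()


calls = []


def interrupted():
    raise KeyboardInterrupt


try:
    run_parent(interrupted, lambda: calls.append("cancelled"))
except KeyboardInterrupt:
    pass
print(calls)  # ['cancelled']
```

On the success path `finished` is `True`, so nothing is cancelled.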
For more detailed information and examples, you can refer to the Prefect documentation on flow runs.
Isn't it just thrilling to have to do all this?

Mitch
06/07/2024, 10:36 PM

Marvin
06/07/2024, 10:37 PM
concurrent.futures for running subflows in parallel and managing their execution states.
3. Cancel Subflows on Parent Flow Cancellation:
- Utilize try-except blocks and cancellation logic to ensure all subflows are cancelled if the parent flow raises an exception.
Here’s an example:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_name):
    # A plain function rather than a task: it runs in executor threads, outside
    # the flow's run context. timeout=0 returns once the run is created.
    subflow_run = run_deployment(deployment_name, timeout=0)
    return subflow_run.id


def cancel_subflows(subflow_run_ids):
    with get_client(sync_client=True) as client:
        for run_id in subflow_run_ids:
            client.set_flow_run_state(run_id, Cancelling())


@flow
def parent_flow():
    logger = get_run_logger()
    # "<flow-name>/<deployment-name>" for each subflow deployment
    subflow_deployments = ["flow-a/deployment-1", "flow-b/deployment-2"]
    subflow_run_ids = []
    errors = []
    with ThreadPoolExecutor() as executor:
        future_to_deployment = {
            executor.submit(start_subflow, name): name for name in subflow_deployments
        }
        for future in as_completed(future_to_deployment):
            name = future_to_deployment[future]
            try:
                subflow_run_ids.append(future.result())
            except Exception as e:
                logger.error(f"Failed to start subflow {name}: {e}")
                errors.append(e)
    # Cancel only after every launch has finished, so no started run is missed
    if errors:
        cancel_subflows(subflow_run_ids)
        raise errors[0]


if __name__ == "__main__":
    parent_flow()
```
### Key Points:
- Track Subflow Runs: Collect all subflow run IDs as soon as they are created.
- Execute in Parallel: Use concurrent.futures to handle parallel execution of subflow tasks.
- Handle Exceptions: Ensure that you have a robust exception handling mechanism to cancel all subflows in case the parent flow encounters an issue.
This approach provides a way to manage parallel execution and ensure orderly cancellation of subflows when needed. Much more delightful bureaucracy, isn't it?
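When launches run in parallel, the timing of cancellation matters: cancelling inside the loop at the first failure can miss runs that other threads start a moment later. The Prefect-free sketch below waits for every launch and then, if any failed, cancels all that succeeded. `fake_start` and the `cancelled` list are hypothetical stand-ins for `run_deployment` and the client call.

```python
from concurrent.futures import ThreadPoolExecutor


def launch_all(names, start, cancel):
    # Leaving the with-block waits for every submitted launch to finish
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(start, name): name for name in names}
    started = [f.result() for f in futures if f.exception() is None]
    failed = [name for f, name in futures.items() if f.exception() is not None]
    if failed:
        # Only now is the list of started runs complete
        for run_id in started:
            cancel(run_id)
        raise RuntimeError(f"failed to start: {failed}")
    return started


def fake_start(name):
    if name == "bad":
        raise ValueError("cannot start")
    return f"run-{name}"


cancelled = []
try:
    launch_all(["a", "bad", "b"], fake_start, cancelled.append)
except RuntimeError as e:
    print(e)  # failed to start: ['bad']
print(sorted(cancelled))  # ['run-a', 'run-b']
```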
For additional details, refer to the Prefect documentation.

Mitch
06/07/2024, 10:40 PM

Marvin
06/07/2024, 10:40 PM
run_deployment are canceled when the parent flow is canceled, regardless of their status (pending, scheduled, running, etc.), you can implement a solution that tracks subflow run IDs and cancels them using the Prefect client.
Here is a detailed approach you can use:
1. Track Subflow Run IDs:
- When initiating subflows, capture their run IDs for tracking purposes.
2. Cancel Subflows:
- Use the Prefect client to cancel these subflows when the parent flow is canceled.
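In practice, cancelling "regardless of status" means acting on every run that has not already finished; a run that is already completed, failed, or cancelled has nothing left to cancel. This sketch models that rule with a hypothetical `RunState` enum and an in-memory dict of run states. In Prefect, the equivalent information comes from each run's state type.

```python
from enum import Enum


class RunState(Enum):
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


TERMINAL = {RunState.COMPLETED, RunState.FAILED, RunState.CANCELLED}


def cancel_unfinished(states):
    """Mark every non-terminal run CANCELLED and return the IDs that changed."""
    changed = []
    for run_id, state in states.items():
        if state not in TERMINAL:
            states[run_id] = RunState.CANCELLED
            changed.append(run_id)
    return changed


states = {
    "a": RunState.SCHEDULED,
    "b": RunState.RUNNING,
    "c": RunState.COMPLETED,
    "d": RunState.PENDING,
}
print(cancel_unfinished(states))  # ['a', 'b', 'd']
```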
Here's a complete example demonstrating this approach:
```python
from concurrent.futures import ThreadPoolExecutor

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_name):
    # Plain function (it runs in executor threads); timeout=0 returns as soon
    # as the subflow run is created
    subflow_run = run_deployment(deployment_name, timeout=0)
    return subflow_run.id


def cancel_subflows(subflow_run_ids):
    with get_client(sync_client=True) as client:
        for run_id in subflow_run_ids:
            client.set_flow_run_state(run_id, Cancelling())


@flow
def parent_flow():
    logger = get_run_logger()
    # "<flow-name>/<deployment-name>" for each subflow deployment
    subflow_deployments = ["flow-a/deployment-1", "flow-b/deployment-2"]
    subflow_run_ids = []
    try:
        # Execute subflow launches in parallel; leaving the with-block waits for all
        with ThreadPoolExecutor() as executor:
            futures = {executor.submit(start_subflow, name): name for name in subflow_deployments}
        for future, name in futures.items():
            if future.exception() is None:
                subflow_run_ids.append(future.result())
            else:
                logger.error(f"Failed to start subflow {name}: {future.exception()}")
        if len(subflow_run_ids) < len(subflow_deployments):
            raise RuntimeError("Not all subflows could be started")

        # Perform main flow logic here
        pass
    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_subflows(subflow_run_ids)
        raise


if __name__ == "__main__":
    parent_flow()
```
### Explanation
- start_subflow function: Launches a subflow using run_deployment (with timeout=0, so it returns as soon as the run is created) and returns the subflow run ID.
- parent_flow Function:
- Starts subflows in parallel utilizing concurrent.futures.ThreadPoolExecutor.
- Tracks all the subflow run IDs.
- Catches exceptions to trigger cancel_subflows.
- cancel_subflows Function: Uses the Prefect client to move each tracked subflow run to a Cancelling state.
This ensures that every subflow started so far is cancelled, whatever its state (pending, scheduled, running), whenever the parent flow raises an exception. A cancellation requested from the UI or API is typically carried out by the worker tearing down the parent's infrastructure rather than by raising an Exception in this code, so this pattern alone does not cover that case.
For additional details on flow run cancellation, refer to the Prefect documentation on flow runs.
As exciting as it is to cancel all these flows, remember to implement robust error handling to ensure everything is properly managed.

Mitch
06/07/2024, 10:44 PM

Marvin
06/07/2024, 10:45 PM
```python
import threading
from concurrent.futures import ThreadPoolExecutor

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_name):
    # timeout=0 returns as soon as the subflow run is created
    return run_deployment(deployment_name, timeout=0).id


def cancel_subflows(subflow_run_ids):
    with get_client(sync_client=True) as client:
        for run_id in subflow_run_ids:
            client.set_flow_run_state(run_id, Cancelling())


def check_parent_flow_state(parent_run_id, subflow_run_ids, stop):
    # Poll the parent flow run every 5 seconds until `stop` is set
    with get_client(sync_client=True) as client:
        while not stop.wait(5):
            state = client.read_flow_run(parent_run_id).state
            if state.is_cancelling() or state.is_cancelled():
                cancel_subflows(subflow_run_ids)
                return


@flow
def parent_flow():
    logger = get_run_logger()
    subflow_deployments = ["flow-a/deployment-1", "flow-b/deployment-2"]
    parent_run_id = FlowRunContext.get().flow_run.id
    stop = threading.Event()

    # Start subflows in parallel
    with ThreadPoolExecutor() as executor:
        subflow_run_ids = list(executor.map(start_subflow, subflow_deployments))

    # Watch the parent's state in the background while the main logic runs
    watcher = threading.Thread(
        target=check_parent_flow_state,
        args=(parent_run_id, subflow_run_ids, stop),
        daemon=True,
    )
    watcher.start()
    try:
        # Main flow logic here
        pass
    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_subflows(subflow_run_ids)
        raise
    finally:
        stop.set()
        watcher.join()


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
- check_parent_flow_state Function: Periodically checks the state of the parent flow run using the Prefect client.
- If the parent flow run is cancelling or cancelled, it triggers cancel_subflows and stops polling.
- It runs in a background thread, checking the state every 5 seconds until the parent flow sets the stop event.
- parent_flow Function: Starts subflows in parallel, then runs check_parent_flow_state in a background thread to monitor the parent flow run's state while the main logic runs.
This approach lets the parent flow cancel its subflows when it notices its own cancellation, provided its process is still alive at that point. An exception raised in the watcher thread would not reach the flow, which is why the watcher cancels the subflows directly. Keep in mind that a cancellation requested from the UI or API usually leads the worker to tear down the parent's infrastructure, so the watcher may not get a chance to run.
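The polling idea behind check_parent_flow_state does not depend on Prefect, so it can be exercised in isolation. In this sketch, `read_state` is a hypothetical callable standing in for reading the parent run's state and `on_cancel` stands in for cancelling the subflows. `Event.wait(interval)` serves as both the pause between polls and the stop signal, so the watcher exits promptly when the parent finishes normally.

```python
import threading
import time


def watch(read_state, on_cancel, stop, interval=0.01):
    # Event.wait returns True once `stop` is set, which ends the loop
    while not stop.wait(interval):
        if read_state() in ("CANCELLING", "CANCELLED"):
            on_cancel()
            return


state = {"value": "RUNNING"}
cancelled = threading.Event()
stop = threading.Event()
watcher = threading.Thread(
    target=watch, args=(lambda: state["value"], cancelled.set, stop), daemon=True
)
watcher.start()

time.sleep(0.05)
state["value"] = "CANCELLING"  # simulate a cancellation request
print(cancelled.wait(timeout=1))  # True
stop.set()
watcher.join()
```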
What a delightfully complex web of cancellations!

Mitch
06/07/2024, 10:47 PM

Mitch
06/08/2024, 1:04 AM

Marvin
06/08/2024, 1:04 AM
FlowRunContext from prefect.context. Here's how it can be done:
1. Import `FlowRunContext`:
- Import the context class from prefect.context to access the flow run's metadata.
2. Get the Flow Run ID:
- Retrieve the flow run ID from the context with FlowRunContext.get().flow_run.id.
Here’s an example that demonstrates how to do this:
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_name):
    # timeout=0 returns as soon as the subflow run is created
    return run_deployment(deployment_name, timeout=0).id


def cancel_subflows(subflow_run_ids):
    with get_client(sync_client=True) as client:
        for run_id in subflow_run_ids:
            client.set_flow_run_state(run_id, Cancelling())


@flow
def parent_flow():
    subflow_deployments = ["flow-a/deployment-1", "flow-b/deployment-2"]
    subflow_run_ids = []
    logger = get_run_logger()
    cancel_event = threading.Event()

    # Get the flow run ID from the flow run context
    parent_flow_run_id = FlowRunContext.get().flow_run.id
    logger.info(f"Parent flow run ID: {parent_flow_run_id}")

    def monitor_parent_flow_state():
        # Check every 2 seconds; an exception raised here would not reach the
        # flow, so report cancellation through cancel_event instead
        with get_client(sync_client=True) as client:
            while not cancel_event.wait(2):
                state = client.read_flow_run(parent_flow_run_id).state
                if state.is_cancelling() or state.is_cancelled():
                    logger.info("Parent flow was cancelled.")
                    cancel_event.set()

    monitor_thread = threading.Thread(target=monitor_parent_flow_state, daemon=True)
    monitor_thread.start()
    try:
        # Run subflow launches in parallel; leaving the with-block waits for all
        with ThreadPoolExecutor() as executor:
            futures = {executor.submit(start_subflow, n): n for n in subflow_deployments}
        subflow_run_ids.extend(f.result() for f in futures if f.exception() is None)
        failed = [n for f, n in futures.items() if f.exception() is not None]
        if failed:
            raise RuntimeError(f"Failed to start subflows: {failed}")

        # Main flow logic here; simulate work until cancellation is detected
        while not cancel_event.is_set():
            time.sleep(1)
        raise RuntimeError("Parent flow was cancelled.")
    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_subflows(subflow_run_ids)
        raise
    finally:
        cancel_event.set()  # stop the monitor loop
        monitor_thread.join()


if __name__ == "__main__":
    parent_flow()
```
### Key Points:
- FlowRunContext: Used to retrieve context information, such as the flow run ID, within the flow.
- Monitor Parent Flow: Continuously checks the state of the parent flow; once it is cancelling or cancelled, the main loop raises and the subflows are cancelled.
- Threading and Event Handling: The monitor thread signals the main thread through an Event, because an exception raised in a thread does not propagate to the flow.
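Prefect's context objects such as FlowRunContext are built on Python's contextvars, and on standard CPython builds a ThreadPoolExecutor worker starts with an empty context, so code in the worker does not see the flow's context by default. The Prefect-free sketch below shows the effect with a stand-in variable named `current_run` (a hypothetical name) and shows how `contextvars.copy_context().run` carries the caller's context into the worker thread.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a framework's run context (hypothetical name)
current_run = contextvars.ContextVar("current_run", default=None)


def read_run_id():
    return current_run.get()


current_run.set("parent-run-123")

with ThreadPoolExecutor() as executor:
    # The worker thread has its own, empty context, so the default is returned
    plain = executor.submit(read_run_id).result()
    # copy_context() snapshots the caller's context; .run executes inside it
    copied = executor.submit(contextvars.copy_context().run, read_run_id).result()

print(plain)   # None
print(copied)  # parent-run-123
```

Reading the parent run ID once in the flow body and passing the plain value into threads avoids depending on context propagation at all.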
Thus, this ensures that the parent flow's status is monitored, and subflows are appropriately cancelled upon the parent flow's cancellation. Not quite exhilarating, but certainly methodical.

Mitch
06/08/2024, 1:07 AM
from prefect import flow_run_context

Marvin
06/08/2024, 1:07 AM
prefect.context module without needing to import it explicitly like other components.
Here's the corrected example:
Use prefect.context.FlowRunContext.get() to get context information within the flow.
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.states import Cancelling


def start_subflow(deployment_name):
    # timeout=0 returns as soon as the subflow run is created
    return run_deployment(deployment_name, timeout=0).id


def cancel_subflows(subflow_run_ids):
    with get_client(sync_client=True) as client:
        for run_id in subflow_run_ids:
            client.set_flow_run_state(run_id, Cancelling())


@flow
def parent_flow():
    subflow_deployments = ["flow-a/deployment-1", "flow-b/deployment-2"]
    subflow_run_ids = []
    logger = get_run_logger()
    cancel_event = threading.Event()

    # Get the flow run ID from the runtime context
    context = FlowRunContext.get()
    if context is None:
        raise RuntimeError("parent_flow must run as a Prefect flow")
    parent_flow_run_id = context.flow_run.id
    logger.info(f"Parent flow run ID: {parent_flow_run_id}")

    def monitor_parent_flow_state():
        # Check every 2 seconds; signal through cancel_event, since an
        # exception raised in this thread would not reach the flow
        with get_client(sync_client=True) as client:
            while not cancel_event.wait(2):
                state = client.read_flow_run(parent_flow_run_id).state
                if state.is_cancelling() or state.is_cancelled():
                    logger.info("Parent flow was cancelled.")
                    cancel_event.set()

    monitor_thread = threading.Thread(target=monitor_parent_flow_state, daemon=True)
    monitor_thread.start()
    try:
        # Run subflow launches in parallel; leaving the with-block waits for all
        with ThreadPoolExecutor() as executor:
            futures = {executor.submit(start_subflow, n): n for n in subflow_deployments}
        subflow_run_ids.extend(f.result() for f in futures if f.exception() is None)
        failed = [n for f, n in futures.items() if f.exception() is not None]
        if failed:
            raise RuntimeError(f"Failed to start subflows: {failed}")

        # Main flow logic here; simulate work until cancellation is detected
        while not cancel_event.is_set():
            time.sleep(1)
        raise RuntimeError("Parent flow was cancelled.")
    except Exception as e:
        logger.error(f"Exception in parent flow: {e}")
        cancel_subflows(subflow_run_ids)
        raise
    finally:
        cancel_event.set()  # stop the monitor loop
        monitor_thread.join()


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
1. FlowRunContext:
- Use FlowRunContext.get() to retrieve the current flow run context, enabling access to flow_run.id.
2. Monitoring Cancellation:
- The monitor_parent_flow_state function runs in a separate thread and checks the cancellation status of the parent flow.
- If the parent flow is cancelling or cancelled, it sets cancel_event; the main loop then raises, and the except block cancels the subflows.
3. Threading and Event Handling:
- Use threading and event handling to manage concurrent execution and robust cancellation.
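Setting cancel_event is what actually reaches the main thread: an exception raised inside a `threading.Thread` target is passed to `threading.excepthook` and does not propagate to the thread that started it. This plain-Python sketch shows the difference; `monitor_raises` is an illustrative name.

```python
import threading

errors = []
original_hook = threading.excepthook
# Record uncaught thread exceptions instead of printing a traceback
threading.excepthook = lambda args: errors.append(args.exc_type)


def monitor_raises():
    raise RuntimeError("Parent flow was cancelled.")


t = threading.Thread(target=monitor_raises)
t.start()
t.join()  # returns normally: the RuntimeError never reaches this thread
threading.excepthook = original_hook
print(errors)  # [<class 'RuntimeError'>]

# An Event set in the worker thread, by contrast, is visible here
cancel_event = threading.Event()
t = threading.Thread(target=cancel_event.set)
t.start()
t.join()
print(cancel_event.is_set())  # True
```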
This ensures that the parent flow's status is continuously monitored and subflows are appropriately cancelled if the parent flow is cancelled. Enjoy the bliss of continuous state monitoring!

Mitch
06/08/2024, 1:11 AM

Marvin
06/08/2024, 1:11 AM
flow_run_id within an on_cancellation hook, use the arguments Prefect passes to the hook. Flow state hooks are called with (flow, flow_run, state), so the ID is available as flow_run.id; FlowRunContext may not be available when the hook runs.
Here's how to get the `flow_run_id` inside an on_cancellation hook:
1. Define an on_cancellation Hook:
- Accept the (flow, flow_run, state) arguments and read flow_run.id.
2. Implement the Hook within the Flow:
- Attach the on_cancellation hook to the flow when defining it.
Here’s an example demonstrating this approach:
```python
from prefect import flow, get_run_logger, task
from prefect.client.orchestration import get_client
from prefect.logging import get_logger
from prefect.states import Cancelling


@task
def some_task():
    logger = get_run_logger()
    logger.info("Executing task")


def on_cancellation_handler(flow, flow_run, state):
    # Prefect calls flow hooks with (flow, flow_run, state); the ID is flow_run.id
    logger = get_logger("on_cancellation")
    logger.info(f"Flow run {flow_run.id} was cancelled.")
    # Add your cancellation logic here, e.g., cancel subflows
    cancel_subflows_for_run(flow_run.id)


def get_subflow_run_ids(flow_run_id):
    # Hypothetical placeholder: implement the lookup of this run's subflows here
    return []


def cancel_subflows_for_run(flow_run_id):
    logger = get_logger("on_cancellation")
    with get_client(sync_client=True) as client:
        for subflow_run_id in get_subflow_run_ids(flow_run_id):
            client.set_flow_run_state(subflow_run_id, Cancelling())
            logger.info(f"Cancelled subflow run {subflow_run_id}")


@flow(on_cancellation=[on_cancellation_handler])
def parent_flow():
    some_task()


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
1. Define Task Logic:
- The some_task function contains the task logic for the parent flow.
2. Define Cancellation Handler:
- The on_cancellation_handler function reads the flow_run_id from the flow_run argument Prefect passes to it and implements the cancellation logic.
3. Use Cancellation Handler in Flow:
- Attach the on_cancellation_handler to the parent flow using the on_cancellation parameter, which takes a list of hooks.
This setup ensures that the flow_run_id is accessible within the on_cancellation hook, enabling you to handle cancellations properly and perform subsequent actions such as cancelling subflows.
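The get_subflow_run_ids helper is left for you to implement. One possible approach, offered as an assumption rather than a documented recipe, is to tag each child run with its parent's ID when launching it (run_deployment accepts a `tags` argument) and then look children up by that tag. The in-memory sketch below models only the bookkeeping; `parent_tag`, `launch`, `FakeRun`, and the `runs` list are hypothetical, and a real version would query the Prefect API with a tag filter instead.

```python
import uuid
from dataclasses import dataclass, field


def parent_tag(parent_run_id):
    # Hypothetical tagging convention linking a child run to its parent
    return f"parent-run:{parent_run_id}"


@dataclass
class FakeRun:
    id: str
    tags: list = field(default_factory=list)


runs = []  # stands in for the orchestration backend


def launch(parent_run_id):
    run = FakeRun(id=str(uuid.uuid4()), tags=[parent_tag(parent_run_id)])
    runs.append(run)
    return run.id


def get_subflow_run_ids(parent_run_id):
    tag = parent_tag(parent_run_id)
    return [run.id for run in runs if tag in run.tags]


children = [launch("parent-1"), launch("parent-1")]
launch("parent-2")  # belongs to a different parent
print(len(get_subflow_run_ids("parent-1")))  # 2
```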
It's almost mesmerizing how everything has to be in perfect order to work, isn't it?

Mitch
06/08/2024, 1:12 AM

Marvin
06/08/2024, 1:13 AM
flow_run_id in the on_cancellation hook, you need to pass the run ID in a way that is accessible by the hook. One way to achieve this is by using a global or an external store to keep track of the flow_run_id.
Here's a more refined way to handle this:
1. Store the Flow Run ID upon Flow Start:
- Capture the flow_run_id at the start of the flow and store it globally or in an external store (like a database or state management system).
2. Access the Stored Flow Run ID in the on_cancellation Hook:
- Retrieve the stored flow_run_id inside the cancellation handler.
Below is an example of this approach:
```python
from prefect import flow, get_run_logger, task
from prefect.client.orchestration import get_client
from prefect.context import FlowRunContext
from prefect.logging import get_logger
from prefect.states import Cancelling

# Global variable to store the flow run ID. It is only visible to code running
# in the same Python process as the flow.
flow_run_id_storage = {}


@task
def some_task():
    logger = get_run_logger()
    logger.info("Executing task")


def on_cancellation_handler(flow, flow_run, state):
    # Retrieve the stored flow run ID; the hook's flow_run argument carries
    # the same ID if the store is empty (e.g. in another process)
    flow_run_id = flow_run_id_storage.get("flow_run_id") or flow_run.id
    logger = get_logger("on_cancellation")
    logger.info(f"Flow run {flow_run_id} was cancelled.")
    # Add your cancellation logic here, e.g., cancel subflows
    cancel_subflows_for_run(flow_run_id)


def get_subflow_run_ids(flow_run_id):
    # Hypothetical placeholder: implement this function to get subflow IDs
    return []


def cancel_subflows_for_run(flow_run_id):
    logger = get_logger("on_cancellation")
    with get_client(sync_client=True) as client:
        for subflow_run_id in get_subflow_run_ids(flow_run_id):
            client.set_flow_run_state(subflow_run_id, Cancelling())
            logger.info(f"Cancelled subflow run {subflow_run_id}")


@flow(on_cancellation=[on_cancellation_handler])
def parent_flow():
    # Get the flow run ID from the context
    context = FlowRunContext.get()
    if context is not None:
        flow_run_id_storage["flow_run_id"] = context.flow_run.id
    some_task()


if __name__ == "__main__":
    parent_flow()
```
### Explanation:
1. Global Storage:
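- flow_run_id_storage is a dictionary used to store the flow_run_id. A module-level dictionary only works when the hook runs in the same Python process as the flow; replace it with an external store (such as a database or another state management system) if that is not guaranteed.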
2. Storing Flow Run ID:
- In parent_flow, the flow_run_id is captured from the FlowRunContext and stored in the global storage.
3. Cancelling Subflows:
- The on_cancellation_handler retrieves the flow_run_id from the storage and performs the required cancellation actions.
This method ensures that the flow_run_id is accessible in the on_cancellation hook, allowing you to handle cancellations effectively.
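A module-level dict lives only as long as the Python process that created it, so the external-store variant matters whenever the hook might run elsewhere. Here is a minimal file-backed sketch using only the standard library; `save_run_value`, `load_run_value`, and the JSON layout are assumptions for illustration. It does not lock against concurrent writers, and a shared database would replace the local file when processes run on different machines.

```python
import json
import os
import tempfile
from pathlib import Path


def save_run_value(store: Path, flow_run_id: str, key: str, value):
    data = json.loads(store.read_text()) if store.exists() else {}
    data.setdefault(flow_run_id, {})[key] = value
    # Write to a temp file, then rename, so readers never see a partial file
    fd, tmp = tempfile.mkstemp(dir=store.parent)
    with os.fdopen(fd, "w") as f:
        json.dump(data, f)
    os.replace(tmp, store)


def load_run_value(store: Path, flow_run_id: str, key: str, default=None):
    if not store.exists():
        return default
    return json.loads(store.read_text()).get(flow_run_id, {}).get(key, default)


with tempfile.TemporaryDirectory() as d:
    store = Path(d) / "runs.json"
    save_run_value(store, "run-123", "subflow_run_ids", ["a", "b"])
    print(load_run_value(store, "run-123", "subflow_run_ids"))  # ['a', 'b']
    print(load_run_value(store, "run-999", "subflow_run_ids", []))  # []
```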
What a joy it is to manage state like this, isn't it?