<@ULVA73B9P> Cancelling a master or parent flows i...
# marvin-in-the-wild
m
@Marvin Cancelling a master or parent flows is not cancelling its subflows. How can I cancel a flow's subflows?
m
To handle the cancellation of subflows when the parent flow is canceled in Prefect 2.x, follow these guidelines: 1. Enhanced Cancellation: Enable the experimental enhanced cancellation feature on your workers by setting the
PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION
configuration. This might improve the cancellation experience, although it doesn’t solve the full problem.
Copy code
bash
    prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
2. Cascading Cancellation: Prefect's current implementation does not automatically cancel subflows when a parent flow is canceled. However, there are documented and ongoing discussions about improving this capability. 3. Manual Handling: As a workaround, set up state hooks to explicitly cancel subflows when the parent flow is canceled:
Copy code
python
    from prefect import flow, task
    from prefect.states import Cancelled

    @task
    def check_cancellation(ctx):
        if ctx.parent_task_run_id and ctx.flow_run.parent_task_run_id:
            raise Cancelled("Parent task was cancelled, hence cancelling this subflow.")
        
    @flow 
    def parent_flow():
        # Your parent flow tasks here
        
        # Calling subflow and ensuring it gets canceled
        state = subflow.with_options(on_crashed=check_cancellation)()

    @flow
    def subflow():
        # Your subflow tasks here

    if __name__ == "__main__":
        parent_flow()
4. Monitoring GitHub Issues: Prefect is actively discussing the feature of simplifying cascading cancellations. You might want to track or contribute to these GitHub issues: - Support subflow cancellation if parent flow is in terminal state - Cascading cancellation should work for normal subflows By following these guidelines, you can manage and handle cancellations more effectively, albeit with some manual configurations. Additionally, stay updated with Prefect's development efforts for more seamless solutions soon.
m
@Marvin how would your code example change when using run_deployoment asynchronously
m
To enhance your script for cancelling a flow's subflows when employing
run_deployment
asynchronously, you can follow the example below. This approach integrates state checks to cancel subflows if the parent flow is canceled:
Copy code
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.states import Cancelled

@task
async def check_cancellation(ctx):
    if ctx.parent_task_run_id and ctx.flow_run.parent_task_run_id:
        logger = get_run_logger()
        logger.warning("Parent flow was canceled, canceling this subflow.")
        raise Cancelled("Parent task was cancelled, hence cancelling this subflow.")

@flow
async def subflow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running Subflow")
    await asyncio.sleep(30)  # Simulate work being done

@flow
async def parent_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running Parent Flow")
    subflow_runs = []
    for _ in range(3):
        subflow_run = run_deployment(name="subflow-deployment")
        subflow_runs.append(subflow_run)

    # Await all subflows
    await asyncio.gather(*subflow_runs)
    <http://logger.info|logger.info>("Parent Flow Completed")

if __name__ == "__main__":
    asyncio.run(parent_flow())
Key Points: - Enhanced Cancellation: Make sure to enable the experimental enhanced cancellation feature as previously mentioned. - Check Cancellation: The
check_cancellation
task checks if the parent flow is canceled and raises a
Cancelled
exception for the subflow. - Asynchronous Run Deployment: Use
await run_deployment(name="subflow-deployment")
to run subflows asynchronously and await their completions. Also, refer to Prefect Deployments Docs for more details on deploying and running flows. By implementing this approach, you ensure that subflows are dependent on the parent flow’s state and can be cancelled accordingly.
m
@Marvin your example doesn't include the check_cancellation function with run_deployment, how do you cancel the subflows when using run_deployment
m
Apologies for the oversight. To integrate
check_cancellation
when using
run_deployment
, you will need to explicitly check for and handle cancellation in the parent flow. Here's how you can modify your flows to ensure subflows are canceled when the parent flow is canceled using `run_deployment`:
Copy code
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.states import Cancelled
from prefect.client import get_client

@task
async def check_and_cancel_subflows(subflow_runs):
    logger = get_run_logger()
    client = get_client()
    parent_flow_run = await client.read_flow_run()
    
    if parent_flow_run.state.is_cancelled():
        logger.warning("Parent flow was canceled, canceling subflows.")
        for subflow_run in subflow_runs:
            await client.set_flow_run_state(flow_run_id=subflow_run.id, state=Cancelled())

@flow
async def parent_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running Parent Flow")
    subflow_runs = []
    for _ in range(3):
        subflow_run = await run_deployment(name="subflow-deployment")
        subflow_runs.append(subflow_run)

    # Periodically check if the parent flow is canceled and cancel subflows if so
    while not parent_flow.is_complete():
        await check_and_cancel_subflows(subflow_runs)
        await asyncio.sleep(5)  # Adjust the sleep time as needed
    
    # Ensure all subflows are cancelled if the parent flow is cancelled
    await check_and_cancel_subflows(subflow_runs)
    <http://logger.info|logger.info>("Parent Flow Completed")

@flow
async def subflow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running Subflow")
    await asyncio.sleep(30)  # Simulate work being done

if __name__ == "__main__":
    asyncio.run(parent_flow())
### Key Points: 1. Check and Cancel: The
check_and_cancel_subflows
task examines the state of the parent flow and triggers the cancellation of subflows if the parent flow is canceled. 2. Async Flow: In
parent_flow
, run subflows asynchronously using
await run_deployment
. 3. Periodic Checks: The
parent_flow
periodically calls
check_and_cancel_subflows
to ensure subflows are canceled if the parent flow is in a canceled state. 4. Completion Check: Ensure the parent flow marks subflows as canceled before exiting. By integrating
check_and_cancel_subflows
, you ensure that subflows are monitored and canceled when the parent flow is canceled. Make sure to adapt sleep intervals and deployment details based on your specific requirements.
m
@Marvin would your code snippet change if I'm using run_deployment to run flows in parallel
m
Yes, running deployments in parallel would involve combining
asyncio.gather
with
run_deployment
to run and manage multiple subflows concurrently. The cancellation logic would need to be adapted to handle concurrent tasks effectively. Here’s how you can modify your script to achieve this:
Copy code
python
import asyncio
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.states import Cancelled
from prefect.client import get_client

@task
async def check_and_cancel_subflows(subflow_runs):
    logger = get_run_logger()
    client = get_client()
    parent_flow_run = await client.read_flow_run()
    
    if parent_flow_run.state.is_cancelled():
        logger.warning("Parent flow was canceled, canceling subflows.")
        for subflow_run in subflow_runs:
            await client.set_flow_run_state(flow_run_id=subflow_run.id, state=Cancelled())

@flow
async def parent_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running Parent Flow")
    subflow_runs = []
    for _ in range(3):  # Adjust the range as per your requirements
        subflow_run = run_deployment(name="subflow-deployment")
        subflow_runs.append(subflow_run)

    # Run subflows concurrently
    subflow_tasks = [asyncio.create_task(subflow) for subflow in subflow_runs]

    # Periodically check if the parent flow is canceled and cancel subflows if so
    while any([not task.done() for task in subflow_tasks]):
        await check_and_cancel_subflows([task.result() for task in subflow_tasks if task.done()])
        await asyncio.sleep(5)  # Adjust the sleep time as needed
    
    # Ensure all subflows are cancelled if the parent flow is cancelled
    await check_and_cancel_subflows([task.result() for task in subflow_tasks if task.done()])
    
    # Await all subflow tasks to ensure completion or cancellation
    await asyncio.gather(*subflow_tasks, return_exceptions=True)

    <http://logger.info|logger.info>("Parent Flow Completed")

@flow
async def subflow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running Subflow")
    await asyncio.sleep(30)  # Simulate work being done

if __name__ == "__main__":
    asyncio.run(parent_flow())
### Key Points: 1. Concurrent Subflows: By using
asyncio.create_task
, you can run subflows concurrently. 2. Task Management: Collect all
asyncio.Task
instances in
subflow_tasks
and periodically check their states. 3. Check and Cancel: Apply the
check_and_cancel_subflows
task periodically to ensure all running subflows are checked for cancellation. 4. Await Completion: Ensure all tasks are properly awaited before completion of the parent flow. This approach ensures that multiple subflows run in parallel and respond appropriately to cancellation requests from the parent flow. Adjust the number of subflows and cancellation check intervals as per your needs.
t
@Marvin is it possible to use a storage other than git for the code such as a shared folder? And have prefect upload the code to the shared folder?
m
Yes, it is possible to use a shared folder for Prefect flow storage. Prefect supports various storage backends, including file system-based storage via
fsspec
. This allows you to store code in any filesystem supported by
fsspec
, which includes shared network storage using SMB. Here's a step-by-step guide on using a shared folder for storage: ### 1. Install SMB Protocol First, ensure that you have the necessary library installed. For SMB, you would typically use
smbprotocol
.
Copy code
bash
pip install smbprotocol
### 2. Configure SMBSync Storage Block You can configure this via the Prefect UI or via code. Here’s how you'd set it up using code:
Copy code
python
from prefect.filesystems import SMB

# Define the SMB storage block
smb_storage = SMB(
    basepath="<smb://your_server/shared_folder/>",
    username="your_username",
    password="your_password",
    # Other SMB options can be set here
)

# Save the storage block
smb_storage.save(name="smb-shared-folder", overwrite=True)
### 3. Deploy Using SMB Storage Now, deploy your flow to use this SMB-based storage. Here’s an example using the
flow
and
Deployment
classes:
Copy code
python
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import SMB

@task
def say_hello():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Hello, world!")

@flow
def my_flow():
    say_hello()

# Define the SMB storage again
smb_storage = SMB.load("smb-shared-folder")

# Create and apply the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    storage=smb_storage
)
deployment.apply()
### 4. Run the Deployment Deploying can be easily done using the Prefect CLI:
Copy code
bash
prefect deployment run my-flow-deployment
### Summary By defining and saving an SMB storage block, you can have Prefect upload flow code to a shared folder, enabling multiple workers to access and run the flow from that shared location. This setup can be useful for on-premises infrastructure or other environments where cloud-based solutions might not be viable. For more detailed configuration and setup on different storage options, refer to the Prefect Storage Documentation.
t
@Marvin but isn't that deprecated?
m
You're correct that some storage methods have been updated in Prefect 2.x, and it's important to follow the latest recommendations. Prefect 2.x primarily utilizes "blocks" for configuring storage, and while the older storage methods may still function, they might not be the preferred approach. ### Steps to Use a Shared Folder with SMB in Prefect 2.x Prefect 2.x supports using an SMB file system for storage by defining and utilizing storage blocks. Below is an updated example using the most current approach. ### 1. Install Required Libraries Ensure you have the necessary libraries installed for SMB support:
Copy code
bash
pip install smbprotocol
### 2. Create and Configure SMB Block You can create an SMB block using the Prefect Cloud UI or programmatically: #### Using Prefect Cloud / UI: 1. Navigate to the Blocks section in Prefect Cloud UI. 2. Create a new SMB block by providing the necessary credentials and connection details. #### Using Python: Here’s how to create the SMB block programmatically:
Copy code
python
from prefect.filesystems import SMB

# Define the SMB storage block
smb_block = SMB(
    basepath="<smb://your_server/shared_folder/>",
    username="your_username",
    password="your_password"
)

# Save the storage block
smb_block.save(name="smb-shared-folder", overwrite=True)
### 3. Deploy Using SMB Storage Once you have the SMB block set up, you can use it for your deployments:
Copy code
python
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import SMB

@task
def say_hello():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Hello, world!")

@flow
def my_flow():
    say_hello()

# Load the SMB storage block
smb_block = SMB.load("smb-shared-folder")

# Create and apply the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    storage=smb_block
)
deployment.apply()
### 4. Run the Deployment Deploying can be easily done using the Prefect CLI:
Copy code
bash
prefect deployment run my-flow-deployment
### Summary By using the Prefect 2.x block architecture, you can configure and manage shared folder storage via SMB. This updated method utilizes
SMB
storage blocks, providing a modern and flexible approach to handling flow storage. For more details, please refer to the Prefect Storage Documentation and Prefect Filesystems Documentation.
n
> but isn't that deprecated? yes - marvin has trouble removing some deprecated ideas from recommendations 🙄 I think you may want some sort of
on_cancellation
hook in which you could call the
update_flow_run
client method or something similar for each
run_deployment
call your flow has made, maybe something like e.g.
Copy code
all_my_runs = {}

def invoke_remote_subflow(...):
  flow_run = run_deployment(...)
  all_my_runs.add(flow_run)

async def cancel_remote_subflows(flow, flow_run, state):
  async with get_client() as client:
    await asyncio.gather(*[partial(client.set_flow_run_state, flow_run_id=run.id, state=Cancelled()) for run in all_my_runs])

@flow(on_cancellation=[cancel_remote_subflows])
def your_flow():
  # use invoke_remote_subflow