Mitch
05/27/2024, 8:40 PM

Marvin
05/27/2024, 8:40 PM
1. Enhanced Cancellation: You can enable the experimental `PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION` configuration. This might improve the cancellation experience, although it doesn't solve the full problem.
```bash
prefect config set PREFECT_EXPERIMENTAL_ENABLE_ENHANCED_CANCELLATION=True
```
2. Cascading Cancellation: Prefect's current implementation does not automatically cancel subflows when a parent flow is canceled, although there are documented, ongoing discussions about improving this. Until then, a run can be cancelled explicitly through the client, as in the sketch below.
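Here is a minimal sketch of cancelling a specific flow run through the Prefect 2 client (the run id is a placeholder you would capture from `run_deployment` or the UI):
```python
import asyncio

from prefect import get_client
from prefect.states import Cancelled


async def cancel_flow_run(flow_run_id: str):
    # Ask the Prefect API to move the given flow run to a Cancelled state
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run_id, state=Cancelled()
        )


if __name__ == "__main__":
    asyncio.run(cancel_flow_run("your-subflow-run-id"))  # placeholder id
```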
3. Manual Handling: As a workaround, set up state hooks to explicitly cancel subflows when the parent flow is canceled:
```python
from prefect import flow


def cancel_subflows(flow, flow_run, state):
    # State hooks receive (flow, flow_run, state). When the parent run is
    # cancelled, cancel its subflow runs here, e.g. with the client call
    # shown in the sketch above.
    print(f"Flow run {flow_run.name!r} entered state {state.name!r}")


@flow
def subflow():
    # Your subflow tasks here
    ...


@flow(on_cancellation=[cancel_subflows])
def parent_flow():
    # Your parent flow tasks here
    # Calling the subflow; the hook fires if this run is cancelled
    subflow()


if __name__ == "__main__":
    parent_flow()
```
4. Monitoring GitHub Issues: Prefect is actively discussing simplified cascading cancellation. You might want to track or contribute to these GitHub issues:
- Support subflow cancellation if parent flow is in terminal state
- Cascading cancellation should work for normal subflows
By following these guidelines, you can manage and handle cancellations more effectively, albeit with some manual configuration. Additionally, stay updated with Prefect's development efforts for more seamless solutions.

Mitch
05/27/2024, 8:42 PM

Marvin
05/27/2024, 8:42 PM
To run subflows asynchronously with `run_deployment`, you can follow the example below. This approach integrates state checks to cancel subflows if the parent flow is canceled:
```python
import asyncio

from prefect import flow, get_client, get_run_logger, task
from prefect.deployments import run_deployment
from prefect.runtime import flow_run as runtime_flow_run
from prefect.states import Cancelled


@task
async def check_cancellation():
    """Return True if this run's parent flow run has been cancelled."""
    async with get_client() as client:
        this_run = await client.read_flow_run(runtime_flow_run.id)
        if this_run.parent_task_run_id is None:
            return False  # not running as a subflow
        parent_task_run = await client.read_task_run(this_run.parent_task_run_id)
        parent_run = await client.read_flow_run(parent_task_run.flow_run_id)
    if parent_run.state.is_cancelled():
        get_run_logger().warning("Parent flow was canceled, canceling this subflow.")
        return True
    return False


@flow
async def subflow():
    logger = get_run_logger()
    logger.info("Running Subflow")
    if await check_cancellation():
        return Cancelled(message="Parent flow was cancelled, hence cancelling this subflow.")
    await asyncio.sleep(30)  # Simulate work being done


@flow
async def parent_flow():
    logger = get_run_logger()
    logger.info("Running Parent Flow")
    # run_deployment takes "flow-name/deployment-name"
    subflow_runs = [run_deployment(name="subflow/subflow-deployment") for _ in range(3)]
    # Await all subflows
    await asyncio.gather(*subflow_runs)
    logger.info("Parent Flow Completed")


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
Key Points:
- Enhanced Cancellation: Make sure to enable the experimental enhanced cancellation feature as previously mentioned.
- Check Cancellation: The `check_cancellation` task checks whether the parent flow run has been canceled so the subflow can finish in a `Cancelled` state.
- Asynchronous Run Deployment: Trigger subflows with `run_deployment(name="subflow/subflow-deployment")` and await their completion, e.g. with `asyncio.gather` (see the sketch below).
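As a side note, `run_deployment` also accepts `parameters` and `timeout` arguments; here is a minimal sketch (the deployment name and parameter are placeholders):
```python
import asyncio

from prefect.deployments import run_deployment


async def trigger_subflow():
    # timeout=0 returns the FlowRun metadata immediately rather than
    # blocking until the run reaches a terminal state
    flow_run = await run_deployment(
        name="subflow/subflow-deployment",  # "flow-name/deployment-name"
        parameters={"x": 1},  # hypothetical parameter for illustration
        timeout=0,
    )
    print(flow_run.id)


if __name__ == "__main__":
    asyncio.run(trigger_subflow())
```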
Also, refer to Prefect Deployments Docs for more details on deploying and running flows.
By implementing this approach, you ensure that subflows are dependent on the parent flow's state and can be cancelled accordingly.

Mitch
05/27/2024, 8:50 PM

Marvin
05/27/2024, 8:50 PM
To get `check_cancellation`-style behavior when using `run_deployment`, you will need to explicitly check for and handle cancellation in the parent flow. Here's how you can modify your flows to ensure subflows are canceled when the parent flow is canceled using `run_deployment`:
```python
import asyncio

from prefect import flow, get_client, get_run_logger, task
from prefect.deployments import run_deployment
from prefect.runtime import flow_run as runtime_flow_run
from prefect.states import Cancelled


@task
async def check_and_cancel_subflows(parent_flow_run_id, subflow_runs):
    logger = get_run_logger()
    async with get_client() as client:
        parent_flow_run = await client.read_flow_run(parent_flow_run_id)
        if parent_flow_run.state.is_cancelled():
            logger.warning("Parent flow was canceled, canceling subflows.")
            for subflow_run in subflow_runs:
                await client.set_flow_run_state(
                    flow_run_id=subflow_run.id, state=Cancelled()
                )


@flow
async def parent_flow():
    logger = get_run_logger()
    logger.info("Running Parent Flow")
    subflow_runs = []
    for _ in range(3):
        # timeout=0 returns the FlowRun metadata immediately instead of
        # blocking until the run finishes; the name is "flow-name/deployment-name"
        subflow_run = await run_deployment(
            name="subflow/subflow-deployment", timeout=0
        )
        subflow_runs.append(subflow_run)
    # Periodically check if the parent flow is canceled and cancel subflows if so
    async with get_client() as client:
        while True:
            states = [
                (await client.read_flow_run(run.id)).state for run in subflow_runs
            ]
            if all(state.is_final() for state in states):
                break
            await check_and_cancel_subflows(runtime_flow_run.id, subflow_runs)
            await asyncio.sleep(5)  # Adjust the sleep time as needed
    logger.info("Parent Flow Completed")


@flow
async def subflow():
    logger = get_run_logger()
    logger.info("Running Subflow")
    await asyncio.sleep(30)  # Simulate work being done


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
### Key Points:
1. Check and Cancel: The `check_and_cancel_subflows` task examines the state of the parent flow and triggers the cancellation of subflows if the parent flow is canceled.
2. Async Flow: In `parent_flow`, trigger subflows asynchronously using `await run_deployment(..., timeout=0)`.
3. Periodic Checks: The `parent_flow` periodically calls `check_and_cancel_subflows` to ensure subflows are canceled if the parent flow is in a canceled state.
4. Completion Check: Ensure the parent flow marks subflows as canceled before exiting.
By integrating `check_and_cancel_subflows`, you ensure that subflows are monitored and canceled when the parent flow is canceled. Make sure to adapt sleep intervals and deployment details based on your specific requirements.

Mitch
05/27/2024, 9:05 PM

Marvin
05/27/2024, 9:06 PM
Yes, you can use `asyncio.gather` with `run_deployment` to run and manage multiple subflows concurrently. The cancellation logic needs to be adapted to handle concurrent tasks effectively. Here's how you can modify your script to achieve this:
```python
import asyncio

from prefect import flow, get_client, get_run_logger, task
from prefect.deployments import run_deployment
from prefect.runtime import flow_run as runtime_flow_run
from prefect.states import Cancelled


@task
async def check_and_cancel_subflows(parent_flow_run_id, subflow_runs):
    logger = get_run_logger()
    async with get_client() as client:
        parent_flow_run = await client.read_flow_run(parent_flow_run_id)
        if parent_flow_run.state.is_cancelled():
            logger.warning("Parent flow was canceled, canceling subflows.")
            for subflow_run in subflow_runs:
                await client.set_flow_run_state(
                    flow_run_id=subflow_run.id, state=Cancelled()
                )


@flow
async def parent_flow():
    logger = get_run_logger()
    logger.info("Running Parent Flow")
    # Fire off the deployments concurrently; timeout=0 makes run_deployment
    # return each FlowRun's metadata immediately instead of waiting
    subflow_tasks = [
        asyncio.create_task(
            run_deployment(name="subflow/subflow-deployment", timeout=0)
        )
        for _ in range(3)  # Adjust the range as per your requirements
    ]
    subflow_runs = await asyncio.gather(*subflow_tasks)
    # Periodically check if the parent flow is canceled and cancel subflows if so
    async with get_client() as client:
        while True:
            states = [
                (await client.read_flow_run(run.id)).state for run in subflow_runs
            ]
            if all(state.is_final() for state in states):
                break
            await check_and_cancel_subflows(runtime_flow_run.id, subflow_runs)
            await asyncio.sleep(5)  # Adjust the sleep time as needed
    logger.info("Parent Flow Completed")


@flow
async def subflow():
    logger = get_run_logger()
    logger.info("Running Subflow")
    await asyncio.sleep(30)  # Simulate work being done


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
### Key Points:
1. Concurrent Subflows: By using `asyncio.create_task`, you can trigger the subflow deployments concurrently.
2. Task Management: Collect all `asyncio.Task` instances in `subflow_tasks`, then gather their `FlowRun` results and periodically check the run states.
3. Check and Cancel: Apply the `check_and_cancel_subflows` task periodically to ensure all running subflows are checked for cancellation.
4. Await Completion: Ensure all tasks are properly awaited before the parent flow completes.
This approach ensures that multiple subflows run in parallel and respond appropriately to cancellation requests from the parent flow. Adjust the number of subflows and cancellation check intervals as per your needs.

Tal
05/28/2024, 6:00 AM

Marvin
05/28/2024, 6:01 AM
Prefect supports remote file systems through `fsspec`. This allows you to store code in any filesystem supported by `fsspec`, which includes shared network storage using SMB.
Here's a step-by-step guide on using a shared folder for storage:
### 1. Install SMB Protocol
First, ensure that you have the necessary library installed. For SMB, you would typically use `smbprotocol`.
```bash
pip install smbprotocol
```
### 2. Configure the SMB Storage Block
You can configure this via the Prefect UI or via code. Here’s how you'd set it up using code:
```python
from prefect.filesystems import SMB

# Define the SMB storage block
smb_storage = SMB(
    share_path="/shared_folder/prefect-flows",
    smb_host="your_server",
    smb_username="your_username",
    smb_password="your_password",
    # Other SMB options (e.g. smb_port) can be set here
)

# Save the storage block
smb_storage.save(name="smb-shared-folder", overwrite=True)
```
### 3. Deploy Using SMB Storage
Now, deploy your flow to use this SMB-based storage. Here's an example using the `flow` and `Deployment` classes:
```python
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import SMB


@task
def say_hello():
    logger = get_run_logger()
    logger.info("Hello, world!")


@flow
def my_flow():
    say_hello()


# Load the saved SMB storage block
smb_storage = SMB.load("smb-shared-folder")

# Create and apply the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    storage=smb_storage,
)
deployment.apply()
```
### 4. Run the Deployment
You can trigger a run of this deployment with the Prefect CLI:
```bash
prefect deployment run 'my-flow/my-flow-deployment'
```
### Summary
By defining and saving an SMB storage block, you can have Prefect upload flow code to a shared folder, enabling multiple workers to access and run the flow from that shared location. This setup can be useful for on-premises infrastructure or other environments where cloud-based solutions might not be viable.
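If you prefer the CLI, a deployment can also be built against a saved storage block with the `--storage-block` flag (the script path below is a placeholder for wherever your flow lives):
```bash
# Build and apply a deployment whose code is uploaded to the SMB block
prefect deployment build ./my_flow.py:my_flow \
    --name my-flow-deployment \
    --storage-block smb/smb-shared-folder \
    --apply
```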
For more detailed configuration and setup of different storage options, refer to the Prefect Storage Documentation.

Tal
05/28/2024, 6:02 AM

Marvin
05/28/2024, 6:03 AM
### 1. Install SMB Protocol
```bash
pip install smbprotocol
```
### 2. Create and Configure SMB Block
You can create an SMB block using the Prefect Cloud UI or programmatically:
#### Using Prefect Cloud / UI:
1. Navigate to the Blocks section in Prefect Cloud UI.
2. Create a new SMB block by providing the necessary credentials and connection details.
#### Using Python:
Here’s how to create the SMB block programmatically:
```python
from prefect.filesystems import SMB

# Define the SMB storage block
smb_block = SMB(
    share_path="/shared_folder/prefect-flows",
    smb_host="your_server",
    smb_username="your_username",
    smb_password="your_password",
)

# Save the storage block
smb_block.save(name="smb-shared-folder", overwrite=True)
```
### 3. Deploy Using SMB Storage
Once you have the SMB block set up, you can use it for your deployments:
```python
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import SMB


@task
def say_hello():
    logger = get_run_logger()
    logger.info("Hello, world!")


@flow
def my_flow():
    say_hello()


# Load the SMB storage block
smb_block = SMB.load("smb-shared-folder")

# Create and apply the deployment
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    storage=smb_block,
)
deployment.apply()
```
### 4. Run the Deployment
You can trigger a run of this deployment with the Prefect CLI:
```bash
prefect deployment run 'my-flow/my-flow-deployment'
```
### Summary
By using the Prefect 2.x block architecture, you can configure and manage shared folder storage via SMB. This updated method uses `SMB` storage blocks, providing a modern and flexible approach to handling flow storage.
For more details, please refer to the Prefect Storage Documentation and Prefect Filesystems Documentation.

Nate
05/28/2024, 2:59 PM
You could use an `on_cancellation` hook in which you call the `update_flow_run` client method or something similar for each `run_deployment` call your flow has made, maybe something like:
```python
import asyncio
from prefect import flow, get_client
from prefect.deployments import run_deployment
from prefect.states import Cancelled

all_my_runs = set()  # a set (not `{}`), so .add() works

def invoke_remote_subflow(...):
    flow_run = run_deployment(...)
    all_my_runs.add(flow_run)

# on_cancellation hooks receive (flow, flow_run, state);
# gather the coroutines directly, since bare partials are not awaitable
async def cancel_remote_subflows(flow, flow_run, state):
    async with get_client() as client:
        await asyncio.gather(
            *[client.set_flow_run_state(flow_run_id=run.id, state=Cancelled())
              for run in all_my_runs]
        )

@flow(on_cancellation=[cancel_remote_subflows])
def your_flow():
    ...  # use invoke_remote_subflow
```