Shaoyi Zhang
07/03/2024, 6:14 PMMarvin
07/03/2024, 6:15 PMpython
from prefect import flow, get_run_logger
from prefect.runtime.flows import FlowRunContext
from prefect.blocks.system import Download
@flow(name="My Flow")
def my_flow():
logger = get_run_logger()
# Define a unique lock name for the deployment
lock_name = f"{FlowRunContext.get().deployment_name}-lock"
with Download.block(f"{lock_name}", "/tmp/lockfile.lock") as lock:
if not lock.acquire(timeout=1):
<http://logger.info|logger.info>("Another instance is running, skipping this run.")
return
# Your flow logic here
<http://logger.info|logger.info>("Running the flow logic")
if __name__ == "__main__":
my_flow()
In this snippet:
- The Download.block
is used to create a lockfile as a hacky way to manage concurrency.
- It attempts to acquire the lock; if the lock is already held (meaning another run is in progress), it skips execution.
- If the lock is acquired, it proceeds with the flow logic.
This is a basic example to demonstrate the idea. Depending on your use case, you might want to implement a more robust locking mechanism.
For more detailed information and examples, you can refer to Prefect's documentation or explore community discussions on GitHub.