Aaron Goebel
06/06/2024, 9:20 PM
Marvin
06/06/2024, 9:20 PM
```python
from prefect import task, flow, get_run_logger


@task(retries=20, retry_delay_seconds=900)  # Retries every 15 minutes for up to 5 hours
def wait_for_external_state_change():
    logger = get_run_logger()
    # Replace with your actual checking logic
    state = check_external_state()
    if state != "desired_state":
        logger.info("External state not met. Checking again later.")
        # Raising marks this attempt as failed, which triggers the next retry
        raise Exception("External state not met yet.")
    logger.info("External state condition met.")
    return state


def check_external_state():
    # Replace this with your actual logic to check the state of the external system
    # For example, query a database or make an API call
    ...
    return "current state"


@flow
def my_flow():
    result = wait_for_external_state_change()
    logger = get_run_logger()
    logger.info(f"Final state: {result}")


if __name__ == "__main__":
    my_flow()
```
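As a quick check of the "up to 5 hours" comment on the task decorator, the arithmetic can be sketched in plain Python. The helper name `retry_window` is made up for illustration and is not part of Prefect. It assumes a fixed delay between attempts and ignores how long each check itself takes.

```python
from datetime import timedelta


def retry_window(retries: int, retry_delay_seconds: int) -> timedelta:
    """Total time spent waiting between attempts, assuming a fixed delay."""
    return timedelta(seconds=retries * retry_delay_seconds)


# Values taken from the task decorator in the example above.
window = retry_window(retries=20, retry_delay_seconds=900)

# The first run is not a retry, so the check runs at most retries + 1 times.
max_attempts = 20 + 1

print(f"Waiting window: {window}, maximum attempts: {max_attempts}")
```

With 20 retries at 900 seconds each, the waiting window comes to 18,000 seconds, which matches the 5 hours mentioned in the comment.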
### Key Points
- Retries and Delay: The `retries` parameter specifies the number of retry attempts, and `retry_delay_seconds` specifies the delay between retries.
- State Checking: Replace `check_external_state` with your actual logic for checking the state of the external system.
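For illustration only, here is one way `check_external_state` might look if the external system happened to write its status to a JSON file. The file name `external_status.json` and the `"state"` key are assumptions made for this sketch; a database query or an API call would follow the same shape.

```python
import json
from pathlib import Path

# Hypothetical location where the external system writes its status.
STATUS_FILE = Path("external_status.json")


def check_external_state(status_file: Path = STATUS_FILE) -> str:
    """Return the reported state, or "unknown" if it cannot be read."""
    try:
        payload = json.loads(status_file.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        # A missing or half-written file is treated as "not ready yet".
        return "unknown"
    if not isinstance(payload, dict):
        return "unknown"
    return str(payload.get("state", "unknown"))
```

Returning `"unknown"` instead of raising keeps the decision about retrying in one place: the calling task compares the result to the desired state and raises if it does not match.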
### Sources
1. State Change Hooks
2. AWS Client Waiter Example
This approach allows your flow to wait for several hours, periodically checking for the desired external state.
Aaron Goebel
06/06/2024, 9:21 PM
Kevin Grismore
06/06/2024, 9:23 PM