Tinendra kumar — 03/22/2025, 7:26 PM
Marvin — 03/23/2025, 12:45 PM
Python example:
from prefect import flow, task, get_run_logger
@task
def data_extract_subflow():
    """Run the extraction step of the ETL pipeline.

    Note: despite the ``_subflow`` suffix, this is a Prefect *task*
    (decorated with ``@task``), not a subflow.

    Returns:
        The placeholder string ``"extracted"``.
    """
    logger = get_run_logger()
    logger.info("Extracting data...")
    # TODO: perform the actual extraction.
    return "extracted"
@task
def data_transform_subflow():
    """Run the transformation step of the ETL pipeline.

    Note: despite the ``_subflow`` suffix, this is a Prefect *task*
    (decorated with ``@task``), not a subflow.

    Returns:
        The placeholder string ``"transformed"``.
    """
    logger = get_run_logger()
    logger.info("Transforming data...")
    # TODO: perform the actual transformation.
    return "transformed"
@task
def data_load_subflow():
    """Run the load step of the ETL pipeline.

    Note: despite the ``_subflow`` suffix, this is a Prefect *task*
    (decorated with ``@task``), not a subflow.

    Returns:
        The placeholder string ``"loaded"``.
    """
    logger = get_run_logger()
    logger.info("Loading data...")
    # TODO: perform the actual loading.
    return "loaded"
@flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180)
def main_flow():
    """Run the extract, transform and load tasks in order, skipping finished steps.

    For each step, ``check_state`` is consulted first. If it reports the step
    as complete, the step is skipped. Otherwise the step's task runs, and
    ``set_state(step_name, "done")`` records completion.

    The flow is configured with ``retries=3``. Skipping on retry therefore
    depends entirely on ``check_state``. The placeholder implementation in
    this file always returns ``False``, so every step currently runs on every
    attempt.
    """
    pipeline = (
        ("extraction", data_extract_subflow),
        ("transformation", data_transform_subflow),
        ("loading", data_load_subflow),
    )
    for step_name, step_task in pipeline:
        if check_state(step_name):
            continue  # step reported as already done; don't rerun it
        step_task()
        set_state(step_name, "done")
def check_state(step_name):
    """Return whether the pipeline step ``step_name`` is already complete.

    Placeholder: no state store is consulted yet, so every step is reported
    as not done and will run.
    """
    completed = False  # TODO: look up step_name in persistent storage
    return completed
def set_state(step_name, state):
    """Record ``state`` for the pipeline step ``step_name``.

    Placeholder: the arguments are currently discarded and nothing is stored.
    """
    # TODO: persist (step_name, state) to durable storage.
    return None
# Run the ETL flow only when this file is executed directly, not on import.
if __name__ == "__main__":
    main_flow()
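For a complete implementation, back `check_state` and `set_state` with a database or another form of persistent storage. That way the completion status of each step (extraction, transformation, loading) is remembered across flow retries and restarts, and finished steps are skipped. You can read more about state management in Prefect's documentation.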