Tinendra kumar
03/22/2025, 7:26 PM
Marvin
03/23/2025, 12:45 PM
```python
from prefect import flow, task, get_run_logger


# Note: despite the *_subflow names, these are Prefect tasks. Decorate them
# with @flow instead if you want each step to run as a true subflow.
@task
def data_extract_subflow():
    logger = get_run_logger()
    logger.info("Extracting data...")
    # Perform extraction
    return "extracted"


@task
def data_transform_subflow():
    logger = get_run_logger()
    logger.info("Transforming data...")
    # Perform transformation
    return "transformed"


@task
def data_load_subflow():
    logger = get_run_logger()
    logger.info("Loading data...")
    # Perform loading
    return "loaded"


@flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180)
def main_flow():
    # Check the recorded state before each step so that a flow retry
    # skips the steps that already finished.
    if not check_state("extraction"):
        data_extract_subflow()
        set_state("extraction", "done")  # only reached if the task did not raise
    if not check_state("transformation"):
        data_transform_subflow()
        set_state("transformation", "done")
    if not check_state("loading"):
        data_load_subflow()
        set_state("loading", "done")


def check_state(step_name):
    # Implement state-checking logic: return True if step_name is already done.
    # This placeholder always returns False, so every step runs on every attempt.
    return False


def set_state(step_name, state):
    # Implement state-setting logic: persist the state of step_name.
    pass


if __name__ == "__main__":
    main_flow()
```
For a complete implementation, back `check_state` and `set_state` with a database or another form of persistent storage, so that each step's status survives a failure and a retry of the flow can skip the steps that already finished. You can read more about state management in Prefect's documentation.
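As one possible way to fill in the two placeholders, here is a minimal sketch that persists step states in a local SQLite file using only the standard library. It is not a Prefect API. The database path `etl_state.db`, the table `step_state`, and the extra `run_key` and `db_path` parameters are hypothetical choices made for illustration. `run_key` is assumed to be something that stays constant across retries of the same run (for example a batch date), so that a later, unrelated run does not skip every step.

```python
import sqlite3
from contextlib import closing

# Hypothetical location of the state database; adjust to your environment.
STATE_DB = "etl_state.db"


def _connect(db_path: str) -> sqlite3.Connection:
    """Open the database and create the (hypothetical) step_state table if needed."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS step_state (
            run_key   TEXT NOT NULL,
            step_name TEXT NOT NULL,
            state     TEXT NOT NULL,
            PRIMARY KEY (run_key, step_name)
        )
        """
    )
    return conn


def check_state(step_name: str, run_key: str = "default", db_path: str = STATE_DB) -> bool:
    """Return True if step_name is recorded as 'done' for this run_key."""
    with closing(_connect(db_path)) as conn:
        row = conn.execute(
            "SELECT state FROM step_state WHERE run_key = ? AND step_name = ?",
            (run_key, step_name),
        ).fetchone()
    return row is not None and row[0] == "done"


def set_state(step_name: str, state: str, run_key: str = "default", db_path: str = STATE_DB) -> None:
    """Insert or overwrite the recorded state of step_name for this run_key."""
    with closing(_connect(db_path)) as conn:
        with conn:  # commits the transaction on success, rolls back on error
            conn.execute(
                "INSERT OR REPLACE INTO step_state (run_key, step_name, state) "
                "VALUES (?, ?, ?)",
                (run_key, step_name, state),
            )
```

Because these functions keep the original positional signatures, `main_flow` can call them unchanged, or pass a key explicitly, e.g. `check_state("extraction", run_key=batch_date)`. The composite primary key on `(run_key, step_name)` is what lets `INSERT OR REPLACE` act as an upsert. For multiple workers, a shared database rather than a local file would likely be needed.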