Vijay Purbia
01/20/2025, 9:04 AMraise ENDRUN(
state=Failed(
message=message
)
)
raise ENDRUN(
state=Skipped(
message=message
)
)
Bianca Hoch
01/21/2025, 10:30 PM
from prefect import flow, task
from prefect.states import Completed
from typing import Optional, List, Dict
from datetime import datetime
@task
def fetch_data(data: List[Dict]) -> List[Dict]:
    """Fetch the records to process.

    This example stands in for a real data source and simply returns
    ``data`` unchanged. It does not return a Prefect state: an empty
    result is detected by the caller (``etl_flow``), which then ends the
    flow early.

    Args:
        data: The records to hand to the rest of the pipeline.

    Returns:
        The same list of records that was passed in.
    """
    return data
@task
def transform_data(data: List[Dict]) -> List[Dict]:
    """Return transformed copies of the fetched records.

    Each output record is a new dict holding the original keys plus
    ``processed_at`` (a naive local-time ISO-8601 timestamp from
    ``datetime.now()``) and ``status`` set to ``'transformed'``. Existing
    ``processed_at``/``status`` keys are overwritten in place.

    Args:
        data: The records to transform. They are not modified.

    Returns:
        A new list of new dicts, one per input record, in the same order.
    """
    # Build fresh dicts instead of `data.copy()` + item assignment: a list
    # copy is shallow, so assigning keys on its items would mutate the
    # caller's original record dicts.
    return [
        {
            **item,
            'processed_at': datetime.now().isoformat(),
            'status': 'transformed',
        }
        for item in data
    ]
@task
def load_data(data: List[Dict]) -> None:
    """Simulate loading ``data`` by printing a count, the payload, and a done message."""
    record_count = len(data)
    print(f"Loading {record_count} records...")
    # Echo the full payload so the run output shows exactly what was "loaded".
    print(f"Data to be loaded: {data}")
    print("Data loaded successfully!")
@flow(name="Simple ETL Flow", log_prints=True)
def etl_flow(data: List[Dict]):
    """Run fetch -> transform -> load over ``data``.

    If ``fetch_data`` yields nothing (falsy), the flow prints a notice and
    returns early with a ``Completed`` state carrying that notice as its
    message. In that case transform and load are never called.
    """
    records = fetch_data(data)

    # Guard clause: nothing to process, so finish early with an explicit state.
    if not records:
        reason = "No data available"
        print(reason)
        return Completed(message=reason)

    # Transform runs first; its output feeds straight into load.
    load_data(transform_data(records))
    print("Flow completed successfully!")
if __name__ == "__main__":
    # Default demo: an empty batch, which takes the early "No data available" exit.
    # To exercise the full fetch/transform/load path instead, call:
    #     etl_flow([{'id': 1, 'value': 'test1'}, {'id': 2, 'value': 'test2'}, {'id': 3, 'value': 'test3'}])
    etl_flow([])