datamongus
06/25/2024, 12:27 PM

Marvin
06/25/2024, 12:28 PM

```
my_prefect_project/
├── README.md
├── requirements.txt
├── flows/
│   ├── __init__.py
│   ├── main_flow.py
│   ├── sub_flow.py
│   └── dependencies/
│       ├── __init__.py
│       └── helper_functions.py
├── prefect.yaml
└── .github/
    └── workflows/
        └── ci-cd.yml
```
### Example Content of Files
main_flow.py
```python
from prefect import flow

from sub_flow import my_subflow
from dependencies.helper_functions import some_helper


@flow(name="Main Flow")
def main_flow():
    some_helper()
    my_subflow("Hello from Main Flow")


if __name__ == "__main__":
    main_flow()
```
sub_flow.py
```python
from prefect import flow


@flow(name="Subflow")
def my_subflow(msg):
    print(f"Subflow says: {msg}")
```
helper_functions.py
```python
def some_helper():
    print("This is a helper function")
```
prefect.yaml
```yaml
version: 2
build:
  - prefect deployment build flows/main_flow.py:main_flow -n "main-flow-deployment" -o "main-flow-deployment.yaml"
apply:
  - prefect deployment apply main-flow-deployment.yaml
```
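The tree above also lists `.github/workflows/ci-cd.yml` but gives no content for it. Here is a minimal sketch of what it might contain, assuming the deployment is built and applied from GitHub Actions with the same CLI commands; the secret names and Python version are placeholders to adapt to your setup.
```yaml
# Hypothetical CI/CD workflow -- not part of the original answer.
name: Deploy Prefect flows

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Build and apply the deployment
        env:
          # Assumed secrets -- create these in the repository settings.
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
          PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
        run: |
          prefect deployment build flows/main_flow.py:main_flow -n "main-flow-deployment" -o "main-flow-deployment.yaml"
          prefect deployment apply main-flow-deployment.yaml
```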
This structure separates the flows into distinct modules and includes additional helper functions, aligning with the best practices for modular and maintainable code.
See? Simple. Efficient. Just like me. Now, if only someone appreciated it.

datamongus
06/25/2024, 12:30 PM

datamongus
06/25/2024, 12:33 PM

Marvin
06/25/2024, 12:33 PM

2. Use the `@task` decorator to create Prefect tasks.
3. Handle Parameters and Outputs: Ensure that tasks can accept parameters and return results.
4. Import and Use Tasks: Import these tasks in your flows as needed.
### Example Directory Structure:
```
my_prefect_project/
├── flows/
│   ├── __init__.py
│   ├── main_flow.py
│   └── my_tasks.py
└── prefect.yaml
```
### Example: my_tasks.py
```python
from prefect import task


@task
def fetch_data(url: str) -> dict:
    import requests

    response = requests.get(url)
    return response.json()


@task
def transform_data(data: dict) -> dict:
    # Transform the data
    transformed_data = {k: v for k, v in data.items() if v}
    return transformed_data


@task
def load_data(data: dict):
    import json

    with open('data.json', 'w') as f:
        json.dump(data, f)
    print("Data loaded successfully")
```
### Example: main_flow.py
```python
from prefect import flow

from my_tasks import fetch_data, transform_data, load_data


@flow(name="ETL Flow")
def etl_flow(url: str):
    raw_data = fetch_data(url)
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)


if __name__ == "__main__":
    etl_flow("https://jsonplaceholder.typicode.com/todos/1")
```
### Benefits:
- Modularity: You can easily swap out tasks or reuse them across different flows.
- Composability: Tasks can be composed together into subflows or larger workflows (see the sketch after this list).
- Maintainability: Separate task definitions make the code cleaner and easier to maintain.
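As a small illustration of that composability, here is a hedged sketch (the `preview_flow` and `combined_flow` names are made up for this example) that reuses the same tasks in a second flow and then calls that flow as a subflow:
```python
# Hypothetical example -- reusing the same tasks across flows.
from prefect import flow

from my_tasks import fetch_data, transform_data


@flow(name="Preview Flow")
def preview_flow(url: str):
    # Reuse the fetch and transform tasks without the load step.
    raw_data = fetch_data(url)
    transformed_data = transform_data(raw_data)
    print(transformed_data)


@flow(name="Combined Flow")
def combined_flow(urls: list[str]):
    # Calling a flow from inside a flow runs it as a subflow.
    for url in urls:
        preview_flow(url)
```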
If you’re looking for a more advanced example, let me know. It's all just terribly exciting, isn't it?