Horatiu Bota
03/31/2022, 7:20 PM
Kevin Kho
03/31/2022, 7:23 PM
Horatiu Bota
03/31/2022, 7:25 PM
import os
from functools import partial
from pathlib import Path
from typing import Callable
import pandas as pd
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer
def get_data():
    """Return a small demo DataFrame with two integer columns, ``a`` and ``b``."""
    columns = {
        "a": [1, 2, 3, 4],
        "b": [9, 8, 7, 6],
    }
    frame = pd.DataFrame(columns)
    return frame
def _target_component(value):
    """Render one component of a Prefect target-path template.

    A ``Parameter`` has no concrete value while the flow is being built; putting
    it straight into an f-string yields its repr (e.g. ``<Parameter: company_id>``).
    It is therefore rendered as a ``{parameters[<name>]}`` placeholder, which is
    expected to be filled in when Prefect formats the target at run time
    (assumes Prefect 1.x target templating over ``prefect.context`` — confirm).
    Any other value is returned unchanged and formatted exactly as before.
    """
    if isinstance(value, Parameter):
        return f"{{parameters[{value.name}]}}"
    return value


def config_task(
    function: Callable,
    company_id: "int | Parameter",
    company_name: "str | Parameter",
    file_name: "str | None" = None,
    checkpoint: bool = False,
):
    """Wrap ``function`` in a Prefect task cached to a per-company CSV target.

    The target template is
    ``<company_id>-<company_name>/{today}/<file_name or function.__name__>.csv``.
    ``{today}`` is left unformatted here on purpose; like the parameter
    placeholders, it is presumably resolved by Prefect from ``prefect.context``
    when the task runs.

    Args:
        function: the callable to turn into a task.
        company_id: a literal id, or a ``Parameter`` resolved at run time.
        company_name: a literal name, or a ``Parameter`` resolved at run time.
        file_name: base name of the CSV target; defaults to ``function.__name__``.
        checkpoint: passed through to ``task(checkpoint=...)``.

    Returns:
        The task object produced by ``task(...)(function)``, with a one-day
        ``cache_for``.
    """
    company_dir = f"{_target_component(company_id)}-{_target_component(company_name)}"
    target = os.path.join(
        company_dir,
        "{today}",  # deliberately not an f-string: a run-time template key
        f"{file_name or function.__name__}.csv",
    )
    return task(
        checkpoint=checkpoint,
        cache_for=pd.Timedelta(days=1),
        target=target,
    )(function)
def main():
    """Build and run a one-task flow that writes ``get_data``'s output as CSV.

    Task results use a ``LocalResult`` rooted at the resolved path named by the
    ``DATA_DIR`` environment variable, serialized with
    ``PandasSerializer(file_type="csv")``. The flow is run once with
    ``company_id=9999`` and ``company_name="test"``.

    Raises:
        RuntimeError: if the ``DATA_DIR`` environment variable is not set.
    """
    data_dir = os.getenv("DATA_DIR")
    if data_dir is None:
        # Without this check Path(None) fails with an unhelpful TypeError.
        raise RuntimeError("DATA_DIR environment variable must be set")
    result = LocalResult(
        dir=str(Path(data_dir).resolve()),
        serializer=PandasSerializer(file_type="csv"),
    )
    with Flow("flow", result=result) as flow:
        company_id = Parameter("company_id")
        company_name = Parameter("company_name")
        # Neither parameter is passed as an input to a task call below, so
        # register them with the flow explicitly (presumably needed for
        # flow.run() to accept them — confirm against Prefect docs).
        flow.add_task(company_id)
        flow.add_task(company_name)
        # Pre-bind the per-company settings shared by every wrapped task.
        wrap_task = partial(
            config_task,
            company_id=company_id,
            company_name=company_name,
            checkpoint=True,
        )
        wrap_task(get_data)()
    flow.run(parameters={"company_id": 9999, "company_name": "test"})
if __name__ == "__main__":
    # Run the demo flow only when this file is executed as a script.
    main()
Kevin Kho
03/31/2022, 7:27 PM