Horatiu Bota
03/31/2022, 7:20 PM
Kevin Kho
Horatiu Bota
03/31/2022, 7:25 PM
Horatiu Bota
03/31/2022, 7:27 PM
import os
from functools import partial
from pathlib import Path
from typing import Callable
import pandas as pd
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer
def get_data():
    """Return a small fixed DataFrame with integer columns ``a`` and ``b``."""
    columns = {
        "a": [1, 2, 3, 4],
        "b": [9, 8, 7, 6],
    }
    return pd.DataFrame(data=columns)
def _target_segment(value):
    """Render one piece of the target path as text.

    A ``Parameter`` has no value while the flow is being built, so formatting
    it directly would bake its repr into the path. For a ``Parameter`` this
    returns the placeholder ``{parameters[<name>]}`` instead, leaving the
    substitution to whatever formats the target template at run time. Any
    other value is formatted as-is.
    """
    if isinstance(value, Parameter):
        return f"{{parameters[{value.name}]}}"
    return f"{value}"


def config_task(
    function: Callable,
    company_id: int,
    company_name: str,
    file_name: str = None,
    checkpoint: bool = False,
):
    """Wrap ``function`` in a Prefect task with a per-company target path.

    The target is ``<company_id>-<company_name>/{today}/<name>.csv``, where
    ``<name>`` is ``file_name`` if given and ``function.__name__`` otherwise.
    ``{today}`` is passed through literally as a template placeholder.

    Args:
        function: Callable to turn into a task.
        company_id: Company identifier, either a concrete value or a Prefect
            ``Parameter`` whose value is only known at run time.
        company_name: Company name, either a concrete value or a Prefect
            ``Parameter``.
        file_name: Base name for the CSV file; falls back to the function
            name when omitted or empty.
        checkpoint: Forwarded to ``task`` as its ``checkpoint`` option.

    Returns:
        The object produced by applying the configured ``task`` decorator to
        ``function``.
    """
    company_dir = f"{_target_segment(company_id)}-{_target_segment(company_name)}"
    csv_name = f"{(file_name or function.__name__)}.csv"
    return task(
        checkpoint=checkpoint,
        cache_for=pd.Timedelta(days=1),
        target=os.path.join(company_dir, "{today}", csv_name),
    )(function)
def main():
    """Build the demo flow and run it once with fixed parameters.

    Results are configured as CSV-serialized ``LocalResult`` files under the
    resolved path from the ``DATA_DIR`` environment variable.
    """
    data_dir = Path(os.getenv("DATA_DIR")).resolve()
    csv_result = LocalResult(
        dir=str(data_dir),
        serializer=PandasSerializer(file_type="csv"),
    )

    with Flow("flow", result=csv_result) as flow:
        company_id = Parameter("company_id")
        company_name = Parameter("company_name")
        # The parameters are added to the flow explicitly; they are not
        # passed to any task as call arguments below.
        for parameter in (company_id, company_name):
            flow.add_task(parameter)

        build_task = partial(
            config_task,
            company_id=company_id,
            company_name=company_name,
            checkpoint=True,
        )
        data_task = build_task(get_data)
        data_task()

    flow.run(parameters={"company_id": 9999, "company_name": "test"})
# Run the demo flow only when this file is executed as a script.
if __name__ == "__main__":
    main()
Kevin Kho
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by