# prefect-community

Horatiu Bota

03/31/2022, 7:20 PM
hi prefect-community! is there a good way to use flow Parameters when configuring tasks before runtime -- for example, i'd like to pass company_id/company_name as parameters to my flow, then use those values to configure where to cache task outputs:

Kevin Kho

03/31/2022, 7:23 PM
Hi @Horatiu Bota, could you move the code to the thread when you get the chance? I think what you are asking is how to modify the result location or target depending on parameters. You can use templating for that: the `target` string can reference the flow's parameters from context.

Horatiu Bota

03/31/2022, 7:25 PM
@Kevin Kho whoops, sorry about the code 😅
```python
import os
from functools import partial
from pathlib import Path
from typing import Callable, Optional

import pandas as pd
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import PandasSerializer


def get_data():
    return pd.DataFrame({"a": [1, 2, 3, 4], "b": [9, 8, 7, 6]})


def config_task(
    function: Callable,
    company_id: int,
    company_name: str,
    file_name: Optional[str] = None,
    checkpoint: bool = False,
):
    # Wrap `function` as a Prefect task whose output is cached to a
    # per-company, per-day CSV target.
    return task(
        checkpoint=checkpoint,
        cache_for=pd.Timedelta(days=1),
        target=os.path.join(
            *[
                # At flow-build time company_id/company_name are Parameter
                # tasks, not the values passed to flow.run(), so this f-string
                # formats the Parameter objects themselves.
                f"{company_id}-{company_name}",
                "{today}",  # plain template field, filled in by Prefect at run time
                f"{(file_name or function.__name__)}.csv",
            ]
        ),
    )(function)


def main():
    # Assumes the DATA_DIR environment variable is set.
    result = LocalResult(
        dir=f"{Path(os.getenv('DATA_DIR')).resolve()}",
        serializer=PandasSerializer(file_type="csv"),
    )

    with Flow("flow", result=result) as flow:
        company_id = Parameter("company_id")
        company_name = Parameter("company_name")

        flow.add_task(company_id)
        flow.add_task(company_name)

        wrap_task = partial(
            config_task,
            company_id=company_id,
            company_name=company_name,
            checkpoint=True,
        )

        wrap_task(get_data)()

    flow.run(parameters={"company_id": 9999, "company_name": "test"})


if __name__ == "__main__":
    main()
```

Kevin Kho

03/31/2022, 7:27 PM
Yeah the doc I sent is a good place to start
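A minimal sketch of the templating approach, assuming Prefect 1.x, where a task's `target` string is formatted at run time with values from `prefect.context` (which includes `today` and a `parameters` dict). The key change from the snippet above is that the company fields are left as template fields instead of being baked in with an f-string. `make_target` is a hypothetical helper, and the `format(...)` call only simulates the context values Prefect would supply; it is not Prefect's own rendering code.

```python
from typing import Optional


def make_target(task_name: str, file_name: Optional[str] = None) -> str:
    """Build a target *template* for a task (hypothetical helper).

    The {parameters[...]} and {today} fields are deliberately left unfilled;
    the assumption is that Prefect 1.x fills them from prefect.context when
    the task runs, so they pick up the values passed to flow.run().
    """
    return "/".join(
        [
            "{parameters[company_id]}-{parameters[company_name]}",
            "{today}",
            f"{file_name or task_name}.csv",  # resolved now: it is not a parameter
        ]
    )


# Hypothetical usage in the flow from the thread (Prefect 1.x):
#   get_data_task = task(checkpoint=True, target=make_target("get_data"))(get_data)

if __name__ == "__main__":
    template = make_target("get_data")
    # Simulate the run-time context values Prefect would supply.
    rendered = template.format(
        parameters={"company_id": 9999, "company_name": "test"},
        today="2022-03-31",
    )
    print(rendered)  # 9999-test/2022-03-31/get_data.csv
```

With this shape, `config_task` no longer needs `company_id` and `company_name` as arguments, because the values come from context rather than from the Parameter objects at build time.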