    Ben Muller

    1 year ago
    Hey guys, do you have any examples, or would you even suggest using Prefect to manage ML pipelines? Currently we have a very naive pipeline that creates hundreds of features in order. It takes an hour or so, and I'm thinking each feature could be created in parallel as part of a flow. Would you suggest this as a good use case for Prefect?
    Kevin Kho

    1 year ago
    Hey, this is funny because I just talked at a Meetup and mapped my model training like a grid search. See the attached flow code:
    import prefect
    import requests
    import pandas as pd
    from datetime import datetime, timedelta
    
    from prefect import task, Flow, Parameter, unmapped
    from prefect.tasks.notifications import SlackTask
    from prefect.executors import LocalDaskExecutor
    
    
    def format_url(coin="DOGE"):
        # Build the CoinDesk price URL for the trailing 60 minutes
        url = "https://production.api.coindesk.com/v2/price/values/"
        start_time = (datetime.now() - timedelta(minutes=60)).isoformat(timespec="minutes")
        end_time = datetime.now().isoformat(timespec="minutes")
        params = f"?start_date={start_time}&end_date={end_time}&ohlc=false"
        return url + coin + params
    
    @task(max_retries=3, retry_delay=timedelta(minutes=1))
    def get_data(coin="DOGE") -> pd.DataFrame:
        # Fetch the last hour of prices, retrying up to 3 times on failure
        prices = requests.get(format_url(coin))
        prices = prices.json()["data"]["entries"]
        data = pd.DataFrame(prices, columns=["time", "price"])
        return data
    
    @task
    def get_models():
        # Candidate models to fan out over with map()
        from sklearn.linear_model import LinearRegression, Ridge, Lasso
        return [LinearRegression(), Ridge(), Lasso()]
    
    @task
    def train_model(df, model):
        from sklearn.metrics import mean_absolute_error
        # Copy so the mapped tasks don't mutate the shared DataFrame
        df = df.copy()
        df["lag1"] = df["price"].shift(1)
        df["lag2"] = df["price"].shift(2)
        df.fillna(0, inplace=True)
    
        model.fit(df.drop("price", axis=1), df["price"])
    
        # Log the in-sample error for this model
        error = mean_absolute_error(model.predict(df.drop("price", axis=1)), df["price"])
        logger = prefect.context.get("logger")
        logger.info(model.__class__.__name__)
        logger.info(error)
    
    with Flow("to-the-moon-modelling") as flow:
        coin = Parameter("coin", default="DOGE")
        data = get_data(coin)
        models = get_models()
        # One mapped task per model; unmapped() shares the same data with each
        train_model.map(unmapped(data), models)
    
    flow.executor = LocalDaskExecutor()
    flow.run()
    To answer your question though: yep, I recommend creating those features independently and then joining them later if possible. Just don't use map for it, because a mapped task has to be homogeneous (the same logic applied to every input).
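    Something like this rough sketch of the fan-out/join pattern (the feature tasks here are just hypothetical placeholders, not your actual logic):
    import pandas as pd
    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor
    
    @task
    def load_raw() -> pd.DataFrame:
        # Stand-in for however you load your source data
        return pd.DataFrame({"price": [10.0, 11.5, 9.8, 12.1]})
    
    # Each feature gets its own task because the logic is heterogeneous
    @task
    def feature_rolling_mean(df: pd.DataFrame) -> pd.Series:
        return df["price"].rolling(2, min_periods=1).mean().rename("rolling_mean")
    
    @task
    def feature_pct_change(df: pd.DataFrame) -> pd.Series:
        return df["price"].pct_change().fillna(0).rename("pct_change")
    
    @task
    def join_features(df: pd.DataFrame, features) -> pd.DataFrame:
        # Join the independently computed feature columns back together
        return pd.concat([df] + list(features), axis=1)
    
    with Flow("parallel-features") as flow:
        raw = load_raw()
        f1 = feature_rolling_mean(raw)
        f2 = feature_pct_change(raw)
        # f1 and f2 don't depend on each other, so the executor
        # is free to run them at the same time before the join
        join_features(raw, [f1, f2])
    
    flow.executor = LocalDaskExecutor()
    flow.run()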
    Actually, maybe you should just use a Dask DataFrame to handle the distributed compute.
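    If you go that route, a minimal sketch might look like this (the columns are just illustrative):
    import pandas as pd
    import dask.dataframe as dd
    
    # Split the data into partitions that Dask can process in parallel
    pdf = pd.DataFrame({"price": [10.0, 11.5, 9.8, 12.1]})
    ddf = dd.from_pandas(pdf, npartitions=2)
    
    # Each assignment just extends a lazy task graph; nothing runs yet
    ddf["lag1"] = ddf["price"].shift(1)
    ddf["price_x2"] = ddf["price"] * 2
    
    # compute() executes the whole graph across partitions
    print(ddf.compute())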