# ask-community
b
Hey guys, do you have any examples of, or would you even suggest, using Prefect to manage ML pipelines? Currently we have a very naive pipeline that creates hundreds of features in order. It takes an hour or so, and I am thinking of a way that each feature could be created in parallel within a flow. Would you suggest this as a good use case for Prefect?
k
Hey, this is funny because I just gave a Meetup talk where I mapped my model training like a grid search. See the attached flow code:
import prefect
import requests
import pandas as pd
from datetime import datetime, timedelta

from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor


def format_url(coin="DOGE"):
    url = "https://production.api.coindesk.com/v2/price/values/"
    start_time = (datetime.now() - timedelta(minutes=60)).isoformat(timespec="minutes")
    end_time = datetime.now().isoformat(timespec="minutes")
    params = f"?start_date={start_time}&end_date={end_time}&ohlc=false"
    return url + coin + params

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def get_data(coin="DOGE") -> pd.DataFrame:
    response = requests.get(format_url(coin))
    prices = response.json()["data"]["entries"]
    data = pd.DataFrame(prices, columns=["time", "price"])
    return data

@task
def get_models():
    from sklearn.linear_model import LinearRegression, Ridge, Lasso
    m1 = LinearRegression()
    m2 = Ridge()
    m3 = Lasso()
    return [m1, m2, m3]

@task
def train_model(df, model):
    from sklearn.metrics import mean_absolute_error
    df['lag1'] = df["price"].shift(1)
    df['lag2'] = df["price"].shift(2)
    df.fillna(0, inplace=True)

    model.fit(df.drop("price", axis=1), df["price"])

    error = mean_absolute_error(model.predict(df.drop("price", axis=1)), df["price"])
    logger = prefect.context.get("logger")
    logger.info(model.__class__.__name__)
    logger.info(error)
    return 

with Flow("to-the-moon-modelling") as flow:
    coin = Parameter("coin", default="DOGE")
    data = get_data(coin)
    models = get_models()
    train_model.map(unmapped(data), models)

flow.executor = LocalDaskExecutor()
flow.run()
To answer your question though: yes, I'd recommend computing those features independently and then joining them later if possible. Just don't use map for it, because a mapped task has to be homogeneous.
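Something along these lines is what I mean (rough sketch only; load_data, the feature functions, and the flow name are made-up stand-ins, not from the flow above): each feature is its own task, so the executor can run them concurrently and a final task joins everything back together.
import pandas as pd
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def load_data() -> pd.DataFrame:
    # placeholder source data -- swap in your real loader
    return pd.DataFrame({"price": [1.0, 2.0, 3.0, 4.0, 5.0]})

# Each feature is its own task, so they are independent nodes in the
# flow graph and the executor can run them concurrently.
@task
def rolling_mean_feature(df: pd.DataFrame) -> pd.Series:
    return df["price"].rolling(3, min_periods=1).mean().rename("rolling_mean")

@task
def pct_change_feature(df: pd.DataFrame) -> pd.Series:
    return df["price"].pct_change().fillna(0).rename("pct_change")

@task
def join_features(base: pd.DataFrame, features: list) -> pd.DataFrame:
    # join the independently computed feature columns back onto the base frame
    return pd.concat([base] + features, axis=1)

with Flow("parallel-features") as feature_flow:
    data = load_data()
    f1 = rolling_mean_feature(data)
    f2 = pct_change_feature(data)
    result = join_features(data, [f1, f2])

feature_flow.executor = LocalDaskExecutor()
feature_flow.run()
Because the feature tasks don't depend on each other, Prefect parallelizes them without needing a homogeneous mapped task.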
Actually, maybe you should just use a Dask DataFrame to handle the distributed compute.
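Rough sketch of that idea too (toy data and made-up feature columns, just to show the pattern): the feature columns are built lazily and Dask evaluates them in parallel across partitions.
import pandas as pd
import dask.dataframe as dd

# toy frame standing in for the real data
pdf = pd.DataFrame({"price": [float(i) for i in range(1_000)]})
ddf = dd.from_pandas(pdf, npartitions=8)

# feature columns are defined lazily and computed in parallel across partitions
ddf["lag1"] = ddf["price"].shift(1)
ddf["lag2"] = ddf["price"].shift(2)
ddf["rolling_mean"] = ddf["price"].rolling(3).mean()

features = ddf.fillna(0).compute()  # triggers the parallel computation
print(features.head())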