https://prefect.io logo
j

Jeff Hale

08/30/2023, 12:45 AM
Share your code for 102 here (🧵):
🦜 1
j

julius

08/30/2023, 12:53 AM
from datetime import timedelta

import httpx
import pandas as pd
from prefect import flow, get_run_logger, task
from prefect.tasks import task_input_hash


@task(
    retries=4,
    retry_delay_seconds=0.1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    log_prints=True,
)
def fetch_weather(lat: float, lon: float):
    """Fetch an hourly forecast from Open-Meteo and return it as a DataFrame.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        A DataFrame with ``latitude``, ``longitude``, ``temperature``,
        ``wind`` and ``cloudcover`` columns built from the JSON response.
    """
    print("This is the start of my flow")
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
        ),
    )
    # Decode the response body once and reuse it for every column.
    weather_json = weather.json()
    hourly = weather_json["hourly"]
    df = pd.DataFrame(
        {
            "latitude": weather_json["latitude"],
            "longitude": weather_json["longitude"],
            "temperature": hourly["temperature_2m"],
            "wind": hourly["windspeed_10m"],
            "cloudcover": hourly["cloudcover_mid"],
        }
    )
    return df


@task(
    retries=4,
    retry_delay_seconds=0.1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def save_weather_data(data: pd.DataFrame):
    """Write ``data`` to ``weather_data.csv`` and return a status message."""
    data.to_csv("weather_data.csv")
    return "successfully wrote file"


@task(
    retries=4,
    retry_delay_seconds=0.1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
)
def get_latest_value(data: pd.DataFrame, metric: str):
    """Print the last row's value of the ``metric`` column of ``data``."""
    print(f"latest data for {metric}: {data[metric].iloc[-1]}")


@flow
def get_weather_metrics(lat: float, lon: float):
    """Fetch the forecast, print the latest wind value, and save it to CSV.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.
    """
    print("This is the start of my flow")
    print("You only see this message if the logging level is set to DEBUG. 🙂")
    df = fetch_weather(lat=lat, lon=lon)
    # print out latest wind metric
    get_latest_value(df, "wind")
    save_weather_data(df)


if __name__ == "__main__":
    get_weather_metrics(38.9, -77.0)
🦜 1
sonic 1
e

Elizabeth Bing

08/30/2023, 1:00 AM
from datetime import timedelta

import httpx
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash


# latitude 26.1 North, longitude 84.8 West
# cache_key determines the inputs
# NOTE(review): the original cache_expiration expression was left unfinished
# (`datetime.timedeltas=`); one day is a placeholder -- confirm the intended
# expiry.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def fetch_weather(lat: float, lon: float):
    """Fetch an hourly forecast from Open-Meteo and return it as a DataFrame.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        A DataFrame with ``latitude``, ``longitude``, ``temperature``,
        ``wind`` and ``cloudcover`` columns built from the JSON response.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
        ),
    )
    # Decode the response body once and reuse it for every column.
    weather_json = weather.json()
    hourly = weather_json["hourly"]
    df = pd.DataFrame(
        {
            "latitude": weather_json["latitude"],
            "longitude": weather_json["longitude"],
            "temperature": hourly["temperature_2m"],
            "wind": hourly["windspeed_10m"],
            "cloudcover": hourly["cloudcover_mid"],
        }
    )
    return df


@task
def save_weather_data(data: pd.DataFrame):
    """Write ``data`` to ``weather_data.csv`` and return a status message."""
    data.to_csv("weather_data.csv")
    return "successfully wrote file"


@task
def get_latest_value(data: pd.DataFrame, metric: str):
    """Print the last row's value of the ``metric`` column of ``data``."""
    print(f"latest data for {metric} {data[metric].iloc[-1]}")


@flow(retries=4)
def get_weather_metrics(lat: float, lon: float):
    """Fetch the forecast, print the latest wind value, and save it to CSV.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.
    """
    df = fetch_weather(lat=lat, lon=lon)
    # print out latest wind metric
    get_latest_value(df, "wind")
    save_weather_data(df)


if __name__ == "__main__":
    get_weather_metrics(38.9, -77.0)
prefect spin 1
wizard2 1
l

Lucero Yanez

08/30/2023, 1:01 AM
Copy code
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
import httpx

@task(cache_key_fn=task_input_hash, retries=3)
def fetch_weather(lat:float, lon:float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the response's hourly ``temperature_2m`` list,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # The URL must be a bare string: chat-style angle brackets around it
    # make it an invalid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with the HTTP error itself rather than a KeyError on the payload.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task(cache_key_fn=task_input_hash, retries=2, persist_result=True)
def save_weather(temp:float):
    """Overwrite ``weather.csv`` with the string form of ``temp``.

    Args:
        temp: The value to store.

    Returns:
        A fixed confirmation message.
    """
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    status = "Successfully wrote temp"
    return status

@task(cache_key_fn=task_input_hash, retries=3, persist_result=True)
def get_elevation(lat:float, lon:float):
    """Fetch the ``elevation`` field of the forecast response for a location.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The response's top-level ``elevation`` value, converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # The URL must be a bare string: chat-style angle brackets around it
    # make it an invalid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with the HTTP error itself rather than a KeyError on the payload.
    weather.raise_for_status()
    elevation = float(weather.json()["elevation"])
    return elevation


@flow(log_prints=True)
def pipeline(lat:float, lon:float):
    """Fetch the temperature, save it, then fetch and print the elevation.

    Args:
        lat: Latitude forwarded to the weather and elevation tasks.
        lon: Longitude forwarded to the weather and elevation tasks.

    Returns:
        The status message produced by ``save_weather``.
    """
    current_temp = fetch_weather(lat, lon)
    write_status = save_weather(current_temp)
    site_elevation = get_elevation(lat, lon)
    print("The elevation is: ", site_elevation)
    return write_status

# Run the flow for a fixed sample coordinate when executed as a script.
if __name__ == "__main__":
    pipeline(lat=52.52, lon=13.41)
🦜 1
prefect duck 1
a

Anthony Kippa

08/30/2023, 1:04 AM
Copy code
import httpx
from prefect import task, flow
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, retries=2, retry_delay_seconds=0.5)
def get_hourly_weather_temparature(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the response's hourly ``temperature_2m`` list,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # The URL must be a bare string: chat-style angle brackets around it
    # make it an invalid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with the HTTP error itself rather than a KeyError on the payload.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

@task(retries=2, retry_delay_seconds=0.5)
def get_hourly_rain(lat: float, lon: float):
    """Fetch the first hourly ``rain`` value for a location.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the response's hourly ``rain`` list, converted
        to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # The URL must be a bare string: chat-style angle brackets around it
    # make it an invalid URL for httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    rain = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    # Fail with the HTTP error itself rather than a KeyError on the payload.
    rain.raise_for_status()
    most_recent_rain = float(rain.json()["hourly"]["rain"][0])
    return most_recent_rain

@task(retries=2, retry_delay_seconds=0.5)
def save_weather(temp: float, rain: float):
    """Overwrite ``weather.csv`` with one labelled line per reading.

    Args:
        temp: Temperature value written on the first line.
        rain: Rainfall value written on the second line.

    Returns:
        A fixed confirmation message.
    """
    with open("weather.csv", "w+") as out_file:
        temp_line = f"Temparature:{temp}\n"
        rain_line = f"Rainfall:{rain}\n"
        out_file.write(temp_line)
        out_file.write(rain_line)
    return "Successfully wrote temp and rainfall"

@flow
def pipeline(lat: float, lon: float):
    """Fetch the temperature and rainfall for a location and save both.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.

    Returns:
        The status message produced by ``save_weather``.
    """
    hourly_temp = get_hourly_weather_temparature(lat, lon)
    hourly_rain = get_hourly_rain(lat, lon)
    return save_weather(hourly_temp, hourly_rain)


# Run the flow for a fixed sample coordinate when executed as a script.
if __name__ == "__main__":
    pipeline(lat=38.9, lon=-77.0)
🦜 1
prefect spin 1