https://prefect.io logo
j

Jeff Hale

07/11/2023, 2:52 PM
102 lab: Share your code in the 🧵:
l

Ludwig Schmidt-Hackenberg

07/11/2023, 2:57 PM
Untitled
🦜 3
👍 3
j

Jakub Dvorak

07/11/2023, 3:01 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=2, retry_delay_seconds = 5)
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp

@task(retries=2, retry_delay_seconds = 5)
def fetch_rain(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    rain_forecast = float(weather.json()["hourly"]["rain"][0])
    print(f"Recent rain forecast: {rain_forecast}")
    return rain_forecast

@task(retries=2, retry_delay_seconds = 5)
def fetch_showers(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="showers"),
    )
    showers_forecast = float(weather.json()["hourly"]["showers"][0])
    print(f"Recent showers: {showers_forecast}")
    return showers_forecast


@task(cache_key_fn=task_input_hash, retries=2, retry_delay_seconds = 5)
def save_weather(temp: float, rain: float, showers: float):
    with open("weather.csv", "w+") as w:
        w.write("Temperature: " + str(temp) + "\n")
        w.write("Rain: " + str(rain) + "\n")
        w.write("Showers: " + str(showers) + "\n")
    return "Successfully wrote data"

@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    temp = fetch_weather(lat, lon)
    rain = fetch_rain(lat, lon)
    showers = fetch_showers(lat, lon)
    
    
    result = save_weather(temp, rain, showers)
    return result


if __name__ == "__main__":
    pipeline(49.68720747651769, 18.384079000116053)
🦜 2
🙌 2
j

János

07/11/2023, 3:01 PM
import httpx # requests capability, but can work with async from prefect import flow, task, get_run_logger from prefect.tasks import task_input_hash @task(retries=2, retry_delay_seconds=1, cache_key_fn=task_input_hash) def fetch_wind_speed(lat: float, lon: float): base_url = “https://api.open-meteo.com/v1/forecast/” weather = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly=“windspeed_10m”), ) wind_speed = weather.json()[“hourly”][“windspeed_10m”][0] return wind_speed @task def fetch_weather(lat: float, lon: float): base_url = “https://api.open-meteo.com/v1/forecast/” weather = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly=“temperature_2m”), ) most_recent_temp = float(weather.json()[“hourly”][“temperature_2m”][0]) return most_recent_temp @task def save(speed): with open(“weather.csv”, “w+“) as w: w.write(str(speed)) return “Successfully wrote temp” @flow def pipeline(lat: float, lon: float): logger = get_run_logger() speed = fetch_wind_speed(lat, lon) logger.info(f”{speed=}“) temp = fetch_weather(lat, lon) logger.info(f”{temp=}“) result = save(temp) return result if name == “__main__“: pipeline(30.46, -97.76)
🦜 3
👍 3
b

Blake Stefansen

07/11/2023, 3:03 PM
import httpx
from prefect import flow, task, get_run_logger
from prefect.logging.loggers import task_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(retries=2)
def get_temp(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
temp = weather.json()["hourly"]["temperature_2m"][0]
logger = get_run_logger()
<http://logger.info|logger.info>(f"Most recent air temperature at 2m above ground: {temp} C")
return temp
@task(retries=2, cache_key_fn=task_input_hash, cache_expiration=timedelta(seconds=5))
def get_wind(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
temp = weather.json()["hourly"]["windspeed_10m"][0]
logger = get_run_logger()
<http://logger.info|logger.info>(f"Most recent wind speed: {temp} km/h")
print("test")
return temp
@task
def save_weather(temp: float, wind: float):
with open("weather.csv", "w+") as w:
w.write("Temperature: " + str(temp) + "\n")
w.write("Wind: " + str(wind) + "\n")
return "Successfully wrote data"
@flow(log_prints=True)
def test_flow():
temp = get_temp(52.52, 13.41)
wind = get_wind(52.52, 13.41)
wind = get_wind(52.52, 13.41)
save_weather(temp, wind)
if __name__ == "__main__":
test_flow()
a

Andrey Lukashevich

07/11/2023, 3:05 PM
Copy code
# Write flow code that fetches other weather metrics
# Make at least 3 tasks in the flow
# Example: windspeed for the last hour:
# weather.json()["hourly"]["windspeed_10m"][0]
# Docs: <http://open-meteo.com/en/docs|open-meteo.com/en/docs>

import httpx
import random
from datetime import timedelta
from prefect import flow,  task, get_run_logger
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def get_api_url(logger):
    url = "<https://api.open-meteo.com/v1/forecast>"
    logger.debug(f"API url is: {url}")
    return url

@task(
    retries=2, 
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash, 
    cache_expiration=timedelta(hours=1)
)
def get_api_request(logger, api_url, params):
    random_num = random.randint(1, 2)
    <http://logger.info|logger.info>(f"Random number (for failures simulation): {random_num}")
    if random_num == 1:
        logger.debug("Going to request to the API...")
        res = httpx.request(
            method="get",
            url=api_url,
            params=params
        )
        return res
    else:
        err_msg = "Simulation of failure"
        logger.error(err_msg)
        raise Exception(err_msg)

@task(retries=5, retry_delay_seconds=1)
def parse_result(logger, res):
    logger.debug(f"API result: {res}")
    if res.status_code == 200:
        <http://logger.info|logger.info>("Res code 200 OK")
        return res.json()    
    else:
        logger.error(f"Res code isn't 200 - we got {res.status_code}")
        raise Exception("API returned wrong result")
    
@flow(
    retries=4, 
    retry_delay_seconds=5,
    name="lab102-flow"
)
def get_recent_windspeed_10m(lat, lon):
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running get_api_url")
    api_url = get_api_url(logger)    
    <http://logger.info|logger.info>("Running get_api_request")
    res = get_api_request(logger, api_url , dict(latitude=lat, longitude=lon, hourly="windspeed_10m"))
    <http://logger.info|logger.info>("Running parse_result")
    res_json = parse_result(logger, res)
    return res_json["hourly"]["windspeed_10m"][-1]
    

if __name__ == "__main__":
    miami = {"lat": 25.761681, "lon": -80.191788}
    miami_windspeed = get_recent_windspeed_10m(**miami)
    print(miami_windspeed)
🦜 1
🙌 2
p

Paul Weinmann

07/11/2023, 3:07 PM
Copy code
from pathlib import Path
from prefect import flow, get_run_logger

import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def fetch_precipitation(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"

    weather_feature = "precipitation_probability"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=weather_feature),
    )
    most_recent_prec_prob = float(weather.json()["hourly"][weather_feature][0])
    return most_recent_prec_prob


@task
def save_weather(temp: float):
    with open("weather.csv", "w+") as w:
        w.write(str(temp))
    return "Successfully wrote temp"


@task
def save_precipitation(pp: float):
    with open("precipitation_probability.csv", "w+") as w:
        w.write(str(pp))
    return "Successfully wrote precipitation probability"


@task
def fetch_hourly_weather_feature(lat: float, lon: float, weather_feature: str):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"

    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=weather_feature),
    )
    most_recent_prec_prob = float(weather.json()["hourly"][weather_feature][0])
    return most_recent_prec_prob


@task
def calc_sunscreen_level(uv_index: float):
    sunscreen_level = 10 * (1 + uv_index) ** 2
    return sunscreen_level


@task
def save_sunscreen_level(sunscreen_level: float):
    with open("sunscreen_level.csv", "w+") as w:
        w.write(str(sunscreen_level))
    return "Successfully wrote precipitation probability"


@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    uv_index = fetch_hourly_weather_feature(lat, lon, weather_feature="uv_index")
    sunscreen_level = calc_sunscreen_level(uv_index)
    result = save_sunscreen_level(sunscreen_level)
    return result


@flow(
    name="log-example-flow",
    retries=3,
    log_prints=True,
    cache_result_in_memory=True,
    persist_result=True,
)
def log_it():
    logger = get_run_logger()
    <http://logger.info|logger.info>("INFO level log message.")
    logger.debug("You only see this message if the logging level is set to DEBUG. 🙂")
    pipeline(38.9, -77.0)


if __name__ == "__main__":
    log_it()
3
🚀 3
v

Vinayak Nair

07/11/2023, 3:15 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
from datetime import timedelta
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=2)
def fetch_windspeed(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    windspeed = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(windspeed.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_relative_humidity(lat:float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    relative_humidity = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    most_recent_relative_humidity = float(relative_humidity.json()["hourly"]["relativehumidity_2m"][0])
    return most_recent_relative_humidity

@task(cache_key_fn=task_input_hash, retries=2, retry_delay_seconds = 5)
def fetch_rain(lat:float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    rain = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="rain"),
    )
    most_recent_rain = float(rain.json()["hourly"]["rain"][0])
    return most_recent_rain

@task(retries=4, retry_delay_seconds=0.1)
def save_data(windspeed: float, relative_humidity: float, rain:float):
    with open("weather_deets.csv", "w+") as w:
        w.write(f"{windspeed},{relative_humidity}, {rain}")
    return "Successfully wrote data"


@flow(
    name="meteo-data",
    retries=3,
    log_prints=True,
    cache_result_in_memory=True,
    persist_result=True,
)
def pipeline(lat: float, lon: float):
    windspeed = fetch_windspeed(lat, lon)
    relative_humidity = fetch_relative_humidity(lat, lon)
    rain = fetch_rain(lat, lon)
    result = save_data(windspeed, relative_humidity, rain)
    return result


if __name__ == "__main__":
    pipeline(52.5244, 13.4105)
squirtle cool 2
🎉 1
d

Derick

07/11/2023, 3:28 PM
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
from prefect import flow, get_run_logger
from prefect import flow, get_run_logger

# @task
# def fetch_weather(lat: float, lon: float):
#     base_url = "<https://api.open-meteo.com/v1/forecast/>"
#     weather = httpx.get(
#         base_url,
#         params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
#     )
#     most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
#     return most_recent_temp


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1),retries=4, retry_delay_seconds=0.1)
def fetch_weather(lat: float, lon: float):
    base_url = "<https://api.open-meteo.com/v1/forecast/>"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][-1])
    return most_recent_windspeed


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1),retries=4, retry_delay_seconds=0.1)
def save_weather(windspeed: float):
    with open("weather.csv", "w+") as w:
        w.write(str(windspeed))
    return "Successfully wrote temp"


@flow(name="dericks-test-flow")
def pipeline(lat: float, lon: float):
    windspeed = fetch_weather(lat, lon)
    result = save_weather(windspeed)
    return result


if __name__ == "__main__":
    pipeline(38.9, -77.0)
marvin 3
🎊 2