Share your code for module 2 here: (thread)
# pacc-may-31-2023
j
Share your code for module 2 here: (thread)
k
Copy code
import csv
import random
from pathlib import Path

import httpx
from prefect import flow, task

# Root of the Open-Meteo REST API. No trailing slash: callers append
# endpoint paths such as "/forecast" themselves.
base_url = 'https://api.open-meteo.com/v1'


@task(retries=4, persist_result=True, retry_delay_seconds=0.3)
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Latitude sent to the forecast endpoint (coerced to ``float``).
        lon: Longitude sent to the forecast endpoint (coerced to ``float``).

    Returns:
        The first entry of the ``hourly.temperature_2m`` series in the
        JSON response.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
        ZeroDivisionError: Raised at random (for 5 of the 11 possible
            draws); this looks like deliberate failure injection to
            exercise the retries configured on the task.
    """
    weather = httpx.get(
        f'{base_url}/forecast',
        params=dict(
            latitude=float(lat),
            longitude=float(lon),
            hourly="temperature_2m",
        ),
    )
    # Surface HTTP errors directly instead of as a KeyError when the
    # error payload lacks the "hourly" key.
    weather.raise_for_status()

    # Simulated flakiness: odd draws abort the task run.
    if random.randint(0, 10) % 2:
        raise ZeroDivisionError('simulated failure to exercise task retries')

    return weather.json()["hourly"]["temperature_2m"][0]


@task
def fetch_soil_data(lat, lon):
    """Fetch the first hourly ``soil_temperature_0cm`` value for a location.

    Args:
        lat: Latitude sent to the forecast endpoint.
        lon: Longitude sent to the forecast endpoint.

    Returns:
        The first entry of the ``hourly.soil_temperature_0cm`` series in
        the JSON response.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    response = httpx.get(
        f'{base_url}/forecast',
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly='soil_temperature_0cm',
        ),
    )
    # Surface HTTP errors directly instead of as a KeyError when the
    # error payload lacks the "hourly" key.
    response.raise_for_status()
    return response.json()["hourly"]["soil_temperature_0cm"][0]


@flow
def transform_weather_data(weather, soil):
    """Join the weather and soil readings into one ``weather-soil`` string."""
    combined = f'{weather}-{soil}'
    return combined


@task
def save_weather(to_write):
    """Append ``to_write`` as a single-field row to ``output/weather.csv``.

    Args:
        to_write: Value written as the only field of the new row.
    """
    dest = Path('output') / 'weather.csv'
    # Create the target directory on first use so the open() below cannot
    # fail with FileNotFoundError.
    dest.parent.mkdir(parents=True, exist_ok=True)

    # newline='' hands line-ending control to the csv module, as its
    # documentation requires; append-only mode is enough since nothing
    # is read back.
    with open(dest, 'a', newline='') as _outfile:
        _writer = csv.writer(_outfile, delimiter=';', quotechar="\"")
        _writer.writerow([to_write])


@flow
def get_weather_flow(lat, lon):
    """Fetch air and soil readings for a location, combine them and save them.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.
    """
    air_reading = fetch_weather(lat, lon)
    soil_reading = fetch_soil_data(lat, lon)
    save_weather(transform_weather_data(air_reading, soil_reading))


if __name__ == '__main__':
    # Run the flow once with fixed example coordinates.
    get_weather_flow(lat=52.52, lon=13.41)
🚀 1
prefect duck 1
e
Copy code
import httpx
from prefect import flow, task, get_run_logger

def fetch_api(lat: float, lon: float, hourly: str):
    """Request one hourly variable from Open-Meteo and return its first value.

    Args:
        lat: Latitude sent to the forecast endpoint.
        lon: Longitude sent to the forecast endpoint.
        hourly: Name of the hourly variable to request; also the key read
            from the ``hourly`` section of the response.

    Returns:
        The first entry of the requested hourly series, as a ``float``.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly=hourly),
    )
    # Surface HTTP errors directly instead of as a KeyError when the
    # error payload lacks the "hourly" key.
    weather.raise_for_status()
    most_recent_data = float(weather.json()["hourly"][hourly][0])
    print(f"Most recent {hourly}: {most_recent_data}")
    return most_recent_data

@task(retries=3, retry_delay_seconds=0.1)
def fetch_weather(lat: float, lon: float):
    """Return the ``temperature_2m`` value that ``fetch_api`` yields."""
    return fetch_api(lat, lon, hourly="temperature_2m")

@task(retries=3, retry_delay_seconds=0.1)
def fetch_windspeed(lat: float, lon: float):
    """Return the ``windspeed_10m`` value that ``fetch_api`` yields."""
    return fetch_api(lat, lon, hourly="windspeed_10m")

@task(retries=3, retry_delay_seconds=0.1)
def fetch_rain(lat: float, lon: float):
    """Return the ``rain`` value that ``fetch_api`` yields."""
    return fetch_api(lat, lon, hourly="rain")


@flow()
def pipeline(lat: float, lon: float):
    """Run the three fetch tasks for a location.

    Returns:
        A ``(temperature, windspeed, rain)`` tuple, in that order.
    """
    readings = (
        fetch_weather(lat, lon),
        fetch_windspeed(lat, lon),
        fetch_rain(lat, lon),
    )
    return readings

@flow()
def log_data_fetched(lat: float, lon: float):
    """Run ``pipeline`` for a location and log each returned metric.

    Args:
        lat: Latitude forwarded to ``pipeline``.
        lon: Longitude forwarded to ``pipeline``.
    """
    logger = get_run_logger()
    data = pipeline(lat, lon)
    for metric in data:
        # Plain method call; the chat export had turned it into a link.
        logger.info(f'Data fetched from the API: {metric}')


if __name__ == "__main__":
    # Run the logging flow once with fixed example coordinates.
    log_data_fetched(10, 3.5)
marvin 1
wizard2 1
s
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task, get_run_logger

# Open-Meteo forecast endpoint queried by the tasks in this script.
base_url = "https://api.open-meteo.com/v1/forecast/"

@task(retries=3, retry_delay_seconds=0.1)
def get_temps(lat: float, lon: float):
    """Fetch the hourly ``apparent_temperature`` series for a location.

    Args:
        lat: Latitude sent to the forecast endpoint.
        lon: Longitude sent to the forecast endpoint.

    Returns:
        The full ``hourly.apparent_temperature`` list from the JSON response.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="apparent_temperature"),
    )
    # Surface HTTP errors directly instead of as a KeyError when the
    # error payload lacks the "hourly" key.
    weather.raise_for_status()
    return weather.json()["hourly"]["apparent_temperature"]

@task
def average(vals: list[str]):
    """Return the arithmetic mean of ``vals`` after converting each to float."""
    total = sum(float(item) for item in vals)
    return total / len(vals)

@flow
def average_temp(lat: float, lon: float):
    """Return the mean of the apparent-temperature series for a location."""
    return average(get_temps(lat, lon))

@flow
def pipeline(lat: float, lon: float):
    """Compute the average apparent temperature, log it and return it."""
    result = average_temp(lat, lon)
    run_logger = get_run_logger()
    run_logger.info(f"Average apparent temp is {result}")
    return result


if __name__ == "__main__":
    # Run the flow once with fixed example coordinates.
    pipeline(lat=38.9, lon=-77.0)
🙌 1
🦜 1
l
@Tim Lin could you share the code here? 🙏
t
import statistics

import httpx
from prefect import flow, task


@task()
def fetch_air_quality(lat, lon):
    """Fetch hourly carbon monoxide and dust data for a location.

    Returns:
        The decoded JSON response of the Open-Meteo air-quality endpoint.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    air_quality = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="carbon_monoxide,dust"),
    )
    # Surface HTTP errors here rather than as a KeyError downstream.
    air_quality.raise_for_status()
    return air_quality.json()


@flow(retries=2)
def air_quality_flow(lat: float, lon: float):
    """Fetch air-quality data for a location and print its statistics."""
    air_quality_data = fetch_air_quality(lat, lon)
    air_quality_stats(air_quality_data)
    return None


@flow(log_prints=True)
def air_quality_stats(data):
    """Print the mean of ``data['hourly']['carbon_monoxide']``."""
    cm_mean = statistics.mean(data['hourly']['carbon_monoxide'])
    print(f"Mean Carbon Monoxide over L24h: {cm_mean}μg/m³")


# The dunder name is required here; a bare ``name`` is undefined.
if __name__ == "__main__":
    air_quality_flow(51.51, -0.13)
l
ty!!
t
Copy code
import httpx
from prefect import flow, task
import statistics

@task()
def fetch_air_quality(lat, lon):
    """Fetch hourly carbon monoxide and dust data for a location.

    Args:
        lat: Latitude sent to the air-quality endpoint.
        lon: Longitude sent to the air-quality endpoint.

    Returns:
        The decoded JSON response of the Open-Meteo air-quality endpoint.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    air_quality = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="carbon_monoxide,dust"),
    )
    # Surface HTTP errors here rather than as a KeyError downstream.
    air_quality.raise_for_status()

    return air_quality.json()


@flow(retries=2)
def air_quality_flow(lat: float, lon: float):
    """Fetch air-quality data for a location and hand it to the stats flow.

    Args:
        lat: Latitude forwarded to ``fetch_air_quality``.
        lon: Longitude forwarded to ``fetch_air_quality``.
    """
    air_quality_stats(fetch_air_quality(lat, lon))

@flow(log_prints=True)
def air_quality_stats(data):
    """Print the mean of the hourly carbon monoxide series in ``data``.

    Args:
        data: Mapping read as ``data['hourly']['carbon_monoxide']``.
    """
    co_series = data['hourly']['carbon_monoxide']
    cm_mean = statistics.mean(co_series)
    print(f"Mean Carbon Monoxide over L24h: {cm_mean}μg/m³")



if __name__ == "__main__":
    # Run the flow once with fixed example coordinates.
    air_quality_flow(lat=51.51, lon=-0.13)
🙌 1
1
formatted better
gratitude thank you 1