Share your code from lab 101 here: :thread:
# pacc-june-14-2023
j
Share your code from lab 101 here: 🧵
upvote 1
b
We missed the GitHub link so we went down our own path with some simple fetch methods
Copy code
from prefect import flow, task 
import requests as re 

# Open-Meteo forecast endpoint. The literal must be a bare URL: chat-style
# <...> link markup around it makes the address unusable for HTTP clients.
url = 'https://api.open-meteo.com/v1/forecast'

@task
def fetch_weather_data(url, params):
    """Send a GET request to ``url`` with ``params`` as the query parameters.

    The response object returned by ``re.get`` is handed back untouched;
    callers decide how to read it (for example via ``.text``).
    """
    return re.get(url, params=params)

@flow
def main():
    """Fetch forecast data for one fixed location and print the raw response body."""
    # Query parameters for the request: longitude 13.41, latitude 52.52.
    temperature_params = {
        'longitude': 13.41,
        'latitude': 52.52,
    }

    # Equivalent request:
    # https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41
    temperature_data = fetch_weather_data(url, temperature_params)
    # Print the variable that was actually assigned; a misspelled name here
    # (`temperature_datsa`) raised NameError before anything was shown.
    print(temperature_data.text)
1000000 1
🙌 1
blob attention gif 1
c
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_weather(lat: float, lon: float):
    """Return the first hourly ``temperature_2m`` value for the given coordinates.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON reply, as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a success status.
    """
    # Plain URL: angle brackets around it (chat link markup) make it invalid.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with a clear HTTP error instead of a confusing KeyError further down.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def save_weather(temp: float, elevation: float):
    """Write ``temp`` to weather.csv and ``elevation`` to elevation.csv.

    Both files are opened in "w+" mode, so previous content is replaced.
    Returns a fixed confirmation message.
    """
    outputs = (("weather.csv", temp), ("elevation.csv", elevation))
    for filename, value in outputs:
        with open(filename, "w+") as handle:
            handle.write(str(value))

    return "Successfully wrote temp and elevation"

@task
def fetch_elevation(lat: float, lon: float):
    """Return the first ``elevation`` value reported for the given coordinates.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of the ``elevation`` list in the JSON reply, as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a success status.
    """
    # Plain URL: angle brackets around it (chat link markup) make it invalid.
    base_url = "https://api.open-meteo.com/v1/elevation/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon),
    )
    # Surface HTTP failures directly rather than as a KeyError on the payload.
    response.raise_for_status()

    return float(response.json()["elevation"][0])

@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and elevation for (lat, lon), save both, return the save message."""
    current_temp = fetch_weather(lat, lon)
    site_elevation = fetch_elevation(lat, lon)
    return save_weather(current_temp, site_elevation)


# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    pipeline(lat=38.9, lon=-77.0)
b
Copy code
from prefect import flow, task 
import requests as re 
import json 
# Open-Meteo forecast endpoint. The literal must be a bare URL: chat-style
# <...> link markup around it makes the address unusable for HTTP clients.
url = 'https://api.open-meteo.com/v1/forecast'

@task
def fetch_weather_data(url, params):
    """Send a GET request to ``url`` with ``params`` as the query parameters.

    The response object returned by ``re.get`` is handed back untouched.
    """
    return re.get(url, params=params)

@task
def get_elevation(json_object):
    """Decode ``json_object.text`` as JSON and return its 'elevation' entry."""
    payload = json.loads(json_object.text)
    return payload['elevation']

@flow
def main():
    """Fetch forecast data for a fixed location, print the raw body, then print its elevation."""
    # Request parameters: longitude 13.41, latitude 52.52.
    query = dict(longitude=13.41, latitude=52.52)

    # Same as GET https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41
    response = fetch_weather_data(url, query)
    print(response.text)

    yemen_elevation = get_elevation(response)
    print(f"The elevation off the coast of Yemen is {yemen_elevation}")

if __name__ == '__main__':
    # Script entry point: print a banner, run the flow, then print a sign-off.
    print("We are going to fetch some weather data")
    main()
    print("Have a nice day (: ")
🔥 1
🙌 1
🦜 1
c
Copy code
from prefect import task, flow
import requests

@task
def get_data():
    """Download the hourly temperature forecast for latitude 52.52, longitude 13.41.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API does not answer with a success status.
    """
    # Plain URL: angle brackets around it (chat link markup) make it invalid.
    res = requests.get("https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41&hourly=temperature_2m")
    # Report HTTP failures directly instead of failing later on the payload.
    res.raise_for_status()
    print('ran get_data')
    return res.json()

@task
def parse_data(data):
    """Copy the latitude, longitude and elevation entries of ``data`` under short keys.

    Returns a dict with the keys "lat", "lon" and "elev".
    """
    out = {
        "lat": data['latitude'],
        "lon": data['longitude'],
        "elev": data['elevation'],
    }
    print('parsed data')
    return out


@task
def print_data(data):
    """Print a fixed header line followed by ``data``."""
    for item in ('printing data', data):
        print(item)


@flow
def my_flow():
    """Run the three tasks in sequence: download, parse, print."""
    print_data(parse_data(get_data()))


# Run the flow only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    my_flow()
🔥 2
sonic 1
j
Our abomination:
Copy code
import httpx
from prefect import flow, task

@task
def fetch_temp(lat: float, lon: float) -> float:
    """Request the daily ``temperature_2m_max`` metric (timezone UTC) via ``fetch_weather_from_api``."""
    return fetch_weather_from_api(
        lat,
        lon,
        {"daily": "temperature_2m_max", "timezone": "UTC"},
    )

@task
def fetch_humidity(lat: float, lon: float) -> float:
    """Request the hourly ``relativehumidity_2m`` metric via ``fetch_weather_from_api``."""
    return fetch_weather_from_api(lat, lon, {"hourly": "relativehumidity_2m"})

@task
def fetch_dewpoint(lat: float, lon: float) -> float:
    """Request the hourly ``dewpoint_2m`` metric via ``fetch_weather_from_api``."""
    return fetch_weather_from_api(lat, lon, {"hourly": "dewpoint_2m"})

def fetch_weather_from_api(lat: float, lon: float, metric: dict) -> float:
    """Query the forecast API and return the first value of the requested metric.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        metric: Extra query parameters. The FIRST item selects the value that
            is read back: its key names the section of the JSON reply and its
            value names the series inside that section
            (e.g. ``{"hourly": "dewpoint_2m"}``). Remaining items are only
            forwarded as query parameters.

    Returns:
        The first element of the selected series, as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a success status.
    """
    # Plain URL: angle brackets around it (chat link markup) make it invalid.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, **metric),
    )
    print(weather)
    # Surface HTTP failures directly rather than as a KeyError on the payload.
    weather.raise_for_status()
    # Dicts keep insertion order, so this is the first key/value pair given.
    section, series = next(iter(metric.items()))
    return float(weather.json()[section][series][0])

@flow
def fetch_weather(lat: float, lon: float):
    """Fetch temperature, humidity and dewpoint for (lat, lon) and print one line for each."""
    temp, humidity, dewpoint = (
        fetch_temp(lat, lon),
        fetch_humidity(lat, lon),
        fetch_dewpoint(lat, lon),
    )
    report = (
        f"Temp (Max Daily for LA): {temp}",
        f"Humidity: {humidity}",
        f"Dewpoint: {dewpoint}",
    )
    for line in report:
        print(line)

# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    fetch_weather(lat=38.9, lon=-77.0)
🔥 2
squirtle cool 1
j
Copy code
import httpx
from prefect import task, flow

@task
def fetch_weather(lat: float, lon: float):
    """Return the first hourly ``temperature_2m`` value for the given coordinates.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON reply, as a float.

    Raises:
        httpx.HTTPStatusError: If the API does not answer with a success status.
    """
    # Plain URL: angle brackets around it (chat link markup) make it invalid.
    base_url = "https://api.open-meteo.com/v1/forecast"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail with a clear HTTP error instead of a confusing KeyError further down.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp

# fetch_weather(51.5072,0.1276)
@task
def save_weather(temp: float):
    """Write the string form of ``temp`` to weather.csv, replacing existing content.

    Returns a fixed confirmation message.
    """
    with open("weather.csv", "w+") as out_file:
        out_file.write(f"{temp!s}")
    return "Successfully wrote temp"

@flow
def pipeline(lat: float, lon: float):
    """Fetch the temperature for (lat, lon), save it, and return the save message."""
    return save_weather(fetch_weather(lat, lon))

# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    pipeline(lat=51.5072, lon=0.1276)
🙌 1
blob attention gif 1
s
The flow gets a city name and returns air quality
Copy code
from prefect import flow, task
from requests import get

@task(name='air-quality-data')
def get_air_quality_information(latitude: float, longitude: float) -> list:
    """Fetch hourly PM10 and PM2.5 readings for a location.

    Args:
        latitude: Latitude sent as the ``latitude`` query parameter.
        longitude: Longitude sent as the ``longitude`` query parameter.

    Returns:
        A list of dicts with the keys ``'hour'``, ``'pm10'`` and ``'pm2_5'``,
        one per hourly entry in the reply. The list is empty when the API does
        not answer with status 200 or the reply has no hourly data.
    """
    # Plain URL with the query passed separately: angle brackets around the
    # address (chat link markup) make it invalid, and `params` handles encoding.
    api_response = get(
        'https://air-quality-api.open-meteo.com/v1/air-quality',
        params={
            'latitude': latitude,
            'longitude': longitude,
            'hourly': ['pm10', 'pm2_5'],
        },
    )
    if api_response.status_code != 200:
        return []

    # Decode the body once instead of once per field.
    hourly = api_response.json().get('hourly') or {}
    time_data = hourly.get('time') or []
    # The series is keyed 'pm10', matching the name requested above; looking
    # up 'pm_10' returned None and crashed on the first index access.
    pm10 = hourly.get('pm10') or []
    pm2_5 = hourly.get('pm2_5') or []

    return [
        {'hour': hour, 'pm10': pm10_value, 'pm2_5': pm2_5_value}
        for hour, pm10_value, pm2_5_value in zip(time_data, pm10, pm2_5)
    ]


@task(name='co-ordinates-api')
def get_co_ordinates(city_name: str) -> tuple | None:
    """Look up the coordinates of the first geocoding match for ``city_name``.

    Args:
        city_name: Name to search for.

    Returns:
        ``(latitude, longitude)`` of the first result, or ``None`` when the API
        does not answer with status 200 or reports no results.
    """
    # Plain URL with the query passed separately: angle brackets around the
    # address (chat link markup) make it invalid, and `params` URL-encodes
    # names containing spaces or non-ASCII characters.
    api_response = get(
        'https://geocoding-api.open-meteo.com/v1/search',
        params={
            'name': city_name,
            'count': 10,
            'language': 'en',
            'format': 'json',
        },
    )
    if api_response.status_code != 200:
        return None

    # Treat a missing or empty 'results' entry as "not found" rather than
    # indexing into it and crashing.
    results = api_response.json().get('results')
    if not results:
        return None

    city = results[0]
    return (city.get('latitude'), city.get('longitude'))


@flow(name='parent workflow')
def get_meteo_data(city: str):
    """Resolve ``city`` to coordinates and print its hourly air-quality data.

    Args:
        city: City name passed to ``get_co_ordinates``.

    Raises:
        ValueError: If no coordinates could be determined for ``city``.
    """
    co_ordinates = get_co_ordinates(city)
    # get_co_ordinates is declared to return None on failure; check before
    # unpacking so the failure is reported clearly instead of as a TypeError.
    if co_ordinates is None:
        raise ValueError(f'Could not determine co-ordinates for city {city!r}')

    latitude, longitude = co_ordinates
    air_quality_data = get_air_quality_information(latitude, longitude)
    print(air_quality_data)


# Script entry point: run the flow once for a fixed city name.
if __name__ == "__main__":
    get_meteo_data(city='Berlin')
🙌 1
☁️ 1