One person from each group share your code in this...
# pacc-clearcover-june-12-2023
j
One person from each group, please share your code in this thread.
upvote 1
p
Copy code
import httpx
import json

from prefect import flow, task


@task
def hit_meteo_api(
    # Bare URL: wrapping it in angle brackets makes it unusable by httpx.
    base_url="https://api.open-meteo.com/v1/forecast",
    latitude: float = 0,
    longitude: float = 0,
    current_weather: bool = True,
):
    """Fetch the current weather for a coordinate from the forecast endpoint.

    Args:
        base_url: Endpoint to send the GET request to.
        latitude: Value sent as the ``latitude`` query parameter.
        longitude: Value sent as the ``longitude`` query parameter.
        current_weather: Value sent as the ``current_weather`` query parameter.

    Returns:
        The ``"current_weather"`` entry of the decoded JSON response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        KeyError: If the decoded response has no ``"current_weather"`` entry.
    """
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": current_weather,
    }

    r = httpx.get(base_url, params=params)
    # Fail on an HTTP error here rather than with an unrelated KeyError below.
    r.raise_for_status()

    results_dict = r.json()
    print(results_dict)

    return results_dict["current_weather"]


@task
def get_temp(weather_dict: dict):
    """Return the value stored under the ``"temperature"`` key of *weather_dict*."""
    return weather_dict["temperature"]


@task
def write_to_file(temperature: float):
    """Overwrite ``lab_101_temperature.txt`` with *temperature* as text, then print a confirmation."""
    with open("lab_101_temperature.txt", "w") as out_file:
        text = str(temperature)
        out_file.write(text)

    print("we did it!")


@flow
def run_everything():
    """Fetch the current weather, pull out its temperature, and write that to a file."""
    current_weather = hit_meteo_api()
    write_to_file(get_temp(current_weather))


# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    run_everything()
🙌 5
🔥 1
s
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task

# Forecast endpoint as a bare URL; wrapping it in angle brackets makes it unusable by httpx.
BASE_URL = "https://api.open-meteo.com/v1/forecast/"

@task
def fetch_weather(lat: float, lon: float):
    """Request hourly temperature and precipitation for (*lat*, *lon*) from ``BASE_URL``.

    Returns the ``httpx`` response object unchanged; no status check or
    decoding is done here.
    """
    query = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ["temperature_2m", "precipitation"],
    }
    return httpx.get(BASE_URL, params=query)


@task
def get_weather_data(weather):
    """Extract the first hourly temperature and precipitation readings.

    Args:
        weather: Object whose ``json()`` returns a mapping containing
            ``["hourly"]["temperature_2m"]`` and ``["hourly"]["precipitation"]``
            sequences.

    Returns:
        A ``(temperature, precipitation)`` tuple of floats taken from index 0
        of each series.
    """
    # Decode the body once and reuse it for both fields.
    hourly = weather.json()["hourly"]
    most_recent_temp = float(hourly["temperature_2m"][0])
    most_recent_precipitation = float(hourly["precipitation"][0])
    return most_recent_temp, most_recent_precipitation


@task
def save_weather(temp: float, precipitation: float):
    """Overwrite ``weather.csv`` with ``"<temp>, <precipitation>"`` and return a status message."""
    with open("weather.csv", "w+") as csv_file:
        csv_file.write(", ".join([str(temp), str(precipitation)]))
    return "Successfully wrote temp"


@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    """Fetch, extract, print, and save the weather readings for (*lat*, *lon*).

    Returns the status message produced by ``save_weather``.
    """
    temperature, precipitation = get_weather_data(fetch_weather(lat, lon))
    print(temperature, precipitation)
    return save_weather(temperature, precipitation)


# When executed as a script, run the pipeline for latitude 38.9, longitude -77.0.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
🙌 3
🦜 3
l
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task


@task
def fetch_temperature(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` reading for (*lat*, *lon*).

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        Index 0 of the ``["hourly"]["temperature_2m"]`` series, as a float.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    # Bare URL: wrapping it in angle brackets makes it unusable by httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Fail on an HTTP error here rather than with an unrelated KeyError below.
    weather.raise_for_status()
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@task
def save_weather(temp: float, windspeed: float):
    """Overwrite ``weather.csv`` with one line each for temperature and wind speed.

    Args:
        temp: Value written after the ``"temp is "`` label.
        windspeed: Value written after the ``"windspeed is "`` label.

    Returns:
        The status message ``"Successfully wrote weather"``.
    """
    with open("weather.csv", "w+") as w:
        # writelines() inserts no separators, so each entry carries its own
        # newline; without them the two entries run together on one line.
        w.writelines(
            [
                "temp is " + str(temp) + "\n",
                "windspeed is " + str(windspeed) + "\n",
            ]
        )
    return "Successfully wrote weather"

@task
def fetch_windspeed(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` reading for (*lat*, *lon*).

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        Index 0 of the ``["hourly"]["windspeed_10m"]`` series, as a float.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    # Bare URL: wrapping it in angle brackets makes it unusable by httpx.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Fail on an HTTP error here rather than with an unrelated KeyError below.
    weather.raise_for_status()
    most_recent_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed

@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for (*lat*, *lon*) and save both.

    Returns the status message produced by ``save_weather``.
    """
    temperature = fetch_temperature(lat, lon)
    wind = fetch_windspeed(lat, lon)
    return save_weather(temperature, wind)


# When executed as a script, run the pipeline for latitude 38.9, longitude -77.0.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
👍 3
wizard2 4
m
Copy code
import json

from prefect import flow, task

import httpx

@task
def fetch_weather(lat: float, lon: float) -> dict:
    """Fetch the hourly ``temperature_2m`` forecast for (*lat*, *lon*).

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    # Bare URL: wrapping it in angle brackets makes it unusable by httpx.
    base_url = "https://api.open-meteo.com/v1/forecast"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m")
    )
    # Fail on an HTTP error instead of passing an error payload downstream.
    weather.raise_for_status()
    json_payload = weather.json()
    return json_payload


@task
def print_data(data: dict) -> dict:
    """Print *data* and return it unchanged."""
    print(data)
    return data

@task
def save_data(data: dict):
    """Serialize *data* as 4-space-indented JSON into ``data.json`` and return *data*."""
    # Serialize before opening the file, so a failure leaves any existing file untouched.
    serialized = json.dumps(data, indent=4)
    with open("data.json", "w") as out_file:
        out_file.write(serialized)
    return data

@flow
def get_data(lat: float = 52.52, lon: float = 13.40):
    """Fetch the weather payload for a coordinate, print it, and save it.

    Args:
        lat: Latitude passed to ``fetch_weather``; defaults to 52.52.
        lon: Longitude passed to ``fetch_weather``; defaults to 13.40.
    """
    data = fetch_weather(lat, lon)
    print_data(data)
    save_data(data)


# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    get_data()
🦜 4
blob attention gif 1
d
Untitled
👏 1
🎉 4