Share your 102 code in this :thread::
# pacc-clearcover-june-12-2023
j
Share your 102 code in this 🧵:
upvote 1
p
Copy code
import httpx
import json 
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=4, cache_key_fn=task_input_hash)
def hit_meteo_api(
    base_url="https://api.open-meteo.com/v1/forecast",
    latitude: float = 0,
    longitude: float = 0,
    current_weather: bool = True,
):
    """Query the Open-Meteo forecast endpoint and return its current weather.

    Args:
        base_url: Endpoint that receives the GET request.
        latitude: Value sent as the ``latitude`` query parameter.
        longitude: Value sent as the ``longitude`` query parameter.
        current_weather: Value sent as the ``current_weather`` query
            parameter.

    Returns:
        The ``current_weather`` entry of the decoded JSON response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        KeyError: If the decoded response has no ``current_weather`` entry.
    """
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": current_weather,
    }

    r = httpx.get(base_url, params=params)
    # Surface HTTP errors as exceptions instead of a confusing KeyError
    # further down; a raised exception is also what lets the task retry.
    r.raise_for_status()

    results_dict = r.json()
    print(results_dict)

    return results_dict["current_weather"]


# subflow 1
@flow
def grab_data():
    """Fetch weather for two fixed coordinates via ``hit_meteo_api``.

    Returns:
        A 2-tuple: the result for latitude/longitude (64, 64) followed by
        the result for (-64, -64).
    """
    northern = hit_meteo_api(latitude=64, longitude=64)
    southern = hit_meteo_api(latitude=-64, longitude=-64)
    return northern, southern


@task
def aggregate_dicts(weather_dict1: dict, weather_dict2: dict):
    """Bundle two weather results into a list and print it.

    Args:
        weather_dict1 (dict): First result; becomes element 0.
        weather_dict2 (dict): Second result; becomes element 1.

    Returns:
        list: ``[weather_dict1, weather_dict2]``.
    """
    combined = [weather_dict1, weather_dict2]
    print(combined)
    return combined

# subflow 2
@flow
def subflow(weather_dict1, weather_dict2):
    """Run ``aggregate_dicts`` on the two given results.

    The aggregated list is not returned; this flow returns ``None``.
    """
    aggregate_dicts(weather_dict1=weather_dict1, weather_dict2=weather_dict2)



# main flow that runs subflow 1 + 2
@flow(log_prints=True)
def run_everything():
    """Main flow: run ``grab_data``, pass its results to ``subflow``, print them."""
    first_result, second_result = grab_data()
    subflow(first_result, second_result)

    # Separator line, then each result on its own line.
    print("*************")
    for result in (first_result, second_result):
        print(result)


# Run the main flow only when this file is executed as a script.
if __name__ == "__main__":
    run_everything()
sonic 1
🚀 3
t
Copy code
import httpx  # requests capability, but can work with async
from prefect import flow, task
import numpy as np

@task(retries=4, retry_delay_seconds=0.1)
def fetch_weather(lat: float, lon: float):
    """Request hourly forecast data from Open-Meteo for one coordinate.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly=["temperature_2m", "precipitation", "windspeed_10m"],
        ),
    )
    # Without this an error response is returned as if it were data and the
    # configured retries never get a chance to run.
    weather.raise_for_status()
    return weather.json()

@task
def extract_temperatures(data):
    """Return ``data["hourly"]["temperature_2m"]`` converted to a NumPy array."""
    hourly = data["hourly"]
    return np.asarray(hourly["temperature_2m"])

@task
def extract_precipitation(data):
    """Return ``data["hourly"]["precipitation"]`` converted to a NumPy array."""
    hourly = data["hourly"]
    return np.asarray(hourly["precipitation"])

def average(data_points):
    """Return the mean over all elements of *data_points*.

    The input is converted with ``np.asarray`` before ``np.average`` is
    applied.
    """
    return np.average(np.asarray(data_points))

@task
def save_weather(temp: float, precipitation: float):
    """Write the two values, comma-separated, to ``weather.csv``.

    The file is opened in ``"w+"`` mode, so existing content is replaced.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    with open("weather.csv", "w+") as out_file:
        row = ",".join((str(temp), str(precipitation)))
        out_file.write(row)
    return "Successfully wrote temp"


@flow
def getAverageTemp(data):
    """Extract the temperature series from *data*, print it, return its average."""
    temperature_series = extract_temperatures(data)
    print(temperature_series)
    return average(temperature_series)

@flow
def getAveragePrecip(data):
    """Extract the precipitation series from *data*, print it, return its average."""
    precipitation_series = extract_precipitation(data)
    print(precipitation_series)
    return average(precipitation_series)

@flow(log_prints=True)
def pipeline(lat: float, lon: float):
    """Fetch weather for (*lat*, *lon*), average it, and save the averages.

    Returns:
        Whatever ``save_weather`` returns for the two averages.
    """
    forecast = fetch_weather(lat, lon)
    mean_temp = getAverageTemp(forecast)
    mean_precip = getAveragePrecip(forecast)
    return save_weather(mean_temp, mean_precip)


# Run the pipeline for a fixed coordinate when executed as a script.
if __name__ == "__main__":
    pipeline(38.9, -77.0)
🎉 3
squirtle cool 1
d
Untitled
prefect duck 1
🙌 2
s
Copy code
import httpx
from datetime import timedelta
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_weather(lat: float, lon: float):
    """Request hourly temperature and precipitation data from Open-Meteo.

    Args:
        lat: Value sent as the ``latitude`` query parameter.
        lon: Value sent as the ``longitude`` query parameter.

    Returns:
        The ``httpx.Response`` itself (not the decoded body); callers decode
        it with ``.json()``.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    weather = httpx.get(
        "https://api.open-meteo.com/v1/forecast/",
        params=dict(latitude=lat, longitude=lon, hourly=["temperature_2m", "precipitation"]),
    )
    # Raise on error statuses so a failed request triggers the task's
    # retries instead of handing an error response downstream.
    weather.raise_for_status()
    return weather


@flow
def get_temp(weather):
    """Return the first hourly ``temperature_2m`` value of a weather response.

    Args:
        weather: Response object whose ``.json()`` yields a mapping with an
            ``hourly`` -> ``temperature_2m`` sequence.

    Returns:
        float: Element 0 of the ``temperature_2m`` series.
    """
    logger = get_run_logger()
    logger.info("Fetching temperature data...")
    # NOTE(review): index 0 is the first entry of the series; whether that is
    # the "most recent" reading depends on the API's ordering - confirm.
    most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp


@flow
def get_precipitation(weather):
    """Return the first hourly ``precipitation`` value of a weather response.

    Args:
        weather: Response object whose ``.json()`` yields a mapping with an
            ``hourly`` -> ``precipitation`` sequence.

    Returns:
        float: Element 0 of the ``precipitation`` series.
    """
    logger = get_run_logger()
    logger.info("Fetching precipitation data...")
    # NOTE(review): index 0 is the first entry of the series; whether that is
    # the "most recent" reading depends on the API's ordering - confirm.
    most_recent_precipitation = float(weather.json()["hourly"]["precipitation"][0])
    return most_recent_precipitation


@flow(name="GetWeatherDataLab102", log_prints=True)
def get_all_weather_data():
    """Fetch weather for (38.9, -79.0) and print its temperature and precipitation."""
    response = fetch_weather(38.9, -79.0)
    # Both sub-flows read from the same fetched response.
    temp = get_temp(response)
    precipitation = get_precipitation(response)
    print(f"Temperature: {temp} \nPrecipitation: {precipitation}")


# Run the top-level flow only when this file is executed as a script.
if __name__ == "__main__":
    get_all_weather_data()
🙌 1
🦜 1