Jeff Hale
08/16/2023, 3:44 PM
I'm in a triple backtick code block ``
morgan
08/16/2023, 4:02 PM
import httpx  # requests capability, but can work with async
from prefect import flow, task
import csv
@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    return float(response.json()["hourly"]["temperature_2m"][0])
@task
def get_wind(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    return float(response.json()["hourly"]["windspeed_10m"][0])
@task
def save_weather(weather):
    """Write ``str(weather)`` to ``weather.csv`` and return a status message.

    The file is opened in ``"w+"`` mode, so any previous content is replaced.
    """
    status = "Successfully wrote temp"
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(weather))
    return status
@flow
def pipeline(lat: float, lon: float):
    """Fetch temperature and wind speed for a location and save them.

    The two readings are stored in a dict keyed ``1`` (wind speed) and ``2``
    (temperature), which is handed to ``save_weather``.

    Returns:
        Whatever ``save_weather`` returns for the assembled report.
    """
    temperature = fetch_weather(lat, lon)
    wind = get_wind(lat, lon)
    return save_weather({1: wind, 2: temperature})
# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    pipeline(lat=38.9, lon=-77.0)
Mike Larsson
08/16/2023, 4:03 PM
Alex Rodriguez
08/16/2023, 4:04 PM
import httpx
from prefect import flow, task


@task
def get_rain(lat: float, lon: float) -> float:
    """Fetch the first hourly ``precipitation`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.precipitation`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="precipitation"),
    )
    return float(response.json()["hourly"]["precipitation"][0])
@task
def get_humidity(lat: float, lon: float) -> float:
    """Fetch the first hourly ``relativehumidity_2m`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.relativehumidity_2m`` in the JSON
        response, converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="relativehumidity_2m"),
    )
    return float(response.json()["hourly"]["relativehumidity_2m"][0])
@task
def get_temp(lat: float, lon: float) -> float:
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    return float(response.json()["hourly"]["temperature_2m"][0])
@task
def print_results(temp: float, humidity: float, rain: float):
    """Print the three readings on a single line.

    Args:
        temp: Temperature reading.
        humidity: Relative humidity reading.
        rain: Precipitation reading.
    """
    # Relative humidity is the percentage value; Open-Meteo reports
    # precipitation in millimetres by default, so the "%" belongs to humidity.
    print(
        f"Temperature : {temp}, Relative Humidity : {humidity}%, "
        f"Rain : {rain} mm"
    )
@flow
def test_flow(lat: float, lon: float):
    """Fetch temperature, humidity and rain for a location and print them."""
    # Arguments are evaluated left to right, so the tasks still run in the
    # order temperature -> humidity -> rain.
    print_results(
        get_temp(lat, lon),
        get_humidity(lat, lon),
        get_rain(lat, lon),
    )
# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    test_flow(lat=1, lon=1)
sfrasier
08/16/2023, 4:04 PM
import httpx
from prefect import task, flow
@task
def fetch_weather(lat: float, lon: float):
    """Fetch, print and return the first hourly ``temperature_2m`` value.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(response.json()["hourly"]["temperature_2m"][0])
    print(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
@task
def fetch_dewpoint(lat: float, lon: float):
    """Fetch, print and return the first hourly ``dewpoint_2m`` value.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.dewpoint_2m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="dewpoint_2m"),
    )
    most_recent_dewpoint = float(response.json()["hourly"]["dewpoint_2m"][0])
    print(f"Most recent dewpoint C: {most_recent_dewpoint} degrees")
    return most_recent_dewpoint
@task
def fetch_windspeed(lat: float, lon: float):
    """Fetch, print and return the first hourly ``windspeed_10m`` value.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(response.json()["hourly"]["windspeed_10m"][0])
    # Wind speed is not a temperature: no wind speed unit is requested, so the
    # value is in Open-Meteo's default unit (km/h), not degrees Celsius.
    print(f"Most recent windspeed: {most_recent_windspeed} km/h")
    return most_recent_windspeed
@flow
def weather_pipeline(lat: float, lon: float):
    """Run the temperature, dewpoint and wind speed tasks for one location.

    The tasks are called one after another and their results are discarded.
    """
    for fetch in (fetch_weather, fetch_dewpoint, fetch_windspeed):
        fetch(lat, lon)
# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    weather_pipeline(lat=38.9, lon=-77.0)
Nathaniel Russell
08/16/2023, 4:05 PM
from prefect import flow, task, tags
from prefect.logging import get_run_logger
import httpx
from typing import List
def fetch_weather(lat: float, lon: float):
    """Fetch, log and return the first hourly ``temperature_2m`` value.

    This is a plain function rather than a task. It obtains its logger from
    ``get_run_logger()``.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    logger = get_run_logger()
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(response.json()["hourly"]["temperature_2m"][0])
    # Plain method call; the link-markup form of ``logger.info`` is not Python.
    logger.info(f"Most recent temp C: {most_recent_temp} degrees")
    return most_recent_temp
@task(name="fetch_many")
def fetch_many_weather(lat: float):
    """Collect temperatures along one latitude at 5-degree longitude steps.

    Longitudes run from -175 up to and including 175.

    Args:
        lat: Latitude passed to every ``fetch_weather`` call.

    Returns:
        A dict mapping each longitude to the value ``fetch_weather`` returned
        for it.
    """
    return {lon: fetch_weather(lat, lon) for lon in range(-175, 180, 5)}
@task(name="save_weather_info")
def save_weather(temp: dict):
    """Write ``str(temp)`` to ``weather.csv`` and return a status message.

    Args:
        temp: The readings to store. The flow in this script passes the
            longitude-to-temperature dict built by ``fetch_many_weather``,
            so the parameter is annotated as a dict rather than a list.

    Returns:
        The fixed string ``"Successfully wrote temp"``.
    """
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    return "Successfully wrote temp"
@task(name="display")
def display(temp_list):
    """Log ``temp_list`` at INFO level through the run logger.

    Args:
        temp_list: The object to log; it is passed to ``logger.info`` as-is.
    """
    logger = get_run_logger()
    # Plain method call; the link-markup form of ``logger.info`` is not Python.
    logger.info(temp_list)
@flow(name="weather_pipeline")
def pipeline(lat: float):
    """Fetch temperatures along a latitude, save them, then log them.

    Args:
        lat: Latitude forwarded to ``fetch_many_weather``.

    Returns:
        The value returned by ``save_weather``.
    """
    readings = fetch_many_weather(lat)
    outcome = save_weather(readings)
    display(readings)
    return outcome
# Script entry point: run the flow once for a fixed latitude.
if __name__ == "__main__":
    pipeline(lat=38.9)
Rahul Viswanath C
08/16/2023, 4:05 PM
from prefect import flow, task
import httpx
@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    most_recent_temp = float(response.json()["hourly"]["temperature_2m"][0])
    return most_recent_temp
@task
def fetch_windspeed(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` value for a location.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        converted to ``float``.
    """
    # The URL must be bare: angle-bracket link markup around it is not part
    # of the address and would make it an invalid URL.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    response = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    most_recent_windspeed = float(response.json()["hourly"]["windspeed_10m"][0])
    return most_recent_windspeed
@task
def save_weather(temp: float):
    """Write ``str(temp)`` to ``weather.csv`` and return a status message.

    The file is opened in ``"w+"`` mode, so any previous content is replaced.
    """
    status = "Successfully wrote temp"
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    return status
@task
def save_windspeed(temp: float):
    """Write a wind speed reading to ``windspeed.csv``.

    Args:
        temp: The wind speed value to store. The parameter keeps its original
            name ``temp`` so existing keyword callers continue to work.

    Returns:
        A status message naming what was written.
    """
    with open("windspeed.csv", "w+") as out_file:
        out_file.write(str(temp))
    # This task stores wind speed, so the status must not claim "temp".
    return "Successfully wrote windspeed"
@flow
def pipeline(lat: float, lon: float):
    """Fetch and save temperature, then wind speed, and print both statuses.

    Args:
        lat: Latitude forwarded to both fetch tasks.
        lon: Longitude forwarded to both fetch tasks.
    """
    # Temperature is fetched and saved before wind speed is requested.
    weather_result = save_weather(fetch_weather(lat, lon))
    windspeed_result = save_windspeed(fetch_windspeed(lat, lon))
    print(f"{weather_result} , {windspeed_result}")
# Script entry point: run the flow once for a fixed pair of coordinates.
if __name__ == "__main__":
    pipeline(lat=38.9, lon=-77.0)
Sanjeev
08/16/2023, 4:35 PM