Jon
08/22/2023, 1:47 PMIsabel
08/22/2023, 1:51 PMimport httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_temp(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def fetch_wind(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
)
most_recent_wind = float(weather.json()["hourly"]["windspeed_10m"][0])
return most_recent_wind
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@flow
def pipeline(lat: float, lon: float):
temp = fetch_temp(lat, lon)
result = save_weather(temp)
x = fetch_wind(lat, lon)
result = save_weather(x)
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)
Jon
08/22/2023, 2:02 PMimport httpx # requests capability, but can work with async
from prefect import flow, task, logging
@task
def fetch_temp(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(
latitude=lat,
longitude=lon,
hourly="temperature_2m",
),
)
most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0])
return most_recent_temp
@task
def fetch_wind(lat: float, lon: float):
base_url = "<https://api.open-meteo.com/v1/forecast/>"
weather = httpx.get(
base_url,
params=dict(
latitude=lat,
longitude=lon,
hourly="windspeed_10m",
),
)
most_recent_wind = float(weather.json()["hourly"]["windspeed_10m"][0])
return most_recent_wind
@task
def save_weather(temp: float):
with open("weather.csv", "w+") as w:
w.write(str(temp))
return "Successfully wrote temp"
@flow
def pipeline(lat: float, lon: float):
logger = logging.get_run_logger()
temp = fetch_temp(lat, lon)
result = save_weather(temp)
x = fetch_wind(lat, lon)
<http://logger.info|logger.info>(f"temp >>>>>>>>> {temp }")
<http://logger.info|logger.info>(f"wind >>>>>>>>>> {x}")
result = save_weather(x)
return result
if __name__ == "__main__":
pipeline(38.9, -77.0)
Sanz Al
08/22/2023, 2:06 PM