Brendan Shanahan
05/31/2023, 1:54 PMKalo
05/31/2023, 1:57 PMEmiliano
05/31/2023, 1:59 PMAndrea Borruso
05/31/2023, 1:59 PM
windspeed_10m
import httpx # requests capability, but can work with async
from prefect import flow, task
@task
def fetch_weather(lat: float, lon: float):
    """Fetch the first hourly ``temperature_2m`` value from Open-Meteo.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.temperature_2m`` in the JSON response,
        converted to ``float``.
    """
    # Plain URL: the angle brackets that used to wrap it were chat-link
    # markup, and httpx rejects a URL that does not start with a scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"),
    )
    # Index 0 is the first element of the returned hourly series.
    first_temp = float(weather.json()["hourly"]["temperature_2m"][0])
    return first_temp
@task
def fetch_windspeed(lat: float, lon: float):
    """Fetch the first hourly ``windspeed_10m`` value from Open-Meteo.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first entry of ``hourly.windspeed_10m`` in the JSON response,
        converted to ``float``.
    """
    # Plain URL: the angle brackets that used to wrap it were chat-link
    # markup, and httpx rejects a URL that does not start with a scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(latitude=lat, longitude=lon, hourly="windspeed_10m"),
    )
    # Index 0 is the first element of the returned hourly series.
    first_windspeed = float(weather.json()["hourly"]["windspeed_10m"][0])
    return first_windspeed
@task
def save_weather(temp: float):
    """Write ``str(temp)`` to ``weather.csv`` in the working directory.

    Args:
        temp: Value to write; it is stringified with ``str``.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    # "w+" truncates an existing file, so each call replaces its contents.
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    return "Successfully wrote temp"
@task
def save_windspeed(temp: float):
    """Write ``str(temp)`` to ``windspeed.csv`` in the working directory.

    Args:
        temp: Value to write; it is stringified with ``str``.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    # NOTE(review): the parameter name and the status message both say
    # "temp" although the target file is windspeed.csv -- looks copy-pasted;
    # left unchanged so existing callers keep working.
    # "w+" truncates an existing file, so each call replaces its contents.
    with open("windspeed.csv", "w+") as out_file:
        out_file.write(str(temp))
    return "Successfully wrote temp"
@flow
def pipeline(lat: float, lon: float):
    """Fetch a temperature for the given coordinates and pass it to ``save_weather``.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        Whatever ``save_weather`` returns.
    """
    # NOTE(review): fetch_windspeed / save_windspeed are defined in the same
    # snippet but are not called from this flow.
    temp = fetch_weather(lat, lon)
    result = save_weather(temp)
    return result
if __name__ == "__main__":
    # Run the flow once for latitude 38.9, longitude -77.0 when executed as a script.
    pipeline(38.9, -77.0)
Sam Cook
05/31/2023, 2:00 PMAmir Volfovich
05/31/2023, 2:09 PM
from enum import Enum, auto
import httpx # requests capability, but can work with async
from prefect import flow, task
class TimeResolution(Enum):
    """Time resolutions supported by the weather tasks.

    The member *name* (not its value) is what the tasks use: it becomes the
    query-parameter key of the request and the top-level key read from the
    JSON response.
    """

    hourly = auto()
    daily = auto()
@task
def fetch_weather_hourly_param(lat: float, lon: float, resolution: TimeResolution, param: str) -> dict:
    """Request one forecast variable from Open-Meteo and return the parsed JSON.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.
        resolution: Its ``name`` is used as the query-parameter key.
        param: Variable name sent as the value of that key.

    Returns:
        The decoded JSON body of the response.
    """
    # Plain URL: the angle brackets that used to wrap it were chat-link
    # markup, and httpx rejects a URL that does not start with a scheme.
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params={
            "latitude": lat,
            "longitude": lon,
            # The key is dynamic; despite the function name, any
            # TimeResolution member is accepted here.
            resolution.name: param,
        },
    )
    return weather.json()
@task
def save_weather(temp: float):
    """Write ``str(temp)`` to ``weather.csv`` in the working directory.

    Args:
        temp: Value to write; it is stringified with ``str``.

    Returns:
        The fixed status string ``"Successfully wrote temp"``.
    """
    # "w+" truncates an existing file, so each call replaces its contents.
    with open("weather.csv", "w+") as out_file:
        out_file.write(str(temp))
    return "Successfully wrote temp"
@task
def extract_data_point(response: dict, resolution: TimeResolution, param: str, hour: int) -> float:
    """Pull a single value out of a parsed forecast response.

    Args:
        response: Parsed JSON; indexed as ``response[resolution.name][param]``.
        resolution: Its ``name`` selects the top-level section of ``response``.
        param: Key of the series inside that section.
        hour: Position of the wanted element within the series.

    Returns:
        The selected element converted to ``float``.
    """
    series = response[resolution.name][param]
    return float(series[hour])
@task
def print_float_param(data: float, resolution: TimeResolution, param: str):
    """Print one value together with its parameter name and resolution name.

    Args:
        data: Value to report.
        resolution: Its ``name`` is included in the message.
        param: Parameter name included in the message.
    """
    # Message text is kept exactly as originally written.
    print(f"for param {param}, in resoltion {resolution.name} data is {data}")
@flow
def pipeline(lat: float, lon: float, resolution: TimeResolution, item_pos: int, params: list):
    """Fetch, extract and print one data point per entry in ``params``.

    Args:
        lat: Latitude forwarded to the fetch task.
        lon: Longitude forwarded to the fetch task.
        resolution: Resolution forwarded to every task.
        item_pos: Position forwarded to ``extract_data_point`` as ``hour``.
        params: Variable names; each task is mapped over this list.

    Returns:
        The result of ``print_float_param.map(...)``.
    """
    # NOTE(review): this relies on ``.map`` iterating only over the list
    # arguments and reusing the scalar ones for every run -- confirm against
    # the Prefect version in use.
    params_jsons = fetch_weather_hourly_param.map(lat, lon, resolution, params)
    float_data = extract_data_point.map(params_jsons, resolution, params, item_pos)
    return print_float_param.map(float_data, resolution, params)
if __name__ == "__main__":
    # Variables to request; the flow maps its tasks over this list.
    params = ["temperature_2m", "relativehumidity_2m", "dewpoint_2m"]
    # Position 0 selects the first element of each returned hourly series.
    pipeline(38.9, -77.0, TimeResolution.hourly, 0, params)
Emil Christensen
05/31/2023, 2:13 PMEmil Christensen
05/31/2023, 2:17 PMJeff Hale
05/31/2023, 2:27 PMJeff Hale
05/31/2023, 2:27 PMLi McCarthy
05/31/2023, 2:31 PMJeff Hale
05/31/2023, 2:33 PMSonglu Li
05/31/2023, 2:34 PMQuan Bach
05/31/2023, 2:45 PM
10:37:45.809 | ERROR | prefect.server.services.telemetry - Unexpected error in: OperationalError('(sqlite3.OperationalError) database is locked')
Luke
05/31/2023, 2:49 PMdef distance_to_collector(lat,lon):
params = {"latitude":lat, "longitude":lon, "hourly":"temperature_2m"}
result = httpx.get("<https://api.open-meteo.com/v1/forecast/>",params = params).json()
collector_lat = result['latitude']
collector_lon = result['longitude']
return haversine.haversine((lat,lon),(collector_lat,collector_lon),unit=haversine.Unit.FEET)
Jeff Hale
05/31/2023, 2:52 PMJeff Hale
05/31/2023, 3:09 PMEmil Christensen
05/31/2023, 3:11 PMKalo
05/31/2023, 3:12 PMBrendan Shanahan
05/31/2023, 3:13 PMMatt Conger
05/31/2023, 3:18 PMLi McCarthy
05/31/2023, 3:21 PM
prefect version
See active workspace: prefect cloud workspaces ls
Tom Kaplan
05/31/2023, 3:23 PMAmir Volfovich
05/31/2023, 3:27 PMmat swinianski
05/31/2023, 3:38 PMQuan Bach
05/31/2023, 3:45 PMJeff Hale
05/31/2023, 3:46 PMBrendan Shanahan
05/31/2023, 3:48 PMluca pugliese
05/31/2023, 3:50 PMMichal Drobena
05/31/2023, 3:51 PM