Jeff Hale
09/13/2023, 3:01 PM
Jonas
09/13/2023, 3:01 PM
Tom A
09/13/2023, 3:02 PM
import httpx
from prefect import flow, task, artifacts
from prefect.tasks import task_input_hash
from datetime import timedelta
# Open-Meteo forecast endpoint. The literal must be a bare URL: the "<...>"
# wrapper it previously carried was chat link formatting, not part of the address.
METEO_URL = "https://api.open-meteo.com/v1/forecast/"
@task(retries=2, retry_delay_seconds=5)
def fetch_metric(lat, lon, metric_name):
    """Fetch the first hourly value of one metric from the forecast API.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.
        metric_name: Hourly variable to request (sent as ``hourly``), e.g.
            ``"temperature_2m"``.

    Returns:
        The first entry of ``hourly[metric_name]`` in the JSON response,
        converted to ``float``.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    response = httpx.get(
        METEO_URL,
        params=dict(latitude=lat, longitude=lon, hourly=metric_name),
    )
    # Surface HTTP failures as such instead of a confusing KeyError while
    # indexing an error payload below.
    response.raise_for_status()
    metric_value = float(response.json()["hourly"][metric_name][0])
    # Report the parsed value for the metric that was actually requested; the
    # earlier message interpolated the Response object and always said "temp".
    print(f"Most recent {metric_name}: {metric_value}")
    return metric_value
@task()
def write_to_file(filename, metrics):
    """Write one ``Metric: <name> is <value>`` line per metric to a text file.

    Args:
        filename: Path of the file to create or overwrite.
        metrics: Iterable of ``(metric_name, metric_value)`` pairs.
    """
    # The context manager guarantees the handle is closed even if a write
    # raises part-way through the loop.
    with open(filename, "w") as out:
        for metric_name, metric_value in metrics:
            out.write(f"Metric: {metric_name} is {metric_value}\n")
@task()
def generate_artifact(lat, lon, metrics):
    """Render the metrics as a markdown table, print it, and publish it.

    Args:
        lat: Latitude shown in the report heading.
        lon: Longitude shown in the report heading.
        metrics: Iterable of ``(metric_name, metric_value)`` pairs, one table
            row each.
    """
    heading_and_table_head = (
        f"\n# Weather Report for {lat}, {lon}\n"
        "| Metric Name | Value |\n"
        "|-------------|-------|\n"
    )
    table_rows = "".join(f"|{name}|{value}|\n" for name, value in metrics)
    report = heading_and_table_head + table_rows

    print(report)
    artifacts.create_markdown_artifact(
        key="weather-info",
        markdown=report,
        description="Data",
    )
@flow()
def fetch_weather(lat: float, lon: float):
    """Fetch three hourly metrics for a location, then save and publish them.

    Args:
        lat: Latitude passed to every ``fetch_metric`` call.
        lon: Longitude passed to every ``fetch_metric`` call.
    """
    # (label used in the outputs, hourly variable name requested from the API)
    wanted = (
        ("temp", "temperature_2m"),
        ("humidity", "relativehumidity_2m"),
        ("dewpoint", "dewpoint_2m"),
    )
    metrics = [
        (label, fetch_metric(lat, lon, variable)) for label, variable in wanted
    ]
    write_to_file("weather_output.txt", metrics)
    generate_artifact(lat, lon, metrics)
if __name__ == "__main__":
    # Run the flow once directly, then call serve() with the name
    # "my_weather_flow".
    fetch_weather(lat=38.9, lon=-77.0)
    fetch_weather.serve("my_weather_flow")
Elinor Scruby
09/13/2023, 3:05 PM
import httpx
from prefect import flow, task
from prefect.artifacts import create_table_artifact
@task
def fetch_cat_fact():
    """Request one cat fact (``max_length=140``) and return its text.

    Returns:
        The ``"fact"`` field of the JSON response.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Bare URL: the "<...>" wrapper it previously carried was chat link
    # formatting and is not part of the address.
    response = httpx.get("https://catfact.ninja/fact?max_length=140")
    response.raise_for_status()
    return response.json()["fact"]
@task
def fetch_dog_fact():
    """Request dog facts as JSON and return the body of the first one.

    Returns:
        ``data[0].attributes.body`` from the JSON response.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    # Bare URL: the "<...>" wrapper it previously carried was chat link
    # formatting and is not part of the address.
    response = httpx.get(
        "https://dogapi.dog/api/v2/facts",
        headers={"accept": "application/json"},
    )
    response.raise_for_status()
    return response.json()["data"][0]["attributes"]["body"]
@flow
def animal_facts():
    """Fetch one cat fact and one dog fact and publish them as a table artifact."""
    # Tuple elements are evaluated left to right, so the cat fact is fetched
    # before the dog fact.
    facts_by_animal = (
        ("cat", fetch_cat_fact()),
        ("dog", fetch_dog_fact()),
    )
    rows = [{"animal": animal, "fact": fact} for animal, fact in facts_by_animal]
    create_table_artifact(
        key="animal-facts",
        table=rows,
        description="# Here are some fun animal facts",
    )
if __name__ == "__main__":
    # Script entry point: run the flow once.
    animal_facts()
David Maxson
09/13/2023, 3:06 PM
import csv
from datetime import timedelta
import httpx
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.tasks import task_input_hash
from prefect.artifacts import create_table_artifact
# Open-Meteo forecast endpoint. The literal must be a bare URL: the "<...>"
# wrapper it previously carried was chat link formatting, not part of the address.
METEO_URL = "https://api.open-meteo.com/v1/forecast/"
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_metric(lat: float, lon: float, metric_rate: str, metric_name: str) -> float:
    """Fetch the first value of one metric from the forecast API.

    Args:
        lat: Latitude, sent as the ``latitude`` query parameter.
        lon: Longitude, sent as the ``longitude`` query parameter.
        metric_rate: Name of the query parameter and of the response section
            that carries the metric, e.g. ``"hourly"``.
        metric_name: Variable to request, e.g. ``"apparent_temperature"``.

    Returns:
        The first entry of ``response[metric_rate][metric_name]`` as a float.

    Raises:
        httpx.HTTPStatusError: If the API responds with a 4xx/5xx status.
    """
    response = httpx.get(
        METEO_URL,
        # metric_rate is a dynamic key, hence the ** expansion.
        params=dict(latitude=lat, longitude=lon, **{metric_rate: metric_name}),
    )
    # Surface HTTP failures as such instead of a confusing KeyError while
    # indexing an error payload below.
    response.raise_for_status()
    first_value = float(response.json()[metric_rate][metric_name][0])
    return first_value
# NOTE(review): this task is cached on its inputs for a day; a cache hit
# presumably skips the body, so the file would not be rewritten. Confirm that
# is intended for a task whose purpose is a side effect.
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def save_metrics(filename: str, lat: float, lon: float, metric_results: list[tuple[str, str, float]]):
    """Write the fetched metrics to a CSV file with a header row.

    Args:
        filename: Path of the CSV file to create or overwrite.
        lat: Latitude written into every row.
        lon: Longitude written into every row.
        metric_results: ``(metric_rate, metric_name, value)`` triples, one
            CSV row each.
    """
    # newline="" lets the csv module control line endings itself; without it
    # an extra blank line appears between rows on platforms that translate
    # "\n" to "\r\n".
    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['lat', 'lon', 'metric_rate', 'metric_name', 'value'])
        writer.writeheader()
        for metric_rate, metric_name, value in metric_results:
            writer.writerow(dict(lat=lat, lon=lon, metric_rate=metric_rate, metric_name=metric_name, value=value))
@task
def report_metrics(metric_results):
    """Publish the metric results as a table artifact keyed ``weather-report``.

    Args:
        metric_results: Iterable of value sequences; each one is paired
            positionally with the Frequency / Metric / Value column names.
    """
    column_names = ("Frequency", "Metric", "Value")
    rows = []
    for result in metric_results:
        rows.append(dict(zip(column_names, result)))
    create_table_artifact(key="weather-report", table=rows)
@flow(task_runner=ConcurrentTaskRunner())
def open_meteo_dataflow(
    lat: float,
    lon: float,
    metrics: list[tuple[str, str]] = (
        ('hourly', 'apparent_temperature'),
    ),
    filename: str = 'lab2.csv',
):
    """Fetch each requested metric, save the results to CSV, and report them.

    Args:
        lat: Latitude passed to every ``fetch_metric`` call.
        lon: Longitude passed to every ``fetch_metric`` call.
        metrics: ``(metric_rate, metric_name)`` pairs to fetch.
        filename: CSV path handed to ``save_metrics``.
    """
    metric_values = []
    for metric_rate, metric_name in metrics:
        value = fetch_metric(lat, lon, metric_rate, metric_name)
        metric_values.append((metric_rate, metric_name, value))

    save_metrics(filename, lat, lon, metric_values)
    report_metrics(metric_values)
if __name__ == "__main__":
    # Script entry point: call serve() on the flow with the name "lab-2".
    open_meteo_dataflow.serve("lab-2")
Beizhen
09/13/2023, 3:06 PM