https://prefect.io logo
j

Jeff Hale

09/13/2023, 3:01 PM
Share code from 102 lab in this ๐Ÿงต:
sonic 1
j

Jonas

09/13/2023, 3:01 PM
import httpx # requests capability, but can work with async from prefect import flow, task from prefect.artifacts import create_markdown_artifact from prefect.tasks import task_input_hash @task(retries = 3, cache_key_fn=task_input_hash) def fetch_weather(lat: float, lon: float): base_url = "https://api.open-meteo.com/v1/forecast/" weather = httpx.get( base_url, params=dict(latitude=lat, longitude=lon, hourly="temperature_2m"), ) most_recent_temp = float(weather.json()["hourly"]["temperature_2m"][0]) create_markdown_artifact( key = 'report', markdown = "#whoohoo", description = 'just a try to do something' ) return most_recent_temp @task def save_weather(temp: float): with open("weather.csv", "w+") as w: w.write(str(temp)) return "Successfully wrote temp" @flow() def pipeline(lat: float, lon: float): temp = fetch_weather(lat, lon) result = save_weather(temp) return result if name == "__main__": pipeline.serve(name='just-another-test')
wizard2 1
t

Tom A

09/13/2023, 3:02 PM
Copy code
import httpx
from prefect import flow, task, artifacts
from prefect.tasks import task_input_hash
from datetime import timedelta

METEO_URL = "<https://api.open-meteo.com/v1/forecast/>"

@task(retries=2, retry_delay_seconds=5)
def fetch_metric(lat, lon, metric_name):
    response_json = httpx.get(
        METEO_URL,
        params=dict(latitude=lat, longitude=lon, hourly=metric_name),
    )
    metric_value = float(response_json.json()["hourly"][metric_name][0])
    print(f"Most recent temp C: {response_json} degrees")
    return(metric_value)

@task()
def write_to_file(filename, metrics):
    file=open(filename, "w")
    for metric_name, metric_value in metrics:
        file.write(f"Metric: {metric_name} is {metric_value}\n")
    file.close()

@task()
def generate_artifact(lat, lon, metrics):
    markdown = f"""
# Weather Report for {lat}, {lon}
    
| Metric Name | Value |
|-------------|-------|
"""
    for metric_name, metric_value in metrics:
        markdown += f'|{metric_name}|{metric_value}|\n'
    print(markdown)
    artifacts.create_markdown_artifact(
        key='weather-info',
        markdown=markdown,
        description='Data'
    )

@flow()
def fetch_weather(lat: float, lon: float):
    temp = fetch_metric(lat, lon, "temperature_2m")
    humidity = fetch_metric(lat, lon, "relativehumidity_2m")
    dewpoint = fetch_metric(lat, lon, "dewpoint_2m")
    metrics = [('temp', temp), ('humidity', humidity), ('dewpoint', dewpoint)]
    write_to_file('weather_output.txt', metrics )
    generate_artifact(lat,lon,metrics)

if __name__ == "__main__":
    fetch_weather(38.9, -77.0)
    fetch_weather.serve('my_weather_flow')
sonic 1
e

Elinor Scruby

09/13/2023, 3:05 PM
Copy code
import httpx
from prefect import flow, task
from prefect.artifacts import create_table_artifact

@task
def fetch_cat_fact():
    return httpx.get("<https://catfact.ninja/fact?max_length=140>").json()["fact"]

@task
def fetch_dog_fact():
    return httpx.get(
        "<https://dogapi.dog/api/v2/facts>",
        headers={"accept": "application/json"},
    ).json()["data"][0]["attributes"]["body"]

@flow
def animal_facts():
    cat_fact = fetch_cat_fact()
    dog_fact = fetch_dog_fact()
    
    fact_table = [
            {'animal':'cat', 'fact':cat_fact},
            {'animal':'dog','fact':dog_fact}
        ]

    create_table_artifact(
        key="animal-facts",
        table=fact_table,
        description="# Here are some fun animal facts"
    )

if __name__ == "__main__":
    animal_facts()
๐Ÿ™Œ 1
d

David Maxson

09/13/2023, 3:06 PM
Copy code
import csv
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.tasks import task_input_hash
from prefect.artifacts import create_table_artifact

METEO_URL = "<https://api.open-meteo.com/v1/forecast/>"


@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def fetch_metric(lat: float, lon: float, metric_rate: str, metric_name: str):
    weather = httpx.get(
        METEO_URL,
        params=dict(latitude=lat, longitude=lon, **{metric_rate: metric_name}),
    )
    most_recent_temp = float(weather.json()[metric_rate][metric_name][0])
    return most_recent_temp


@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def save_metrics(filename: str, lat: float, lon: float, metric_results: list[tuple[str, str, float]]):
    with open(filename, 'w') as f:
        writer = csv.DictWriter(f, fieldnames=['lat', 'lon', 'metric_rate', 'metric_name', 'value'])
        writer.writeheader()
        for metric_rate, metric_name, value in metric_results:
            writer.writerow(dict(lat=lat, lon=lon, metric_rate=metric_rate, metric_name=metric_name, value=value))


@task
def report_metrics(metric_results):
    create_table_artifact(
        key="weather-report",
        table=[
            dict(zip(["Frequency", "Metric", "Value"], vals))
            for vals in metric_results
        ]
    )


@flow(task_runner=ConcurrentTaskRunner())
def open_meteo_dataflow(lat: float, lon: float, metrics: list[tuple[str, str]] = (
    ('hourly', 'apparent_temperature'),
), filename: str = 'lab2.csv'):
    metric_values = [(metric_rate, metric_name, fetch_metric(lat, lon, metric_rate, metric_name)) for metric_rate, metric_name in metrics]
    save_metrics(filename, lat, lon, metric_values)
    report_metrics(metric_values)


if __name__ == '__main__':
    open_meteo_dataflow.serve("lab-2")
๐Ÿฆœ 1
b

Beizhen

09/13/2023, 3:06 PM
Hey @Christopher Boyd regarding the markdown, we found out that it is because of the indentation. So there should be no indentation in the markdown text and that worked.
๐Ÿ™ 1