Emil Christensen

08/31/2023, 12:41 AM
@Elizabeth Bing - 🧵 for error

Elizabeth Bing

08/31/2023, 12:41 AM
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniconda/base/bin/prefect", line 5, in <module>
    from prefect.cli import app
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/site-packages/prefect/__init__.py", line 45, in <module>
    from prefect.engine import pause_flow_run, resume_flow_run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/site-packages/prefect/engine.py", line 98, in <module>
    from anyio import start_blocking_portal
ImportError: cannot import name 'start_blocking_portal' from 'anyio' (/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/site-packages/anyio/__init__.py)

Emil Christensen

08/31/2023, 12:44 AM
docker run -it --rm -v $PWD:/opt/prefect/flows/ prefecthq/prefect:2-latest bash
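The container above sidesteps the local conda environment entirely. If you want to check the local install instead, here is a minimal sketch that reports which anyio major version is present. It rests on an assumption that is not stated in this thread: as far as I know, anyio 4 dropped the top-level `start_blocking_portal` (it lives in `anyio.from_thread`), so a Prefect release that still imports it from the top level would need anyio below 4. The helper name `anyio_major_version` is made up for illustration.

```python
from importlib import metadata


def anyio_major_version():
    """Return the installed anyio major version, or None if anyio is not installed."""
    try:
        version = metadata.version("anyio")
    except metadata.PackageNotFoundError:
        return None
    return int(version.split(".")[0])


major = anyio_major_version()
if major is None:
    print("anyio is not installed in this environment")
elif major >= 4:
    # Assumption: the top-level import in the traceback is not available on anyio 4+.
    print(f"anyio {major}.x found; 'from anyio import start_blocking_portal' is likely to fail")
else:
    print(f"anyio {major}.x found; the top-level import should still be available")
```

If it reports 4 or later, pinning anyio below 4 or upgrading Prefect is probably the thing to try.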

Elizabeth Bing

08/31/2023, 12:47 AM
import httpx
from prefect import flow, task
import pandas as pd
from prefect.tasks import task_input_hash

# latitude 26.1 North, longitude 84.8 West

# TASKS
# cache_key determines the inputs
@task(cache_key_fn=task_input_hash, cache_expiration=datetime.timedeltas=)
def fetch_weather(lat: float, lon: float):
    base_url = "https://api.open-meteo.com/v1/forecast/"
    weather = httpx.get(
        base_url,
        params=dict(
            latitude=lat,
            longitude=lon,
            hourly="temperature_2m,cloudcover_mid,windspeed_10m,winddirection_10m",
        ),
    )
    weather_json = weather.json()
    df = pd.DataFrame(
        {
            "latitude": weather_json["latitude"],
            "longitude": weather_json["longitude"],
            "temperature": weather.json()["hourly"]["temperature_2m"],
            "wind": weather.json()["hourly"]["windspeed_10m"],
            "cloudcover": weather.json()["hourly"]["cloudcover_mid"],
        }
    )
    return df

@task
def save_weather_data(data: pd.DataFrame):
    data.to_csv("weather_data.csv")
    return "successfully wrote file"

@task
def get_latest_value(data: pd.DataFrame, metric: str):
    print(f"latest data for {metric} {data[metric].iloc[-1]}")

# FLOW
# Determines the order in which tasks are called
@flow(retries=4)
def get_weather_metrics(lat: float, lon: float):
    df = fetch_weather(lat=lat, lon=lon)
    # print out latest wind metric
    get_latest_value(df, "wind")
    save_weather_data(df)

# Start the flow
if __name__ == "__main__":
    get_weather_metrics(38.9, -77.0)

Emil Christensen

08/31/2023, 12:55 AM
Issue was a syntax error:
@task(cache_key_fn=task_input_hash, cache_expiration=datetime.timedeltas=)
Seems to affect parsing of all files.
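A quick way to confirm that the decorator line is the problem is to parse it without running anything. This is only a sketch: `hours=1` in the fixed version is an assumed value, since the intended cache duration never came up in the thread, and `parses` is a made-up helper name. The real file would also need `import datetime` for the fixed line to run.

```python
import ast

# The decorator as posted: the cache_expiration keyword is left unfinished.
broken = '''
@task(cache_key_fn=task_input_hash, cache_expiration=datetime.timedeltas=)
def fetch_weather(lat: float, lon: float):
    pass
'''

# One possible fix. hours=1 is an assumption, not a value from the thread.
fixed = '''
@task(cache_key_fn=task_input_hash, cache_expiration=datetime.timedelta(hours=1))
def fetch_weather(lat: float, lon: float):
    pass
'''


def parses(source: str) -> bool:
    """Return True if the source is syntactically valid Python (nothing is executed)."""
    try:
        ast.parse(source)
        return True
    except SyntaxError:
        return False


print("as posted:", parses(broken))
print("with timedelta(hours=1):", parses(fixed))
```

Because `ast.parse` only checks syntax, this works even in an environment where Prefect itself fails to import.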

Elizabeth Bing

08/31/2023, 1:05 AM
Thanks for helping with the discovery of another bug/fixing team 🙂 Btw, have you come across this issue with deployment?
RuntimeError: Docker worker cannot be used with an ephemeral server.
I have created a Docker work pool.

Emil Christensen

08/31/2023, 1:24 AM
Hmm… no I haven’t seen that one. Let me take a look…
Hmm, ok I see. So you need to either be pointing to cloud or have a local server running.
So in another terminal run
prefect server start
Then in your original terminal run
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
Then you should be able to start the worker.
Or you can log in to Prefect Cloud (prefect cloud login), which has the same effect.
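Before starting the worker, you can check that the local server is actually answering at that URL. A minimal sketch, assuming the server exposes a health endpoint at `/health` under the API URL (that is my understanding of Prefect 2, but treat it as an assumption); `server_is_up` is a made-up helper name.

```python
import urllib.request


def server_is_up(api_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET <api_url>/health answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{api_url}/health", timeout=timeout) as resp:
            return resp.status == 200
    except (OSError, ValueError):
        # Connection refused, timeout, HTTP error status, or malformed URL.
        return False


api_url = "http://127.0.0.1:4200/api"  # same value as PREFECT_API_URL above
if server_is_up(api_url):
    print(f"Server reachable at {api_url}")
else:
    print(f"No server answering at {api_url}; is 'prefect server start' still running?")
```

If this reports no server, the worker will most likely hit the same ephemeral-server error again.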

Elizabeth Bing

08/31/2023, 3:38 AM
I see I see, thanks Emil!