Alejandro
10/31/2022, 10:52 AM
import random
from collections import namedtuple
from datetime import date, datetime, time

import pandas as pd
from prefect import flow, task

WeatherConditions = namedtuple(
    "WeatherConditions", ["wind_speed", "temperature", "rel_humidity"]
)


@task
def register_current_weather() -> WeatherConditions:
    return WeatherConditions(
        wind_speed=random.weibullvariate(3, 1.5),
        temperature=random.uniform(-5, 25),
        rel_humidity=random.uniform(0, 100),
    )


@task
def upload_to_database(station_data: pd.DataFrame) -> None:
    print("Updating weather database with the following data:")
    print(station_data)
    print("Observations were successfully recorded")


@flow
def surface_station_daily_weather(station: str, freq: str = "H") -> pd.DataFrame:
    print(f"Daily weather observations for station {station.title()!r}")
    timestamps = pd.date_range(
        start=date.today(), end=datetime.combine(datetime.now(), time.max), freq=freq
    )
    observations = [register_current_weather() for _ in range(len(timestamps))]
    return pd.DataFrame(data=observations, index=timestamps)


@flow
def weather_app(station_names: list[str]) -> None:
    print("Welcome to the world's fastest weather data collection application!")
    for station in station_names:
        station_weather = surface_station_daily_weather(station=station, freq="3H")
        upload_to_database(station_data=station_weather)
    print(
        "Daily observations have been updated for all operational stations. See you soon!"
    )


if __name__ == "__main__":
    STATIONS = [
        "bilbao_station",
        "oviedo_station",
        "salamanca_station",
        "badajoz_station",
    ]
    weather_app(station_names=STATIONS)
I was wondering what the recommended way is to run subflows in parallel (not concurrently). In this case, the subflow surface_station_daily_weather is executed sequentially (as far as I know, there is no way to use the submit mechanism with a flow). Is it advisable to use the multiprocessing library for this purpose? Or is there any built-in functionality for it?
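For reference, here is a minimal sketch (not from this thread) of the submit mechanism mentioned above as it exists for tasks in Prefect 2; the station_weather task is a made-up stand-in, not part of the example code:

import random

from prefect import flow, task


@task
def station_weather(station: str) -> float:
    # Hypothetical stand-in for one station's observation.
    return random.uniform(-5, 25)


@flow
def collect_all_stations(station_names: list[str]) -> None:
    # .submit() hands each task run to the flow's task runner, so the runs can
    # proceed concurrently; flows do not expose an equivalent .submit() method.
    futures = [station_weather.submit(station=name) for name in station_names]
    for future in futures:
        print(future.result())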
redsquare
10/31/2022, 1:02 PM
Alejandro
10/31/2022, 1:28 PM
redsquare
10/31/2022, 1:34 PM
Alejandro
10/31/2022, 1:53 PM
import asyncio

from prefect import flow


@flow
async def subflow(n: int):
    print(f"Subflow {n} started!")
    await asyncio.sleep(1)


@flow
async def main_flow():
    await asyncio.gather(*[subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
If you run this code, you will see the following error:
RuntimeError: The task runner is already started!
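(The replies below work around this error by creating a separate, uniquely named flow object for each subflow run, presumably so each run gets its own task runner rather than re-entering one that is already started.)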
Khuyen Tran
10/31/2022, 3:35 PM
Marc Lipoff
10/31/2022, 9:57 PM
Khuyen Tran
11/01/2022, 4:46 PM
import asyncio

from prefect import flow, task


@task
async def my_task(n: int):
    print(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    return subflow


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Marc Lipoff
11/01/2022, 4:48 PM
Timo
11/08/2022, 7:52 AM
import asyncio

from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    logger = get_run_logger()
    logger.info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    return subflow


@task
def input_task():
    return [1, 2, 3, 4, 5, 6]


@flow
async def main_flow():
    inputs = input_task.submit()
    await asyncio.gather(*[build_subflow(n + 1) for n in inputs])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Khuyen Tran
11/10/2022, 4:03 PM
Timo
11/11/2022, 10:32 AM
11:18:41.508 | DEBUG | prefect.client - Using ephemeral application with database at postgresql+asyncpg://xxxx@prefect_postgres:5432/orion_db
11:18:41.698 | INFO | prefect.engine - Created flow run 'crouching-sloth' for flow 'main-flow'
11:18:41.699 | DEBUG | Flow run 'crouching-sloth' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
11:18:41.700 | DEBUG | prefect.task_runner.concurrent - Starting task runner...
11:18:41.703 | DEBUG | prefect.client - Using ephemeral application with database at postgresql+asyncpg://xxxx@prefect_postgres:5432/orion_db
11:18:41.806 | DEBUG | Flow run 'crouching-sloth' - Executing flow 'main-flow' for flow run 'crouching-sloth'...
11:18:41.807 | DEBUG | Flow run 'crouching-sloth' - Beginning execution...
11:18:41.851 | DEBUG | prefect.task_runner.concurrent - Shutting down task runner...
11:18:41.853 | INFO | Flow run 'crouching-sloth' - Finished in state Completed()
11:18:41.854 | DEBUG | prefect.client - Using ephemeral application with database at postgresql+asyncpg://xxxx@prefect_postgres:5432/orion_db
In this example from @Anna Geller, build_subflow(n) isn't an async function. Could this be a typo in your example?
Khuyen Tran
11/11/2022, 5:21 PM
import asyncio

from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
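(A note on this corrected version: asyncio.gather awaits the four dynamically named subflow runs, subflow:1 through subflow:4, inside a single event loop, so their one-second sleeps overlap. That is concurrency within one process rather than the multiprocessing-style parallelism asked about at the top of the thread.)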
Khuyen Tran
11/11/2022, 5:24 PM
Nils
12/07/2022, 4:20 PM
Khuyen Tran
12/07/2022, 4:23 PM