# ask-community
a
Hello everyone! I have the following example code:
import random
from collections import namedtuple
from datetime import date, datetime, time

import pandas as pd
from prefect import flow, task

WeatherConditions = namedtuple(
    "WeatherConditions", ["wind_speed", "temperature", "rel_humidity"]
)


@task
def register_current_weather() -> WeatherConditions:
    return WeatherConditions(
        wind_speed=random.weibullvariate(3, 1.5),
        temperature=random.uniform(-5, 25),
        rel_humidity=random.uniform(0, 100),
    )


@task
def upload_to_database(station_data: pd.DataFrame) -> None:
    print("Updating weather database with the following data:")
    print(station_data)
    print("Observations were successfully recorded")


@flow
def surface_station_daily_weather(station: str, freq: str = "H") -> pd.DataFrame:
    print(f"Daily weather observations for station {station.title()!r}")
    timestamps = pd.date_range(
        start=date.today(), end=datetime.combine(datetime.now(), time.max), freq=freq
    )
    observations = [register_current_weather() for _ in range(len(timestamps))]
    return pd.DataFrame(data=observations, index=timestamps)


@flow
def weather_app(station_names: list[str]) -> None:
    print("Welcome to the world's fastest weather data collection application!")
    for station in station_names:
        station_weather = surface_station_daily_weather(station=station, freq="3H")
        upload_to_database(station_data=station_weather)
    print(
        "Daily observations have been updated for all operational stations. See you soon!"
    )


if __name__ == "__main__":
    STATIONS = [
        "bilbao_station",
        "oviedo_station",
        "salamanca_station",
        "badajoz_station",
    ]
    weather_app(station_names=STATIONS)
I was wondering what the recommended way is to run subflows in parallel (not concurrently). In this case, the subflow surface_station_daily_weather is executed sequentially (as far as I know, there is no way to use the submit mechanism with a flow). Is it advisable to use the multiprocessing library for this purpose, or is there any built-in functionality for it?
a
Yes, but I mean subflows, not tasks
a
Thanks for the example, but it doesn't work for me. I need to run the same subflow in parallel with different parameters. See this example:
import asyncio
from prefect import flow


@flow
async def subflow(n: int):
    print(f"Subflow {n} started!")
    await asyncio.sleep(1)


@flow
async def main_flow():
    await asyncio.gather(*[subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
If you run this code, you will see the following error:
RuntimeError: The task runner is already started!
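For comparison, here is a minimal sketch of the same gather pattern with plain coroutines and no Prefect decorators (the names `plain_subflow` and `plain_main` are made up for illustration). It only shows that `asyncio.gather` overlaps the waits on a single thread, which is concurrency rather than multi-process parallelism. It does not explain why Prefect raises the error above.

```python
import asyncio
import time


async def plain_subflow(n: int) -> int:
    # Hypothetical stand-in for the subflow body: just an awaitable sleep.
    await asyncio.sleep(0.1)
    return n


async def plain_main() -> list[int]:
    # gather schedules all four coroutines together and returns
    # their results in the order the coroutines were passed in.
    return await asyncio.gather(*[plain_subflow(n + 1) for n in range(4)])


start = time.perf_counter()
results = asyncio.run(plain_main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

The four sleeps overlap, so the total time stays close to a single sleep rather than four times that.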
k
I would suggest that you use tasks instead of subflows, so you can take advantage of either DaskTaskRunner or RayTaskRunner to run your code in parallel.
m
Chiming in here... Is there no way to accomplish what @Alejandro wants with flows? I want to use subflows myself. I have a workflow to do repeatable data operations that I want to have as a subflow.
k
This is a known issue and was added to the backlog. Currently, this is the workaround:
import asyncio
from prefect import flow, task


@task
async def my_task(n: int):
    print(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    return subflow


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
m
ok thanks. what is the ETA for a resolution of the issue?
t
@Khuyen Tran the provided example doesn't work for me. The subflows don't get executed. Additionally, I would like to know how to execute subflows based on an input from another sync task. Something like:
import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    logger = get_run_logger()
    logger.info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    return subflow


@task
def input_task():
    return [1, 2, 3, 4, 5, 6]


@flow
async def main_flow():
    inputs = input_task.submit()
    await asyncio.gather(*[build_subflow(n + 1) for n in inputs])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
k
That’s weird. Which version of Prefect are you on? Can you show me the logs you got when you executed the provided example?
t
@Khuyen Tran running prefect v2.6.7
11:18:41.508 | DEBUG   | prefect.client - Using ephemeral application with database at <postgresql+asyncpg://xxxx@prefect_postgres:5432/orion_db>
11:18:41.698 | INFO    | prefect.engine - Created flow run 'crouching-sloth' for flow 'main-flow'
11:18:41.699 | DEBUG   | Flow run 'crouching-sloth' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
11:18:41.700 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
11:18:41.703 | DEBUG   | prefect.client - Using ephemeral application with database at <postgresql+asyncpg://xxxx@prefect_postgres:5432/orion_db>
11:18:41.806 | DEBUG   | Flow run 'crouching-sloth' - Executing flow 'main-flow' for flow run 'crouching-sloth'...
11:18:41.807 | DEBUG   | Flow run 'crouching-sloth' - Beginning execution...
11:18:41.851 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner...
11:18:41.853 | INFO    | Flow run 'crouching-sloth' - Finished in state Completed()
11:18:41.854 | DEBUG   | prefect.client - Using ephemeral application with database at <postgresql+asyncpg://xxxx@prefect_postgres:5432/orion_db>
In this example from @Anna Geller, build_subflow(n) isn't an async function. Could this be a typo in your example?
k
Hmm, you are right. The example didn’t run the task because I forgot to run the subflow. Here is the modified version:
import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
I also updated this in Discourse. We are fixing this issue, so there will be no need for the workaround in the future.
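The difference between the two versions of build_subflow can be sketched with plain asyncio, leaving Prefect out entirely. The names `inner`, `build_only`, `build_and_run`, and `demo` are hypothetical stand-ins, and `calls` is just a list used to record which bodies actually ran. This is an illustration of the "forgot to run the subflow" bug, not Prefect's own behaviour.

```python
import asyncio

calls: list[int] = []


async def inner(x: int) -> None:
    # Stand-in for the decorated subflow: records that it ran.
    calls.append(x)


async def build_only(n: int):
    # Mirrors the first workaround: returns the function, never calls it.
    return inner


async def build_and_run(n: int) -> None:
    # Mirrors the corrected version: awaits the call, so the body executes.
    await inner(n)


async def demo() -> tuple[list[int], list[int]]:
    await asyncio.gather(*[build_only(n + 1) for n in range(4)])
    after_build_only = list(calls)  # nothing has run yet
    await asyncio.gather(*[build_and_run(n + 1) for n in range(4)])
    return after_build_only, list(calls)


before, after = asyncio.run(demo())
print(before, after)
```

Gathering `build_only` finishes without error but leaves `calls` empty, which matches the logs above where the main flow completed without any subflow running.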
n
@Khuyen Tran When can we expect this fix to be released?
k
It has been added to the backlog with medium priority, but I’m not sure yet when it will be released.