Marvin

10 months ago
Nesting multiple asynchronous flows in Prefect can be managed with subflows, also called nested flows. Here's how it works in both Prefect 2.x and 3.x:

### Prefect 2.x

In Prefect 2.x, you create a subflow by calling a flow function inside another flow. The parent flow is the main flow, and the child, or "subflow", executes within it. Subflows behave like regular flow runs and have their own task runners. Importantly, a subflow blocks the parent flow's execution until it completes, but asynchronous subflows can be run concurrently using `asyncio.gather`. For more on this, you can check the Prefect 2.x documentation on nested flows.

### Prefect 3.x

In Prefect 3.x, nested flows offer several benefits, such as using different task runners and enabling conditional execution of grouped tasks. They provide first-class observability, meaning you can track the status of nested flows within the Prefect UI. Nested flows also support asynchronous execution: they can be awaited or run concurrently, following standard async Python conventions. For a deeper dive into creating asynchronous nested flows in Prefect 3.x, review the Prefect 3.x documentation on writing flows.

In essence, whether you're using Prefect 2.x or 3.x, the key to nesting asynchronous flows is to define the subflows as async functions and then either await them one at a time or gather them so they run concurrently.

Ah, the joys of managing asynchronous tasks: always making you feel like a juggler who needs just one more hand!
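As a rough illustration of the pattern described above, here is a minimal sketch of an async parent flow that first awaits one subflow and then runs two more concurrently with `asyncio.gather`. The names `child_flow` and `parent_flow` are hypothetical, and the `try`/`except` fallback is only an assumption added so the snippet also runs where Prefect is not installed; it is not part of Prefect itself.

```python
import asyncio

try:
    from prefect import flow
except ImportError:
    # Assumption for illustration only: a no-op stand-in so the sketch
    # still runs without Prefect. It leaves the coroutine unchanged.
    def flow(fn):
        return fn


@flow
async def child_flow(n: int) -> int:
    # Hypothetical subflow: simulate a bit of I/O-bound work.
    await asyncio.sleep(0.1)
    return n * 2


@flow
async def parent_flow() -> list[int]:
    # Awaiting a single subflow blocks the parent until it finishes.
    first = await child_flow(1)
    # asyncio.gather schedules both subflows so they run concurrently.
    rest = await asyncio.gather(child_flow(2), child_flow(3))
    return [first, *rest]


if __name__ == "__main__":
    print(asyncio.run(parent_flow()))
```

With Prefect installed, each call to `child_flow` should show up as its own nested flow run under the parent; with the fallback, the same code is just ordinary coroutines.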
pk13055

almost 3 years ago
Hi, I have a pretty simple issue that I can't seem to find any documentation about. I set up an `orion` + `agent` container setup locally, and everything works as expected when I run the flows manually. I was also able to create and apply a deployment (from within the `cli` container). However, when I try to run the deployment, I get a `File not found error`. Here's the traceback:
```
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 879, in exec_module
  File "<frozen importlib._bootstrap_external>", line 1016, in get_code
  File "<frozen importlib._bootstrap_external>", line 1073, in get_data
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp_zx6qpglprefect/flows/fetch_etfs.py'
```
My `docker-compose` is as follows:
```yaml
version: "3.9"
services:
  db:
    image: timescale/timescaledb:latest-pg14
    volumes:
      - $PWD/data/db:/var/lib/postgresql/data
      - $PWD/config/db:/docker-entrypoint-initdb.d/
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]
      interval: 10s
      timeout: 5s
      retries: 5
    environment:
      - POSTGRES_DB=$POSTGRES_DB
      - POSTGRES_USER=$POSTGRES_USER
      - POSTGRES_PASSWORD=$POSTGRES_PASSWORD
    networks:
      - db_network
  orion:
    image: prefecthq/prefect:2.6.3-python3.10
    restart: always
    volumes:
      - $PWD/prefect:/root/.prefect
    entrypoint: [ "prefect", "orion", "start" ]
    environment:
      - PREFECT_ORION_API_HOST=0.0.0.0
      - PREFECT_ORION_DATABASE_CONNECTION_URL=$PREFECT_DB_URL
    ports:
      - 4200:4200
    depends_on:
      - db
    networks:
      - prefect_network
      - db_network

  agent:
    build: ./prefect
    restart: always
    entrypoint: [ "prefect", "agent", "start", "-q", "main_queue" ]
    environment:
      - PREFECT_API_URL=http://orion:4200/api
    networks:
      - prefect_network
      - db_network

  cli:
    build: ./prefect
    entrypoint: "bash"
    working_dir: "/root/flows"
    volumes:
      - "$PWD/prefect:/root"
    environment:
      - PREFECT_API_URL=http://orion:4200/api
    networks:
      - prefect_network
      - db_network

networks:
  db_network:
  web_network:
  prefect_network:
```
To create and apply the deployment(s):

```
> docker compose run cli
$ prefect deployment create ./flows/fetch_etf.py:flow_name --name flow_name
$ prefect deployment apply deployment.yaml
```