# prefect-community
m
Hi, I would like to get some input from you guys. I am looking at Prefect 2.0. I expected that in a deployment I could specify an environment (Docker image) per subflow, as my subflows can have a lot of dependencies that are not always compatible with each other. How I envision things is to have one image per subflow. As the main flow expands, it seems that allowing such deployments would make everything more compartmentalized and maintainable. Have you encountered such a request before? Is there a way to do that in Prefect 2.0?
a
You can totally have one deployment per subflow; it's a matter of setting a different entrypoint in your build CLI command:
```
prefect deployment build flow.py:subflow1 -n deploy1 ...
prefect deployment build flow.py:subflow2 -n deploy2 ...
prefect deployment build flow.py:subflow3 -n deploy3 ...
```
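If each subflow should also run in its own image, you could save one `DockerContainer` infrastructure block per subflow and reference it at build time with `-ib docker-container/<block-name>` (a sketch; the image and block names here are placeholders):
```python
from prefect.infrastructure.docker import DockerContainer

# one infrastructure block per subflow; image names are placeholders
DockerContainer(image="registry.example.com/subflow1:1.0").save("subflow1-infra")
DockerContainer(image="registry.example.com/subflow2:1.0").save("subflow2-infra")
```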
m
Thanks @Anna Geller for the answer. I may be missing something, but how do we "call" deployments (subflows) from another deployment? In other words, how is the *main flow* orchestrated? Here is a little diagram to make everything clearer.
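Roughly, I imagine something like this (a sketch using the client API; the deployment UUID is a placeholder):
```python
import asyncio
from uuid import UUID

from prefect import flow
from prefect.client import get_client


@flow
async def main_flow(subflow_deployment_id: UUID):
    async with get_client() as client:
        # create a run of the subflow's deployment; an agent would pick it
        # up and execute it on that deployment's own infrastructure/image
        flow_run = await client.create_flow_run_from_deployment(
            subflow_deployment_id
        )
        print(f"triggered subflow run {flow_run.id}")


# asyncio.run(main_flow(UUID("<subflow-deployment-uuid>")))
```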
m
Thanks @Anna Geller for the resource. I managed to get to something with this:
```python
import asyncio
import json
import os
from uuid import UUID

import pytest
from prefect import flow, task, get_run_logger
from prefect.client import get_client, OrionClient
from prefect.filesystems import Azure
from prefect.infrastructure.docker import DockerContainer
from prefect.orion.schemas.core import BlockDocument
from prefect.orion.schemas.states import StateType
from prefect.packaging.docker import DockerPackager
from prefect.software.python import PythonEnvironment, PipRequirement


@pytest.fixture
def prefect_client():
    client = get_client()
    yield client


@pytest.fixture
def azure_blob(prefect_client):
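    # Registers an Azure storage block document (credentials from env vars)
    # for deployment storage, and deletes it again after the test.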
    name = "azureblobblock"
    azure_block_type = list(
        filter(
            lambda b: b.block_type.slug == "azure",
            asyncio.run(prefect_client.read_block_schemas()),
        )
    )[0]
    azure_block = BlockDocument(
        name=name,
        data={
            "bucket_path": "prefect",
            "azure_storage_account_name": os.environ["ACCOUNT_NAME"],
            "azure_storage_account_key": os.environ["ACCOUNT_KEY"],
        },
        block_schema_id=azure_block_type.id,
        block_type_id=azure_block_type.block_type_id,
    )
    azure_block = asyncio.run(prefect_client.create_block_document(azure_block))

    yield Azure.load(name)

    asyncio.run(prefect_client.delete_block_document(azure_block.id))


@flow
def my_subflow1():
    my_task(1)
    return "hola22"


@flow
def my_subflow2():
    my_task(2)
    return "hola"


@task
def my_task(x):
    return x + 1


def test_azure_blob(azure_blob):
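    # Smoke test: round-trips a small payload through the Azure block.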
    content = b"ok"
    asyncio.run(azure_blob.write_path("yo.txt", content))
    assert asyncio.run(azure_blob.read_path("yo.txt")) == content


@flow
async def orchestrator_flow():
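    # Looks up this run's own deployment id, loads the subflow deployment ids
    # stashed in blob storage by the test, then triggers subflow 1.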
    logger = get_run_logger()
    azure_block = await Azure.load("azureblobblock")

    async with get_client() as client:
        my_deployment_id = (
            await client.read_flow_run(UUID(os.environ["PREFECT__FLOW_RUN_ID"]))
        ).deployment_id
        deployment_ids = json.loads(
            (await azure_block.read_path(f"{my_deployment_id}.json")).decode()
        )

        subflow_1 = await orchestration_flow_wrapper(
            client, deployment_ids["my-subflow1"], {}, {}
        )
        logger.info(subflow_1.data.decode().json())


async def orchestration_flow_wrapper(
    client: OrionClient, deployment_id, parameters: dict, context: dict
):
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id, parameters=parameters, context=context
    )
    my_flow = await client.read_flow(flow_run.flow_id)

    # Wrap the wait in a flow so the child run is tracked as a subflow
    # of the orchestrator.
    async def _wrapper():
        return await wait_for_flow_run_completion(client, flow_run.id)

    return await flow(_wrapper, name=my_flow.name + "-wrapper")()


def test_flow_of_flow_orchestrator_pattern(azure_blob):
    # https://discourse.prefect.io/t/how-to-create-a-flow-run-from-deployment-orchestrator-pattern/803
    asyncio.run(async_test_flow_of_flow_orchestrator_pattern(azure_blob))


async def async_test_flow_of_flow_orchestrator_pattern(azure_blob):
    # Track created resources so the cleanup in `finally` is safe even if
    # an early build step fails.
    deployment_subflow1_ids = deployment_subflow2_ids = None
    deployment_orchestrator_ids = None
    async with get_client() as client:
        try:
            deployment_subflow1_ids = await async_build_deployment(
                azure_blob, client, my_subflow1
            )
            deployment_subflow2_ids = await async_build_deployment(
                azure_blob, client, my_subflow2
            )
            deployment_orchestrator_ids = await async_build_deployment(
                azure_blob, client, orchestrator_flow
            )
            deployment_ids = {
                "my-subflow1": str(deployment_subflow1_ids["deployment"]),
                "my-subflow2": str(deployment_subflow2_ids["deployment"]),
            }
            await azure_blob.write_path(
                f"{deployment_orchestrator_ids['deployment']}.json",
                json.dumps(deployment_ids).encode(),
            )

            flow_run = await client.create_flow_run_from_deployment(
                deployment_orchestrator_ids["deployment"]
            )

            final_state = await wait_for_flow_run_completion(client, flow_run.id, 30)
            if final_state.type == StateType.FAILED:
                raise RuntimeError(final_state)

        finally:
            for ids in (
                deployment_subflow1_ids,
                deployment_subflow2_ids,
                deployment_orchestrator_ids,
            ):
                if ids is not None:
                    await client.delete_deployment(ids["deployment"])
                    await client.delete_block_document(ids["infra"])


async def wait_for_flow_run_completion(
    client: OrionClient, flow_run_id: UUID, max_wait: int | None = None
):
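    # Polls the run's state history once per second until it reaches a
    # terminal state (or until max_wait seconds have elapsed).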
    wait = 0
    while (await client.read_flow_run_states(flow_run_id))[-1].type not in {
        StateType.COMPLETED,
        StateType.FAILED,
    } and (max_wait is None or wait <= max_wait):
        wait += 1
        await asyncio.sleep(1)

    return (await client.read_flow_run_states(flow_run_id))[-1]


async def async_build_deployment(azure_blob, client, flow_func) -> dict[str, UUID]:
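    # Packages the flow into its own Docker image, saves a DockerContainer
    # infrastructure block for it, and registers a deployment wiring the
    # image, Azure storage, and entrypoint together.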
    python_env = PythonEnvironment(
        pip_requirements=[PipRequirement("adlfs"), PipRequirement("pytest")]
    )
    docker_image = await DockerPackager(
        python_environment=python_env, image_flow_location="/test/flow.py"
    ).package(flow_func)
    flow_id = await client.create_flow(flow_func)

    infra_id = await DockerContainer(image=docker_image.image).save(
        docker_image.flow_name
    )
    deployment_id = await client.create_deployment(
        flow_id,
        name=flow_func.name,
        version="1",
        tags=["flow-of-flow"],
        infrastructure_document_id=infra_id,
        entrypoint=f"flow.py:{flow_func.name.replace('-', '_')}",
        path="/test",
        storage_document_id=azure_blob._block_document_id,
        work_queue_name="default",
    )
    return {"deployment": deployment_id, "infra": infra_id, "flow": flow_id}
```
However, it still has an issue with getting my subflows' return values. Intuitively, I was expecting to find `hola22` in the `data` property of the completed state of `my_subflow1`. Instead, I get `_Result(key='b1fd11041ae54cfcbb9b41ec938ff0cf', filesystem_document_id=UUID('bd1ed851-82d2-4f87-b11c-be04a80634af'))`, and I have no idea what it refers to. My question is: how can I retrieve `hola22` from my subflow run at the last line of the `orchestrator_flow` method?
a
This looks way too complicated. What end goal do you have in mind? There are definitely easier ways to solve this.
m
I want one environment/infra (set of dependencies/Docker image) per subflow. The combined dependencies of all my subflows are incompatible: for example, subflow1 may require `lib1==1.0.0` while subflow2 may require `lib1==2.0.0`. Segregating the subflows makes it easier to mitigate the *dependency hell* problem.
In terms of maintenance, I then don't need to update/retest subflow1 when only subflow2 changes.
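Concretely, each subflow image would pin only its own set, e.g. with the same `PythonEnvironment` API as in the test above (a sketch; the `lib1` pins are the hypothetical conflict from my example):
```python
from prefect.software.python import PythonEnvironment, PipRequirement

# hypothetical conflicting pins: each subflow image carries only its own set
subflow1_env = PythonEnvironment(pip_requirements=[PipRequirement("lib1==1.0.0")])
subflow2_env = PythonEnvironment(pip_requirements=[PipRequirement("lib1==2.0.0")])
```
Each environment then feeds its own `DockerPackager`, so changing subflow2's pins only rebuilds subflow2's image.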