Marc-Antoine Bélanger
08/25/2022, 7:36 PM
Anna Geller
08/26/2022, 9:13 PM
prefect deployment build flow.py:subflow1 -n deploy1 ...
prefect deployment build flow.py:subflow2 -n deploy2 ...
prefect deployment build flow.py:subflow3 -n deploy3 ...
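(Each build command writes a deployment YAML that still has to be applied; a follow-up sketch, assuming the default output file naming of prefect deployment build:)
prefect deployment apply subflow1-deployment.yaml
prefect deployment apply subflow2-deployment.yaml
prefect deployment apply subflow3-deployment.yaml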
Marc-Antoine Bélanger
08/29/2022, 3:49 PM
How should the main flow be orchestrated? Here is a little diagram to make everything clearer.
Anna Geller
08/29/2022, 11:37 PM
Marc-Antoine Bélanger
08/30/2022, 10:03 PM
import asyncio
import json
import os
from uuid import UUID
import pytest
from prefect import flow, task, get_run_logger
from prefect.client import get_client, OrionClient
from prefect.filesystems import Azure
from prefect.infrastructure.docker import DockerContainer
from prefect.orion.schemas.core import BlockDocument
from prefect.orion.schemas.states import StateType
from prefect.packaging.docker import DockerPackager
from prefect.software.python import PythonEnvironment, PipRequirement
@pytest.fixture
def prefect_client():
client = get_client()
yield client
@pytest.fixture
def azure_blob(prefect_client):
name = "azureblobblock"
    # look up the block schema for the built-in Azure block type
    azure_block_type = next(
        schema
        for schema in asyncio.run(prefect_client.read_block_schemas())
        if schema.block_type.slug == "azure"
    )
azure_block = BlockDocument(
name=name,
data={
"bucket_path": "prefect",
"azure_storage_account_name": os.environ["ACCOUNT_NAME"],
"azure_storage_account_key": os.environ["ACCOUNT_KEY"],
},
block_schema_id=azure_block_type.id,
block_type_id=azure_block_type.block_type_id,
)
azure_block = asyncio.run(prefect_client.create_block_document(azure_block))
yield Azure.load(name)
asyncio.run(prefect_client.delete_block_document(azure_block.id))
@flow
def my_subflow1():
my_task(1)
return "hola22"
@flow
def my_subflow2():
my_task(2)
return "hola"
@task
def my_task(x):
return x + 1
def test_azure_blob(azure_blob):
content = b"ok"
asyncio.run(azure_blob.write_path("yo.txt", content))
assert asyncio.run(azure_blob.read_path("yo.txt")) == content
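# The orchestrator runs in its own container: it looks up its own deployment
# id, reads the subflow deployment ids from shared Azure storage, and then
# triggers the subflow deployments via the Orion API.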
@flow
async def orchestrator_flow():
logger = get_run_logger()
azure_block = await Azure.load("azureblobblock")
async with get_client() as client:
my_deployment_id = (
await client.read_flow_run(UUID(os.environ["PREFECT__FLOW_RUN_ID"]))
).deployment_id
deployment_ids = json.loads(
(await azure_block.read_path(f"{my_deployment_id}.json")).decode()
)
subflow_1 = await orchestration_flow_wrapper(
client, deployment_ids["my-subflow1"], {}, {}
)
    logger.info(subflow_1.data.decode().json())
async def orchestration_flow_wrapper(
    client: OrionClient, deployment_id, parameters: dict, context: dict
):
    # Trigger a run of the given deployment, then wait for it inside an
    # ad-hoc flow so the wait shows up as a child flow run in the UI.
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id, parameters=parameters, context=context
    )
    my_flow = await client.read_flow(flow_run.flow_id)

    async def _wrapper():
        return await wait_for_flow_run_completion(client, flow_run.id)

    return await flow(_wrapper, name=my_flow.name + "-wrapper")()
def test_flow_of_flow_orchestrator_pattern(azure_blob):
    # https://discourse.prefect.io/t/how-to-create-a-flow-run-from-deployment-orchestrator-pattern/803
asyncio.run(async_test_flow_of_flow_orchestrator_pattern(azure_blob))
async def async_test_flow_of_flow_orchestrator_pattern(azure_blob):
    async with get_client() as client:
        created = []  # track what was built so cleanup never hits unbound names
        try:
            deployment_subflow1_ids = await async_build_deployment(
                azure_blob, client, my_subflow1
            )
            created.append(deployment_subflow1_ids)
            deployment_subflow2_ids = await async_build_deployment(
                azure_blob, client, my_subflow2
            )
            created.append(deployment_subflow2_ids)
            deployment_orchestrator_ids = await async_build_deployment(
                azure_blob, client, orchestrator_flow
            )
            created.append(deployment_orchestrator_ids)
            # tell the orchestrator which subflow deployments to trigger by
            # writing a mapping keyed by its own deployment id to shared storage
            deployment_ids = {
                "my-subflow1": str(deployment_subflow1_ids["deployment"]),
                "my-subflow2": str(deployment_subflow2_ids["deployment"]),
            }
            await azure_blob.write_path(
                f"{deployment_orchestrator_ids['deployment']}.json",
                json.dumps(deployment_ids).encode(),
            )
            flow_run = await client.create_flow_run_from_deployment(
                deployment_orchestrator_ids["deployment"]
            )
            final_state = await wait_for_flow_run_completion(client, flow_run.id, 30)
            if final_state.type == StateType.FAILED:
                raise RuntimeError(final_state)
        finally:
            for ids in created:
                await client.delete_deployment(ids["deployment"])
                await client.delete_block_document(ids["infra"])
async def wait_for_flow_run_completion(
    client: OrionClient, flow_run_id: UUID, max_wait: int | None = None
):
    # Poll once per second until the run reaches a terminal state, or until
    # max_wait seconds have elapsed (None means wait indefinitely).
    wait = 0
while (await client.read_flow_run_states(flow_run_id))[-1].type not in {
StateType.COMPLETED,
StateType.FAILED,
} and (max_wait is None or wait <= max_wait):
wait += 1
await asyncio.sleep(1)
return (await client.read_flow_run_states(flow_run_id))[-1]
async def async_build_deployment(azure_blob, client, flow_func) -> dict[str, UUID]:
    # Each flow is packaged into its own Docker image, so every deployment can
    # carry its own Python dependencies.
    python_env = PythonEnvironment(
        pip_requirements=[PipRequirement("adlfs"), PipRequirement("pytest")]
    )
docker_image = await DockerPackager(
python_environment=python_env, image_flow_location="/test/flow.py"
).package(flow_func)
flow_id = await client.create_flow(flow_func)
infra_id = await DockerContainer(image=docker_image.image).save(
docker_image.flow_name
)
    deployment_id = await client.create_deployment(
        flow_id,
        name=flow_func.name,
        version="1",
        tags=["flow-of-flow"],
        infrastructure_document_id=infra_id,
        entrypoint=f"flow.py:{flow_func.name.replace('-', '_')}",
        path="/test",
        storage_document_id=azure_blob._block_document_id,
        work_queue_name="default",
    )
return {"deployment": deployment_id, "infra": infra_id, "flow": flow_id}
The part I am struggling with is the subflows' return values. Intuitively, I was expecting to find hola22 in the data property of the completed state of my_subflow1. However, I get _Result(key='b1fd11041ae54cfcbb9b41ec938ff0cf', filesystem_document_id=UUID('bd1ed851-82d2-4f87-b11c-be04a80634af')), and I have no idea what it refers to.
My question is: how can I retrieve hola22 from my subflow run at the last line of the orchestrator_flow method?
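(A possible workaround, sketched under assumptions rather than taken from this thread: have the subflow persist its own return value on the shared azureblobblock, keyed by its flow run id, and read it back in the orchestrator once the run completes. get_run_context is the standard Prefect 2 context helper; the results/ key prefix is arbitrary, and the subflow is made async here for the await calls.)

from prefect.context import get_run_context

@flow
async def my_subflow1():
    my_task(1)
    value = "hola22"
    # persist the return value explicitly, keyed by this flow run's id
    azure_block = await Azure.load("azureblobblock")
    run_id = get_run_context().flow_run.id
    await azure_block.write_path(f"results/{run_id}.txt", value.encode())
    return value

# In orchestrator_flow, once the wrapped run completes (this assumes
# orchestration_flow_wrapper is changed to also return the flow run id):
#     raw = await azure_block.read_path(f"results/{subflow_run_id}.txt")
#     logger.info(raw.decode())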
Anna Geller
08/31/2022, 5:03 AM
Marc-Antoine Bélanger
08/31/2022, 3:12 PM
For example, subflow1 may require lib1==1.0.0 while subflow2 may require lib1==2.0.0. Segregating the different subflows into their own deployments makes it easier to mitigate the dependency hell problem.
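(For illustration, a hedged sketch of how the async_build_deployment helper above could give each subflow its own pinned dependencies; lib1 and its version pins are placeholder names, and the code would run inside an async function:)

# Hypothetical pins: conflicting requirements never share an environment,
# because each subflow is packaged into its own Docker image.
env_subflow1 = PythonEnvironment(pip_requirements=[PipRequirement("lib1==1.0.0")])
env_subflow2 = PythonEnvironment(pip_requirements=[PipRequirement("lib1==2.0.0")])
image1 = await DockerPackager(
    python_environment=env_subflow1, image_flow_location="/test/flow.py"
).package(my_subflow1)
image2 = await DockerPackager(
    python_environment=env_subflow2, image_flow_location="/test/flow.py"
).package(my_subflow2)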