
datamongus

12/05/2022, 12:16 AM
Greetings, I’m aware that the Dask functionality in Prefect allows for running parallel flows/tasks across different agents. I’m curious if the K8s operator does the same? Ultimately what I am attempting to do is offload a resource-intensive task to different agents. This particular task was previously done using a native pipeline and a for loop. I’ve refactored this logic (in Python) to use a thread pool and removed the for loop. While this brought great performance improvements, the timing of the logic is still not efficient enough. I am now considering moving this logic to a Prefect flow in the hopes of improving this parallelization even further across different agents which are set up as Kubernetes pods.
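For reference, a minimal sketch of the thread-pool refactor described above; process_item, items, and max_workers are hypothetical placeholders for the real per-item work, not names from the actual pipeline:
# Minimal sketch of replacing a sequential for loop with a thread pool.
# process_item and items are hypothetical stand-ins for the real workload.
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    ...  # resource-intensive work previously done inside the for loop

items = ["a", "b", "c"]

with ThreadPoolExecutor(max_workers=8) as pool:
    # map() runs process_item over items concurrently, preserving input order
    results = list(pool.map(process_item, items))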

Mason Menges

12/05/2022, 7:23 PM
For clarification: in this context, flows are submitted by an agent from a work queue to an execution environment. Depending on how your deployment is set up, the agent could be spinning up a separate pod within the Kubernetes cluster for each flow and running them concurrently, with tasks being the unit of work within a flow. You could set up multiple deployments for different flows and then call them with the run_deployment command from a parent flow; you can also specify separate infrastructure for each subflow via run_deployment. Below is a simple example I wrote up that uses run_deployment concurrently and then waits for the completion of each flow before continuing with the execution of the parent flow.
from prefect import flow, get_run_logger, get_client
from prefect.deployments import run_deployment
import asyncio


@flow
async def test_flow():
    logger = get_run_logger()

    list_inputs = ["grass", "fire"]

    # Trigger a deployment run per input; timeout=0 returns immediately
    # instead of waiting for the child flow run to finish.
    results = []
    for pk_type in list_inputs:
        deployed_run = await run_deployment(
            name="create-pk-team/pk-flow-dev",
            parameters={"pk_type": pk_type},
            flow_run_name=f"{pk_type} create-pk-team/pk-flow-dev",
            timeout=0)

        results.append(deployed_run.id)

    # Poll the API until every child flow run reaches a final state.
    async with get_client() as api_client:
        while len(results) > 0:
            logger.info("in while loop")
            for flow_run_id in list(results):  # iterate over a copy so we can remove safely
                logger.info(f"checking flow run id: {flow_run_id}")
                deployed_run = await api_client.read_flow_run(flow_run_id)
                run_state = deployed_run.state
                logger.info(f"Run {flow_run_id} state: {run_state}")
                if run_state.is_final():
                    logger.info("removed result")
                    results.remove(flow_run_id)
                    logger.info(results)
            await asyncio.sleep(5)  # don't block the event loop while waiting

    logger.info("I waited")


if __name__ == "__main__":
    asyncio.run(test_flow())
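A variant sketch, assuming the same deployment name and parameters as above: since run_deployment waits for the child flow run to reach a final state when no timeout is set, the manual polling loop can be replaced by awaiting all the runs concurrently with asyncio.gather.
# Variant sketch: let run_deployment itself wait for each child run
# (its default timeout waits until the flow run is final) and gather
# the waits concurrently instead of polling the API manually.
import asyncio
from prefect import flow
from prefect.deployments import run_deployment


@flow
async def parent_flow():
    list_inputs = ["grass", "fire"]
    flow_runs = await asyncio.gather(
        *[
            run_deployment(
                name="create-pk-team/pk-flow-dev",  # same deployment as above
                parameters={"pk_type": pk_type},
                flow_run_name=f"{pk_type} create-pk-team/pk-flow-dev",
            )
            for pk_type in list_inputs
        ]
    )
    return flow_runs


if __name__ == "__main__":
    asyncio.run(parent_flow())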
TL;DR: This is likely doable, but exactly how you would manage it varies depending on your use case. The run_deployment command would be a useful tool here, as it allows you to run deployments from a parent flow with different arguments as needed.
👍 1