# marvin-in-the-wild
n
Hey @Marvin is it possible to run subflows on multiple k8s jobs?
m
To run subflows on multiple Kubernetes jobs with Prefect, you can use the `run_namespaced_job` flow from the `prefect_kubernetes` module (the `prefect-kubernetes` integration). Here's a concise overview of what you'll need:

1. Set up Kubernetes jobs: Define each job as a `KubernetesJob` built from a job manifest, and provide the necessary `KubernetesCredentials`.
2. Run namespaced jobs: Call `run_namespaced_job` within your flow or subflow. It creates the job in the cluster and waits for it to complete.

### Example
```python
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

# Build a KubernetesJob from a manifest file and run it in the cluster.
# "k8s-creds" is the name of a saved KubernetesCredentials block.
def run_k8s_job():
    k8s_creds = KubernetesCredentials.load("k8s-creds")
    k8s_job = KubernetesJob.from_yaml_file(
        credentials=k8s_creds,
        manifest_path="path/to/job.yaml",
    )
    # run_namespaced_job is itself a flow, so calling it here creates
    # a nested flow run that waits for the Kubernetes job to complete
    return run_namespaced_job(kubernetes_job=k8s_job)

# Example subflow
@flow
def subflow():
    result = run_k8s_job()
    return result

# Example main flow
@flow
def main_flow():
    subflow_execution = subflow()
    return subflow_execution

if __name__ == "__main__":
    main_flow()
```
### Resources
- Prefect documentation on Kubernetes flows: Prefect Kubernetes Flows Integration
- GitHub issues discussing subflows:
  - Flows with subflows incorrectly reporting state as Failed
  - Subflows cause parent to be incorrectly marked as failed
  - Long running kubernetes jobs are marked as crashed

Reviewing these links will provide more context on integrating and troubleshooting subflows with Kubernetes jobs in Prefect.
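The example reads its job from `path/to/job.yaml` without showing what that file contains. As a rough sketch, the snippet below builds a minimal Job as a Python dict (the same structure the YAML file would hold) and runs a few basic checks on it. The job name `my-job`, the image, and the helper `check_job_manifest` are hypothetical, and these checks are far from full Kubernetes schema validation.

```python
# Hypothetical minimal Job manifest, mirroring what path/to/job.yaml might hold
MINIMAL_JOB = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "my-job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "main",
                        "image": "python:3.11-slim",
                        "command": ["python", "-c", "print('hello')"],
                    }
                ],
                # Pods created by a Job must use Never or OnFailure
                "restartPolicy": "Never",
            }
        }
    },
}


def check_job_manifest(manifest: dict) -> list[str]:
    """Return a list of problems found; an empty list means the basic checks passed."""
    problems = []
    if manifest.get("apiVersion") != "batch/v1":
        problems.append("apiVersion should be batch/v1")
    if manifest.get("kind") != "Job":
        problems.append("kind should be Job")
    if not manifest.get("metadata", {}).get("name"):
        problems.append("metadata.name should be set")
    pod_spec = manifest.get("spec", {}).get("template", {}).get("spec", {})
    if not pod_spec.get("containers"):
        problems.append("spec.template.spec.containers must list at least one container")
    if pod_spec.get("restartPolicy") not in ("Never", "OnFailure"):
        problems.append("restartPolicy must be Never or OnFailure for a Job")
    return problems
```

For example, `check_job_manifest(MINIMAL_JOB)` returns an empty list, while dropping `restartPolicy` from the pod spec produces a single complaint about it.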
n
@Marvin so a k8s worker will pull a flow from a work-pool and create a k8s job which in turn will create k8s jobs?
m
Yes, that's essentially the idea. Here's a step-by-step outline:

1. Kubernetes worker in a work pool: A Kubernetes worker polls its work pool, picks up a scheduled run of the main flow, and creates a Kubernetes job to execute it.
2. Executing the main flow: When the main flow runs inside that job's pod, it uses the `run_namespaced_job` function to create and execute further Kubernetes jobs.
3. Subflows creating jobs: Subflows can call `run_namespaced_job` in the same way, so each subflow launches its own Kubernetes job. Those jobs run whatever container and command their manifest specifies.

Important points:
- Make sure the Kubernetes job manifests and the `KubernetesCredentials` block are set up correctly. Whichever identity those credentials resolve to (for in-cluster credentials, the main flow pod's service account) needs RBAC permissions to create and monitor jobs in the target namespace.
- Monitor this nested infrastructure, since subflows that run Kubernetes jobs can hit issues such as incorrectly reported states or job evictions (see the GitHub issues below).

### Example Workflow
Here's a more detailed example to demonstrate the interactions:

main_flow.py
```python
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

# Function to run a Kubernetes job
def run_k8s_job(job_manifest_path):
    k8s_creds = KubernetesCredentials.load("k8s-creds")
    k8s_job = KubernetesJob.from_yaml_file(
        credentials=k8s_creds,
        manifest_path=job_manifest_path,
    )
    return run_namespaced_job(kubernetes_job=k8s_job)

# Subflow that creates a Kubernetes job
@flow
def subflow(job_manifest_path):
    result = run_k8s_job(job_manifest_path)
    return result

# Main flow that triggers the subflow
@flow
def main_flow():
    job_manifest_path = "path/to/job.yaml"
    subflow_execution = subflow(job_manifest_path)
    return subflow_execution

if __name__ == "__main__":
    main_flow()
```
### Execution Flow
1. Kubernetes worker: The worker picks up a run of `main_flow` from your work pool and executes it in a Kubernetes job.
2. Main flow execution: Calls `subflow` and passes it the job manifest path.
3. Subflow execution: Calls `run_namespaced_job`, which creates and runs another Kubernetes job based on the provided manifest.

This way, you can chain Kubernetes jobs using subflows orchestrated by Prefect.

### Resources Links
- Prefect documentation on Kubernetes flows: Prefect Kubernetes Flows
- GitHub issues on subflows:
  - Flows with subflows incorrectly reporting state as Failed
  - Subflows cause parent to be incorrectly marked as failed
  - Long running kubernetes jobs are marked as crashed

These references should provide more context and specific configurations.
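On the point about credentials: when the main flow's pod talks to the cluster with in-cluster credentials, its service account must be allowed to manage jobs. The sketch below builds a namespaced `Role` and `RoleBinding` as Python dicts. The verb set is an assumption based on what `run_namespaced_job` has to do (create the job, track its status and pods, read pod logs, and delete the job afterwards), and the names `prefect`, `prefect-job-launcher`, and the `default` service account are hypothetical; check them against your cluster's policies before applying.

```python
import json

# Hypothetical names; adjust to your namespace and service account
NAMESPACE = "prefect"
ROLE_NAME = "prefect-job-launcher"


def job_launcher_role(namespace: str, name: str) -> dict:
    """Build a namespaced Role that lets a pod create and monitor Jobs (assumed verb set)."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            # Jobs live in the "batch" API group
            {
                "apiGroups": ["batch"],
                "resources": ["jobs"],
                "verbs": ["create", "get", "list", "watch", "delete"],
            },
            # Pods and their logs live in the core ("") API group
            {
                "apiGroups": [""],
                "resources": ["pods", "pods/log"],
                "verbs": ["get", "list", "watch"],
            },
        ],
    }


def role_binding(namespace: str, role_name: str, service_account: str) -> dict:
    """Bind the Role to the service account the main flow's pod runs as."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": f"{role_name}-binding", "namespace": namespace},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": role_name,
        },
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
    }


if __name__ == "__main__":
    # Print both objects as a v1 List; kubectl apply -f accepts JSON as well as YAML
    bundle = {
        "apiVersion": "v1",
        "kind": "List",
        "items": [
            job_launcher_role(NAMESPACE, ROLE_NAME),
            role_binding(NAMESPACE, ROLE_NAME, "default"),
        ],
    }
    print(json.dumps(bundle, indent=2))
```

Keeping the permissions in a namespaced `Role` rather than a `ClusterRole` limits the flow pod to launching jobs in its own namespace.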
n
Is it possible to use the same k8s manifest of the main flow for the subflows just with different params? @Marvin
m
Yes, it's possible to use the same Kubernetes manifest for the main flow and its subflows, just with different parameters. You can load the manifest and modify it as needed for each subflow execution.

### Steps to Achieve This
1. Load and modify the manifest: Load the Kubernetes job manifest and inject the required parameters.
2. Create and run the job: Use the modified manifest to create and run the Kubernetes job via Prefect.

### Example Workflow
Here's how you can do it:

main_flow.py
```python
import uuid

import yaml
from prefect import flow, task
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

# Task to load the job manifest and apply per-subflow overrides
@task
def modify_manifest(manifest_path, params):
    with open(manifest_path, "r") as file:
        manifest = yaml.safe_load(file)

    # Job names must be unique within a namespace, so append a short random
    # suffix (keep the base name short enough to stay within 63 characters)
    base_name = manifest["metadata"]["name"]
    manifest["metadata"]["name"] = f"{base_name}-{uuid.uuid4().hex[:8]}"

    # Override fields of the first container, e.g. "command" or "args"
    container = manifest["spec"]["template"]["spec"]["containers"][0]
    for key, value in params.items():
        container[key] = value

    return manifest

# Plain function rather than a task: run_namespaced_job is a flow,
# and flows should be called from flows, not from inside tasks
def run_k8s_job(modified_manifest):
    k8s_creds = KubernetesCredentials.load("k8s-creds")
    # KubernetesJob takes the manifest dict through its v1_job field
    k8s_job = KubernetesJob(v1_job=modified_manifest, credentials=k8s_creds)
    return run_namespaced_job(kubernetes_job=k8s_job)

# Subflow that modifies and runs a Kubernetes job
@flow
def subflow(manifest_path, params):
    modified_manifest = modify_manifest(manifest_path, params)
    result = run_k8s_job(modified_manifest)
    return result

# Main flow that triggers one subflow per parameter set
@flow
def main_flow():
    job_manifest_path = "path/to/job.yaml"
    params_list = [
        {"command": ["python", "script1.py"]},
        {"command": ["python", "script2.py"]},
    ]

    # Subflows called in a plain loop run one after another
    for params in params_list:
        subflow(job_manifest_path, params)

if __name__ == "__main__":
    main_flow()
```
### Explanation
1. `modify_manifest` task: Loads the Kubernetes job manifest, injects the provided parameters into its first container, and returns the modified manifest.
2. `run_k8s_job`: Wraps the modified manifest in a `KubernetesJob` and runs it as a namespaced Kubernetes job with `run_namespaced_job`.
3. `subflow` function: Calls `modify_manifest` and `run_k8s_job` to execute the job with the modified parameters.
4. `main_flow` function: Iterates over a list of parameter sets and calls `subflow` once for each. Called in a plain loop like this, the subflows run one after another.

### Resources
- Prefect Kubernetes Flows Integration: Prefect Kubernetes Flows

This should cover dynamically modifying and reusing a Kubernetes job manifest to run subflows with different parameters.
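The heart of reusing one manifest is turning a base manifest into several per-subflow variants. The sketch below does that with plain dicts, so it can be tried without a cluster: it deep-copies the base manifest so the shared original is never mutated, overrides fields on the first container, and gives each variant a unique name that fits the 63-character DNS-1123 label limit Kubernetes applies to Job names. The helpers `make_job_variants` and `_dns_label` and the `My_Job` base name are hypothetical.

```python
import copy
import re

MAX_JOB_NAME = 63  # Kubernetes limits Job names to 63 characters


def _dns_label(text: str, max_len: int) -> str:
    """Lowercase text, replace characters not allowed in a DNS-1123 label with '-', and trim."""
    label = re.sub(r"[^a-z0-9-]+", "-", text.lower())
    return label[:max_len].strip("-")


def make_job_variants(base_manifest: dict, params_list: list[dict]) -> list[dict]:
    """Return one manifest per params dict, each with a unique name and container overrides."""
    base_name = base_manifest["metadata"]["name"]
    variants = []
    for i, params in enumerate(params_list):
        manifest = copy.deepcopy(base_manifest)  # never mutate the shared base
        # Trim the base name first so the index suffix always survives
        suffix = f"-{i}"
        stem = _dns_label(base_name, MAX_JOB_NAME - len(suffix)) or "job"
        manifest["metadata"]["name"] = stem + suffix
        container = manifest["spec"]["template"]["spec"]["containers"][0]
        container.update(params)  # e.g. {"command": [...]} or {"args": [...]}
        variants.append(manifest)
    return variants


base = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "My_Job"},
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "main", "image": "python:3.11-slim"}],
                "restartPolicy": "Never",
            }
        }
    },
}

variants = make_job_variants(
    base,
    [{"command": ["python", "script1.py"]}, {"command": ["python", "script2.py"]}],
)
```

Here the variants are named `my-job-0` and `my-job-1`, and `base` keeps its original name and has no `command`. In the flow above, each variant could be passed to `run_k8s_job` in place of the output of `modify_manifest`.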