# ask-community
m
Hello Prefectionists, I have a Django application and I want to deploy Prefect flows programmatically from within Django (e.g. when a user creates a new model instance of a flow). The flows are stored in a public git repository. How would you achieve this?
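For reference: recent Prefect versions document a from_source API that can point a deployment directly at a public git URL, so the worker clones the repo at run time. A minimal sketch along those lines, callable from ordinary Django code (the repo URL, entrypoint, and work pool name are placeholders):

from prefect import flow

if __name__ == "__main__":
    # Build (or update) a deployment whose source is the public repo itself;
    # the worker pulls the repo each time the deployment runs.
    flow.from_source(
        source="https://github.com/example-org/example-flows.git",  # placeholder repo
        entrypoint="flows/my_flow.py:my_flow",  # <file in repo>:<flow function> (placeholder)
    ).deploy(
        name="my-flow-deployment",
        work_pool_name="default-pool",  # assumes this work pool already exists
    )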
j
I've been attempting to accomplish the same thing and have been struggling to get it to fetch the repo.
I've tried using worker commands to clone the repo on each flow run, which works, but then it loses the original command it was meant to execute.
There must be a way to build on that.
@mark doerr figured it out finally. Using the prefect-github package to clone the repo seems to work. Not sure this is the best path, but it works for me. Here's my entry.py - FYI I used ChatGPT to make the names more generic, so it may not be perfect, but you'll get the gist:
"""
This file serves as the entrypoint for a generic service agent.
It allows dynamic flows while minimizing worker rebuilds.

It will start by cloning/pulling a specified repository, then pass all parameters to the executed pipeline.
"""

import os
import sys
from prefect import flow, get_run_logger
from prefect.exceptions import PrefectException
from typing import List
from dotenv import load_dotenv
import importlib.util
from prefect_github.repository import GitHubRepository


def import_module_from_path(module_name: str, module_path: str):
    """Dynamically import a module from a file path.

    Args:
        module_name: Name to give the module
        module_path: Full path to the module file

    Returns:
        The imported module
    """
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    if not spec or not spec.loader:
        raise ImportError(
            f"Could not load spec for module {module_name} from {module_path}"
        )

    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module


@flow(
    name="generic-service-flow",
    description="Entry point flow that clones/pulls repo and runs a generic pipeline",
    retries=2,
    retry_delay_seconds=60,
    log_prints=True,
)
def entry_flow(
    entities: List[str],
    resource_id: str,
    repo_path: str = "/tmp/generic-functions",
) -> List[dict]:
    """Entry point flow that ensures latest code and runs a generic pipeline.

    Args:
        entities: List of entities to process
        resource_id: Identifier for the external resource
        repo_path: Path where the repo should be downloaded

    Returns:
        List[dict]: Results from all pipeline executions

    Raises:
        PrefectException: If repo operations fail or pipeline execution fails
    """
    logger = get_run_logger()

    # Load environment variables
    load_dotenv()

    try:
        # Load the GitHub repository block
        github_repository_block = GitHubRepository.load("generic-functions")

        # Use the block to get the repository content
        <http://logger.info|logger.info>(f"Downloading repository to {repo_path}")
        github_repository_block.get_directory(local_path=repo_path)
        <http://logger.info|logger.info>("Repository content updated successfully")

        # Add repo to Python path
        if repo_path not in sys.path:
            sys.path.insert(0, repo_path)

        # Dynamically import the service module
        generic_module_path = os.path.join(repo_path, "prefect_flows", "generic_service.py")
        generic_service = import_module_from_path("generic_service", generic_module_path)

        # Run the pipeline
        <http://logger.info|logger.info>(f"Starting pipeline for resource {resource_id}")
        results = generic_service.run_table_pipeline(
            resource_id=resource_id, entities=entities
        )

        return results

    except Exception as e:
        error_msg = f"Error in entry flow: {str(e)}"
        logger.error(error_msg)
        raise PrefectException(error_msg) from e


if __name__ == "__main__":
    # Example usage
    sample_entities = ["items", "item_metrics", "collections"]
    sample_resource_id = "0987654321"
    entry_flow(entities=sample_entities, resource_id=sample_resource_id)
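One thing this snippet assumes: the GitHubRepository block named "generic-functions" that entry_flow loads already exists. For completeness, a block like that can be registered once ahead of time (the URL here is a placeholder):

from prefect_github.repository import GitHubRepository

# One-time setup: save the block that entry.py later loads by name.
GitHubRepository(
    repository_url="https://github.com/example-org/generic-functions",  # placeholder
    reference="main",  # branch or tag to pull on each run
).save("generic-functions", overwrite=True)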
Then this function builds my deployments:
# Deploy the flow using local file source
await entry_flow.from_source(
    source=str(Path(__file__).parent),
    entrypoint="prefect_flows/entry.py:entry_flow",
).deploy(
    name=deployment_name,
    work_pool_name=request.work_pool_name,
    parameters={
        "resource_id": request.account_id,
        "entities": entity_names,
    },
    **schedule_args,
)
I run my worker via Docker. The only time I'll need to rebuild the worker container is if I have to change entry.py; otherwise it appears to properly pull the repo on each flow run.
I've tested this minimally but it works thus far.
Maybe someone else has a better solution. It feels kind of hacky.
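And once a deployment like this exists, an actual run can be triggered from Django (e.g. on a user action) through Prefect's run_deployment helper; the deployment name below is a placeholder:

from prefect.deployments import run_deployment

# Fire-and-forget: timeout=0 returns immediately with the created flow run
# instead of blocking until the run finishes.
flow_run = run_deployment(
    name="generic-service-flow/my-deployment",  # "<flow name>/<deployment name>" (placeholder)
    parameters={"resource_id": "0987654321", "entities": ["items"]},
    timeout=0,
)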