Jeff S
02/22/2025, 9:40 PM
Using the prefect-github package to clone the repo seems like it works. Not sure this is the best path, but it works for me. Here's my entry.py (FYI: I used ChatGPT to make the names more generic, so it may not be perfect, but you'll get the gist):
"""
This file serves as the entrypoint for a generic service agent.
It allows dynamic flows while minimizing worker rebuilds.
It will start by cloning/pulling a specified repository, then pass all parameters to the executed pipeline.
"""
import os
import sys
from prefect import flow, get_run_logger
from prefect.exceptions import PrefectException
from typing import List
from dotenv import load_dotenv
import importlib.util
from prefect_github.repository import GitHubRepository
def import_module_from_path(module_name: str, module_path: str):
    """Dynamically import a module from a file path.

    Args:
        module_name: Name to give the module
        module_path: Full path to the module file

    Returns:
        The imported module
    """
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    if not spec or not spec.loader:
        raise ImportError(
            f"Could not load spec for module {module_name} from {module_path}"
        )
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module
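
# Usage sketch for the helper above (module path and function name are hypothetical):
#   pipeline = import_module_from_path("my_pipeline", "/tmp/generic-functions/my_pipeline.py")
#   pipeline.run()
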
@flow(
    name="generic-service-flow",
    description="Entry point flow that clones/pulls repo and runs a generic pipeline",
    retries=2,
    retry_delay_seconds=60,
    log_prints=True,
)
def entry_flow(
    entities: List[str],
    resource_id: str,
    repo_path: str = "/tmp/generic-functions",
) -> List[dict]:
    """Entry point flow that ensures latest code and runs a generic pipeline.

    Args:
        entities: List of entities to process
        resource_id: Identifier for the external resource
        repo_path: Path where the repo should be downloaded

    Returns:
        List[dict]: Results from all pipeline executions

    Raises:
        PrefectException: If repo operations fail or pipeline execution fails
    """
    logger = get_run_logger()

    # Load environment variables
    load_dotenv()

    try:
        # Load the GitHub repository block
        github_repository_block = GitHubRepository.load("generic-functions")

        # Use the block to get the repository content
        logger.info(f"Downloading repository to {repo_path}")
        github_repository_block.get_directory(local_path=repo_path)
        logger.info("Repository content updated successfully")

        # Add repo to Python path
        if repo_path not in sys.path:
            sys.path.insert(0, repo_path)

        # Dynamically import the service module
        generic_module_path = os.path.join(repo_path, "prefect_flows", "generic_service.py")
        generic_service = import_module_from_path("generic_service", generic_module_path)

        # Run the pipeline
        logger.info(f"Starting pipeline for resource {resource_id}")
        results = generic_service.run_table_pipeline(
            resource_id=resource_id, entities=entities
        )
        return results
    except Exception as e:
        error_msg = f"Error in entry flow: {str(e)}"
        logger.error(error_msg)
        raise PrefectException(error_msg) from e


if __name__ == "__main__":
    # Example usage
    sample_entities = ["items", "item_metrics", "collections"]
    sample_resource_id = "0987654321"
    entry_flow(entities=sample_entities, resource_id=sample_resource_id)
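
Worth noting for anyone copying this: GitHubRepository.load("generic-functions") assumes a block with that name was saved ahead of time. A minimal one-time registration sketch, assuming a private repo accessed with a token (the URL, token, and credentials block name below are placeholders):

from prefect_github import GitHubCredentials
from prefect_github.repository import GitHubRepository

# Save a credentials block, then the repository block under the name the flow loads.
credentials = GitHubCredentials(token="ghp_placeholder")  # placeholder token
credentials.save("generic-functions-credentials", overwrite=True)

GitHubRepository(
    repository_url="https://github.com/your-org/generic-functions",  # placeholder URL
    reference="main",
    credentials=credentials,
).save("generic-functions", overwrite=True)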
Jeff S
02/22/2025, 9:43 PM
# Deploy the flow using local file source
await entry_flow.from_source(
    source=str(Path(__file__).parent),
    entrypoint="prefect_flows/entry.py:entry_flow",
).deploy(
    name=deployment_name,
    work_pool_name=request.work_pool_name,
    parameters={
        "resource_id": request.account_id,
        "entities": entity_names,
    },
    **schedule_args,
)
I run my worker via Docker. The only time I'll need to rebuild the worker container is if I have to change entry.py; otherwise, it appears to properly pull the repo on each flow run.
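
To adapt the snippet above outside the surrounding service code (which supplies request, deployment_name, entity_names, and schedule_args), here is a self-contained equivalent with placeholder literals; the deployment name, work pool name, and cron schedule are all assumptions:

from pathlib import Path

from prefect import flow

# Load the flow from local source and register a deployment against a work pool.
flow.from_source(
    source=str(Path(__file__).parent),  # directory containing prefect_flows/
    entrypoint="prefect_flows/entry.py:entry_flow",
).deploy(
    name="generic-service-daily",  # placeholder deployment name
    work_pool_name="my-docker-pool",  # placeholder work pool
    parameters={
        "resource_id": "0987654321",
        "entities": ["items", "item_metrics", "collections"],
    },
    cron="0 6 * * *",  # example stand-in for **schedule_args
)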