Sam Joel
10/04/2023, 5:34 PMclass PrefectExecutor:
## appropriate init method
def setup(self):
"""
Create deployment flows for all the loaders
"""
for loader in self.loaders:
print(loader.config.id)
self._build_and_deploy_loader(loader)
def _build_and_deploy_loader(self, loader):
try:
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import LocalFileSystem
except ImportError:
print(
"""
Prefect is not found. Install prefect with "pip install prefect==2.10.21"
"""
)
@flow
def process_job(parameters: dict):
print(parameters)
stack = Stack(etl=loader, destination_db = self.destination_db)
stack.etl.run(**parameters)
deployment = Deployment.build_from_flow(
process_job, name=str(loader.config.id), storage=LocalFileSystem.load("local")
)
deployment.apply()
Nate
10/04/2023, 6:18 PM.serve()
is probably more or less almost exactly what you're doing, except you wouldn't need an agent, the served process would effectively do the agents job and then you could just trigger the deploymentSam Joel
10/05/2023, 7:50 AMNate
10/05/2023, 2:32 PM