Sam Joel
10/04/2023, 5:34 PMclass PrefectExecutor:
## appropriate init method
def setup(self):
"""
Create deployment flows for all the loaders
"""
for loader in self.loaders:
print(loader.config.id)
self._build_and_deploy_loader(loader)
def _build_and_deploy_loader(self, loader):
try:
from prefect import flow
from prefect.deployments import Deployment
from prefect.filesystems import LocalFileSystem
except ImportError:
print(
"""
Prefect is not found. Install prefect with "pip install prefect==2.10.21"
"""
)
@flow
def process_job(parameters: dict):
print(parameters)
stack = Stack(etl=loader, destination_db = self.destination_db)
stack.etl.run(**parameters)
deployment = Deployment.build_from_flow(
process_job, name=str(loader.config.id), storage=LocalFileSystem.load("local")
)
deployment.apply()
Nate
10/04/2023, 6:18 PM.serve()
is probably more or less almost exactly what you're doing, except you wouldn't need an agent, the served process would effectively do the agents job and then you could just trigger the deploymentSam Joel
10/05/2023, 7:50 AMNate
10/05/2023, 2:32 PMBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by