<@ULVA73B9P> is there a way to deploy --all direct...
# ask-marvin
m
@Marvin is there a way to deploy --all directly from code?
m
There isn't a direct way to deploy all flows from code. However, you can create a deployment using the Prefect CLI. Here are the steps:
1. Build the deployment definition file `deployment.yaml`. This step includes uploading your flow to its configured remote storage location, if one is specified.
2. Create the deployment on the API.
To build the deployment definition file `deployment.yaml`, run the `prefect deployment build` Prefect CLI command from the folder containing your flow script and any dependencies of the script. For example:
bash
$ prefect deployment build -n marvin -p default-agent-pool -q test flows/marvin.py:say_hi
When you run this command, Prefect:
- Creates a `marvin_flow-deployment.yaml` file for your deployment based on your flow code and options.
- Uploads your flow files to the configured storage location (local by default).
- Submits your deployment to the work queue `test`. The work queue `test` will be created if it doesn't exist.
Please note that the appropriate filesystem library supporting the storage location must be installed prior to building a deployment with a storage block. For more details, you can check the Prefect documentation here.
n
🙅
you could always say like
subprocess.check_call("prefect --no-prompt deploy --all".split())
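A slightly fuller sketch of that subprocess approach, assuming the `prefect` CLI is on the PATH and the project folder contains a `prefect.yaml`. The names `deploy_all` and `runner` are hypothetical, added only so the call can be dry-run without Prefect installed; they are not part of Prefect.

```python
import subprocess
from pathlib import Path
from typing import Callable, List, Union

# The command suggested above, split into an argument list.
DEPLOY_ALL_CMD: List[str] = ["prefect", "--no-prompt", "deploy", "--all"]


def deploy_all(
    project_dir: Union[str, Path],
    runner: Callable[..., object] = subprocess.check_call,
) -> object:
    """Run `prefect --no-prompt deploy --all` from project_dir.

    `runner` is a hypothetical injection point (defaults to
    subprocess.check_call) so the call can be dry-run or tested.
    """
    # With the default runner, a non-zero CLI exit raises CalledProcessError.
    return runner(DEPLOY_ALL_CMD, cwd=str(project_dir))
```

Passing `cwd` avoids having to generate a batch file or change the working directory of the calling process.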
m
thank you @Nate for your reply, we would like to deploy --all from code. We have tried calling the async function you use in prefect.cli.deploy, deploy(deploy_all=True), from our code, but it gives us the following error:
File "C:\prefect-test\venv\Lib\site-packages\prefect\cli\deploy.py", line 226, in deploy
    app.console.print(
    ^^^^^^^^^^^
AttributeError: 'PrefectTyper' object has no attribute 'console'
So can you confirm that this CLI function cannot be used from code? Or is there any way I can use it?
n
can you show your actual code?
m
@Nate posting it
import asyncio
import os
import pathlib
import subprocess
import yaml
from typing import Union

from prefect.cli.deploy import deploy


class FlowDeployer:
    DEPLOYMENT_FILENAME = 'prefect.yaml'
    #DEPLOYMENT_BATCH = "prefect_deploy.bat"

    def __init__(self, parent_folder_path: Union[str, pathlib.Path],
                 deployment_name: str = None, root_folder: Union[str, pathlib.Path] = None):
        self.deployment_name = deployment_name
        # Always store root_folder as a Path so joinpath() works even when a str is passed in.
        self.root_folder = pathlib.Path(root_folder) if root_folder else pathlib.Path(__file__).resolve().parent
        self.parent_folder_path = self.root_folder / parent_folder_path

    @property
    def parent_folder_path(self):
        return self._parent_folder_path

    @parent_folder_path.setter
    def parent_folder_path(self, v):
        if isinstance(v, str):
            v = pathlib.Path(v)
        self._parent_folder_path = v

    async def deploy_flows_from_parent_folder(self):
        # The property setter guarantees parent_folder_path is a pathlib.Path.
        path = self.parent_folder_path
        parent_deployment_file = path.joinpath(self.DEPLOYMENT_FILENAME)
        if not parent_deployment_file.is_file():
            raise FileNotFoundError(f"The folder {path} does not contain the file {self.DEPLOYMENT_FILENAME}")
        # Remove any prefect.yaml left in the project root by a previous run.
        pj_root_deployment_file = self.root_folder.joinpath(self.DEPLOYMENT_FILENAME)
        if pj_root_deployment_file.is_file():
            os.remove(pj_root_deployment_file)
        with open(parent_deployment_file) as f:
            yaml_file = yaml.safe_load(f)
        # Prefix each entrypoint with the parent folder, using forward slashes.
        for d in yaml_file['deployments']:
            d['entrypoint'] = str(path.joinpath(d['entrypoint'])).replace('\\', '/')
        with open(pj_root_deployment_file, 'w') as f:
            yaml.dump(yaml_file, f, sort_keys=False)
        # This is the call that raises the AttributeError shown above.
        await deploy(deploy_all=True)
        # Old way to deploy all, which we want to remove:
        # cmd = 'call "venv/scripts/activate" \n'
        # cmd += "prefect deploy --all"
        # with open(self.DEPLOYMENT_BATCH, 'w') as f:
        #     f.write(cmd)
        # subprocess.check_output(self.DEPLOYMENT_BATCH)

    def execute(self):
        asyncio.run(self.deploy_flows_from_parent_folder())


if __name__ == "__main__":
    parent_folder = pathlib.Path('./customers/****')
    root_folder = pathlib.Path('C:/prefect')
    d = FlowDeployer(parent_folder_path=parent_folder, root_folder=root_folder)
    d.execute()
As you can see, we have been using subprocess, but if it is possible to use the client's deploy function or some other way you recommend, that would be perfect.
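The entrypoint rewriting step in the class above can also be sketched as a small pure function, which makes it testable without touching the filesystem or Prefect. This is only an illustration: `rebase_entrypoints` is a hypothetical helper name, and the folder `customers/acme` in the usage note is a made-up placeholder, not the real (elided) customer path.

```python
import copy
from pathlib import PurePosixPath, PureWindowsPath
from typing import Any, Dict


def rebase_entrypoints(config: Dict[str, Any], base: str) -> Dict[str, Any]:
    """Return a copy of config with each deployment entrypoint prefixed by base."""
    rebased = copy.deepcopy(config)  # leave the caller's dict untouched
    # PureWindowsPath accepts both "\\" and "/" separators; as_posix() emits "/".
    base_posix = PurePosixPath(PureWindowsPath(base).as_posix())
    for deployment in rebased.get("deployments", []):
        deployment["entrypoint"] = str(base_posix / deployment["entrypoint"])
    return rebased
```

For example, with the parsed contents of a `prefect.yaml` whose single deployment has the entrypoint `flows/etl.py:main`, calling `rebase_entrypoints(config, "customers\\acme")` would give that deployment the entrypoint `customers/acme/flows/etl.py:main`, with no manual `replace('\\', '/')` needed.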
n
ah i see. yeah i think we could do whatever the cli is actually doing under the hood directly, which i dont think we've made first class programmatically, but we likely could