<@ULVA73B9P> how to deploy two flows in the same py...
# marvin-in-the-wild
j
@Marvin how to deploy two flows in the same python script?
m
I've found some useful information on how to deploy multiple flows in a single Python script. Here is a summarized step-by-step solution with an example:

1. Import Prefect:

```python
from prefect import deploy, flow
```
2. Define Your Flows:

```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")
```
3. Deploy Flows:

```python
if __name__ == "__main__":
    deploy(
        flow1.to_deployment(name="flow1-deployment"),
        flow2.to_deployment(name="flow2-deployment"),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
```
You can pass any number of flows to the `deploy` function to deploy them simultaneously. For more details, refer to the following documentation:
- Deploy multiple flows in Prefect: https://docs.prefect.io/latest/guides/prefect-deploy/

And here's a basic code example to understand the flow definition process:
- Prefect flow example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.py
j
@Marvin I do not have a work_pool already defined; how can I predefine it in the script?
m
To create a work pool within your script before deploying your flows, you can use the Prefect CLI commands from your Python script. Here's a step-by-step guide:

1. Import Required Modules:

```python
import subprocess

from prefect import deploy, flow
```
2. Create a Work Pool:

```python
def create_work_pool():
    subprocess.run(
        ["prefect", "work-pool", "create", "my-work-pool", "--type", "prefect:managed"],
        check=True,
    )
```
3. Define Your Flows:

```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")
```
4. Deploy Flows:

```python
if __name__ == "__main__":
    create_work_pool()  # create the work pool
    deploy(
        flow1.to_deployment(name="flow1-deployment"),
        flow2.to_deployment(name="flow2-deployment"),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
```
This script creates the work pool and then deploys your flows to it. For more detailed information, you can reference the documentation:
- Prefect Work Pools: https://docs.prefect.io/latest/concepts/work-pools/
- Example of tasks and flows in Prefect: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.py
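One caveat: `prefect work-pool create` exits non-zero when the pool already exists, so with `check=True` the helper above would raise on a re-run. A sketch of an idempotent variant (using only the CLI, via `prefect work-pool inspect` to probe for the pool first):

```python
import subprocess

def ensure_work_pool(name: str = "my-work-pool") -> None:
    """Create the work pool only if it doesn't already exist."""
    # `prefect work-pool inspect` exits non-zero when the pool is missing
    result = subprocess.run(
        ["prefect", "work-pool", "inspect", name],
        capture_output=True,
    )
    if result.returncode != 0:
        subprocess.run(
            ["prefect", "work-pool", "create", name, "--type", "prefect:managed"],
            check=True,
        )
```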
j
@Marvin I already defined a cron and other settings for one deployment

```python
main_flow.serve(
    name="model-training",
    cron="*/15 * * * *",
    tags=["mlops", "tracking"],
    description="keep track of the model performance",
    version="0.1.0"
)
```

how can I do that in the case of multiple deployments?
m
To deploy multiple flows with different schedules, tags, descriptions, and versions within the same script, you can use the `serve` method for each flow separately. Here’s an example of how you can achieve this:

1. Import Required Modules:

```python
from prefect import flow, deploy

# Assuming you need subprocess for creating work pools if not already created
import subprocess
```
2. Define Your Flows:

```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")
```
3. Create Work Pool (If Necessary):

```python
def create_work_pool():
    subprocess.run(
        ["prefect", "work-pool", "create", "my-work-pool", "--type", "prefect:managed"],
        check=True,
    )
```
4. Serve Each Deployment Separately:

```python
if __name__ == "__main__":
    create_work_pool()  # create the work pool if not already existing

    # deploy the first flow
    # note: .serve() blocks until the process is stopped, so in a single
    # script only this first call will run; use a separate process per
    # flow, or Prefect's top-level serve(), to run several at once
    flow1.serve(
        name="model-training-1",
        cron="*/15 * * * *",
        tags=["mlops", "tracking"],
        description="Keep track of the model performance - Flow 1",
        version="0.1.0"
    )

    # deploy the second flow
    flow2.serve(
        name="model-training-2",
        cron="0 */1 * * *",
        tags=["mlops", "data-cleaning"],
        description="Data cleaning and preprocessing - Flow 2",
        version="0.2.0"
    )
```
### Explanation:
1. Create Work Pool: The `create_work_pool` function uses `subprocess` to create a work pool if it doesn't exist.
2. Define Flows: Define as many flows as necessary.
3. Serve Deployments: Use the `serve` method for each flow with different parameters like `cron`, `tags`, `description`, and `version`.

This script will create separate deployments for each flow with their own specific schedules and metadata.

### References:
- Prefect Work Pools Documentation
- Prefect Deployment Guide
j
@Marvin can I apply this but for flows?

```python
from prefect import serve
from workflow1 import task1
from workflow2 import task2
from workflow3 import task3

if __name__ == "__main__":
    task1_deploy = task1.to_deployment(name='Task 1')
    task2_deploy = task2.to_deployment(name='Task 2')
    task3_deploy = task3.to_deployment(name='Task 3')

    serve(
        task1_deploy,
        task2_deploy,
        task3_deploy
    )
```
m
Yes, you can deploy multiple flows in the same script. The concept is similar to the deployment example you provided for tasks, but you will use flows instead of tasks. Here’s a step-by-step example:

1. Import Required Modules:

```python
from prefect import flow, deploy
```
2. Define Your Flows:

```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")

@flow(log_prints=True)
def flow3():
    print("This is flow 3!")
```
3. Deploy Flows Simultaneously:

```python
if __name__ == "__main__":
    # Define deployments for each flow
    flow1_deploy = flow1.to_deployment(
        name='Flow 1',
        cron="*/15 * * * *",
        tags=["mlops", "tracking"],
        description="Track model performance - Flow 1",
        version="0.1.0"
    )

    flow2_deploy = flow2.to_deployment(
        name='Flow 2',
        cron="0 */1 * * *",
        tags=["mlops", "data-preprocessing"],
        description="Data preprocessing - Flow 2",
        version="0.2.0"
    )

    flow3_deploy = flow3.to_deployment(
        name='Flow 3',
        cron="0 0 * * *",
        tags=["mlops", "model-training"],
        description="Model training - Flow 3",
        version="0.3.0"
    )

    # Serve the deployments
    deploy.serve(
        flow1_deploy,
        flow2_deploy,
        flow3_deploy
    )
```
### Explanation:
1. Define Flows: Each flow (e.g., `flow1`, `flow2`, `flow3`) is defined with a simple print statement for illustration.
2. Deploy Flows: Use `to_deployment` to define deployments for each flow with specific attributes like `cron`, `tags`, `description`, and `version`.
3. Serve Deployments: Use `deploy.serve` to serve `flow1_deploy`, `flow2_deploy`, and `flow3_deploy` simultaneously.

This approach ensures that you can manage multiple flows with different schedules and configurations within the same script.

### References:
- Prefect Deployment Guide
- Prefect Schedules Documentation
j
@Marvin

```
Traceback (most recent call last):
  File "/workspaces/health-project-mlops/src/multi_flows.py", line 42, in <module>
    deploy.serve(
AttributeError: 'function' object has no attribute 'serve'
```
m
It seems the `.serve` method isn't an attribute of the `deploy` function but is a method on individual flow objects. For deploying multiple flows in the same script, you'll need to serve each flow individually. Here’s how you can modify your script to properly deploy multiple flows:

1. Import Required Modules:

```python
from prefect import flow
```
2. Define Your Flows:

```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")

@flow(log_prints=True)
def flow3():
    print("This is flow 3!")
```
3. Serve Each Deployment Separately:

```python
if __name__ == "__main__":
    # note: .serve() blocks until interrupted, so running this script
    # starts only "Flow 1"; run each flow in its own process, or pass
    # all three to Prefect's top-level serve() to run them together
    flow1.serve(
        name="Flow 1",
        cron="*/15 * * * *",
        tags=["mlops", "tracking"],
        description="Track model performance - Flow 1",
        version="0.1.0"
    )

    flow2.serve(
        name="Flow 2",
        cron="0 */1 * * *",
        tags=["mlops", "data-preprocessing"],
        description="Data preprocessing - Flow 2",
        version="0.2.0"
    )

    flow3.serve(
        name="Flow 3",
        cron="0 0 * * *",
        tags=["mlops", "model-training"],
        description="Model training - Flow 3",
        version="0.3.0"
    )
```
### Explanation:
1. Define Flows: Each flow (`flow1`, `flow2`, `flow3`) is defined.
2. Serve Deployments: Use the `.serve()` method directly on each flow to create and run deployments with specific attributes like `cron`, `tags`, `description`, and `version`.

Keep in mind that each `.serve()` call blocks, so each flow's deployment runs on its schedule only for as long as its serving process stays up.

### References:
- Prefect Deployment Guide
- Prefect Flows Documentation
j
@Marvin how to make deployments depend on the first deployment?

```python
from prefect import serve
from workflow import workflow
from data_monitoring import batch_monitoring_backfill

if __name__ == "__main__":
    workflow_deploy = workflow.to_deployment(
        name='workflow',
        cron='*/15 * * * *',
        tags=['mlops', 'tracking'],
        description='keep track on the model performance',
        version='0.1.0'
    )

    monitoring_deploy = batch_monitoring_backfill.to_deployment(
        name='data_monitoring',
        cron='*/10 * * * *',
        tags=['evently', 'grafana'],
        description='monitor the data',
        version='0.1.0'
    )

    serve(
        workflow_deploy,
        monitoring_deploy
    )
```
m
To make deployments dependent on each other, you need to ensure the second deployment runs only after the successful completion of the first. This can be achieved with flow-run triggers, or by orchestrating the flows within a single parent flow. Below is an example that builds and applies both deployments:

1. Import Required Modules:

```python
from prefect import flow, task
from prefect.deployments import Deployment
```
2. Define Flows and Tasks:

```python
@task
def task1():
    # Task for the first workflow
    print("Task 1 executed")

@flow
def workflow():
    task1()
    print("Workflow 1 completed")

@task
def task2():
    # Task for the second workflow
    print("Task 2 executed after workflow 1")

@flow
def data_monitoring():
    task2()
    print("Workflow 2 - Monitoring completed")
```
3. Create and Apply the Deployments:

```python
# the schedule schema import path varies across Prefect 2.x versions
from prefect.server.schemas.schedules import CronSchedule

if __name__ == "__main__":
    # create and apply the first deployment
    workflow_deploy = Deployment.build_from_flow(
        flow=workflow,
        name='workflow-deployment',
        parameters={},
        version='0.1.0',
        description='Initial workflow deployment.',
        tags=['mlops', 'tracking'],
        schedule=CronSchedule(cron='*/15 * * * *'),
    )
    workflow_deploy.apply()

    # create and apply the second deployment; note that a schedule
    # alone does not make it wait for the first workflow to complete
    data_monitoring_deploy = Deployment.build_from_flow(
        flow=data_monitoring,
        name='data-monitoring-deployment',
        parameters={},
        version='0.1.0',
        description='Data monitoring deployment.',
        tags=['evently', 'grafana'],
        schedule=CronSchedule(cron='*/10 * * * *'),
    )
    data_monitoring_deploy.apply()
```
### Explanation:
1. Define Tasks and Flows: Tasks `task1` and `task2` are defined, and the flows `workflow` and `data_monitoring` contain these tasks.
2. Build and Apply Deployments:
- `workflow_deploy`: deployment for the `workflow` flow with its own schedule and settings.
- `data_monitoring_deploy`: deployment for the `data_monitoring` flow with a separate schedule.

### Making Workflows Dependent:
Two independent schedules do not create a dependency by themselves. Instead of running the flows side by side with `serve`, call them sequentially from a single parent flow so that `data_monitoring` only runs after `workflow` has completed.

For additional guides and examples, please refer to:
- Prefect Deployment Guide
- Example Prefect Flows