Juanes Grimaldos
07/28/2024, 4:28 PM

Marvin
07/28/2024, 4:28 PM
1. Import Required Modules:
```python
from prefect import deploy, flow
```
2. Define Your Flows:
```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")
```
3. Deploy Flows:
```python
if __name__ == "__main__":
    deploy(
        flow1.to_deployment(name="flow1-deployment"),
        flow2.to_deployment(name="flow2-deployment"),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
```
You can pass any number of deployments (one per flow) to the `deploy` function to deploy them simultaneously.
For more details, refer to the following documentation:
- Deploy multiple flows in Prefect: https://docs.prefect.io/latest/guides/prefect-deploy/
And here's a basic code example to understand the flow definition process:
- Prefect flow example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.py

Juanes Grimaldos
07/28/2024, 4:29 PM

Juanes Grimaldos
07/28/2024, 4:30 PM

Marvin
07/28/2024, 4:30 PM
1. Import Required Modules:
```python
import subprocess
from prefect import deploy, flow
```
2. Create a Work Pool:
```python
def create_work_pool():
    subprocess.run(
        ["prefect", "work-pool", "create", "my-work-pool", "--type", "prefect:managed"],
        check=True,
    )
```
3. Define Your Flows:
```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")
```
4. Deploy Flows:
```python
if __name__ == "__main__":
    create_work_pool()  # Create the work pool
    deploy(
        flow1.to_deployment(name="flow1-deployment"),
        flow2.to_deployment(name="flow2-deployment"),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
```
This script will attempt to create the work pool (note that the `create` command fails if the pool already exists) and then deploy your flows to that work pool.
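To make the script safe to re-run, you can check for the pool before creating it. A minimal sketch, assuming `prefect work-pool inspect` exits non-zero when the pool is missing (the injectable `run` parameter is only there to make the helper easy to test):

```python
import subprocess
from typing import Callable

def ensure_work_pool(name: str, run: Callable = subprocess.run) -> bool:
    """Create the work pool only if it does not already exist."""
    # `prefect work-pool inspect` returns a non-zero exit code
    # when no pool with that name is found.
    result = run(["prefect", "work-pool", "inspect", name], capture_output=True)
    if result.returncode != 0:
        run(
            ["prefect", "work-pool", "create", name, "--type", "prefect:managed"],
            check=True,
        )
        return True   # pool was created
    return False      # pool already existed
```

Calling `ensure_work_pool("my-work-pool")` in place of `create_work_pool()` then makes the deployment script idempotent.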
For more detailed information, you can reference the documentation:
- Prefect Work Pools: https://docs.prefect.io/latest/concepts/work-pools/
- Example of tasks and flows in Prefect: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.py

Juanes Grimaldos
07/28/2024, 4:33 PM
```python
main_flow.serve(
    name="model-training",
    cron="*/15 * * * *",
    tags=["mlops", "tracking"],
    description="keep track on the model performance",
    version="0.1.0"
)
```
how can I do that in the case of multiple deployments?

Marvin
07/28/2024, 4:33 PM
You can call the `serve` method for each flow separately. Here’s an example of how you can achieve this:
1. Import Required Modules:
```python
from prefect import flow
# subprocess is used below to create the work pool if it is not already created
import subprocess
```
2. Define Your Flows:
```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")
```
3. Create Work Pool (If Necessary):
```python
def create_work_pool():
    subprocess.run(
        ["prefect", "work-pool", "create", "my-work-pool", "--type", "prefect:managed"],
        check=True,
    )
```
4. Serve Each Deployment Separately:
```python
if __name__ == "__main__":
    create_work_pool()  # Create the work pool if not already existing
    # Deploying the first flow.
    # Note: serve() blocks, so flow2.serve() below only runs after
    # the flow1 server process exits.
    flow1.serve(
        name="model-training-1",
        cron="*/15 * * * *",
        tags=["mlops", "tracking"],
        description="Keep track of the model performance - Flow 1",
        version="0.1.0"
    )
    # Deploying the second flow
    flow2.serve(
        name="model-training-2",
        cron="0 */1 * * *",
        tags=["mlops", "data-cleaning"],
        description="Data cleaning and preprocessing - Flow 2",
        version="0.2.0"
    )
```
### Explanation:
1. Create Work Pool: The `create_work_pool` function uses `subprocess` to create a work pool if it doesn't exist.
2. Define Flows: Define as many flows as necessary.
3. Serve Deployments: Use the `serve` method for each flow with different parameters like `cron`, `tags`, `description`, and `version`.
This script will create separate deployments for each flow with their own specific schedules and metadata.
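The cron strings above read field-by-field as minute, hour, day-of-month, month, day-of-week: `*/15 * * * *` fires every 15 minutes, and `0 */1 * * *` fires at the top of every hour. A toy matcher covering only the field forms used here (not a full cron implementation) makes this concrete:

```python
def cron_matches(expr: str, minute: int, hour: int) -> bool:
    """Check the minute and hour fields of a 5-field cron expression.

    Supports only the forms used above: '*', '*/n', and a plain number.
    """
    def field_matches(field: str, value: int) -> bool:
        if field == "*":
            return True                          # any value
        if field.startswith("*/"):
            return value % int(field[2:]) == 0   # every n units
        return int(field) == value
    minute_field, hour_field, *_ = expr.split()
    return field_matches(minute_field, minute) and field_matches(hour_field, hour)

# "*/15 * * * *" matches minutes 0, 15, 30, 45 of every hour
print(cron_matches("*/15 * * * *", minute=30, hour=7))   # True
print(cron_matches("0 */1 * * *", minute=5, hour=7))     # False
```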
### References:
- Prefect Work Pools Documentation
- Prefect Deployment Guide

Juanes Grimaldos
07/28/2024, 4:45 PM
```python
from prefect import serve
from workflow1 import task1
from workflow2 import task2
from workflow3 import task3

if __name__ == "__main__":
    task1_deploy = task1.to_deployment(name='Task 1')
    task2_deploy = task2.to_deployment(name='Task 2')
    task3_deploy = task3.to_deployment(name='Task 3')
    serve(
        task1_deploy,
        task2_deploy,
        task3_deploy
    )
```

Marvin
07/28/2024, 4:45 PM
1. Import Required Modules:
```python
from prefect import flow, deploy
```
2. Define Your Flows:
```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")

@flow(log_prints=True)
def flow3():
    print("This is flow 3!")
```
3. Deploy Flows Simultaneously:
```python
if __name__ == "__main__":
    # Define deployments for each flow
    flow1_deploy = flow1.to_deployment(
        name='Flow 1',
        cron="*/15 * * * *",
        tags=["mlops", "tracking"],
        description="Track model performance - Flow 1",
        version="0.1.0"
    )
    flow2_deploy = flow2.to_deployment(
        name='Flow 2',
        cron="0 */1 * * *",
        tags=["mlops", "data-preprocessing"],
        description="Data preprocessing - Flow 2",
        version="0.2.0"
    )
    flow3_deploy = flow3.to_deployment(
        name='Flow 3',
        cron="0 0 * * *",
        tags=["mlops", "model-training"],
        description="Model training - Flow 3",
        version="0.3.0"
    )
    # Serve the deployments
    deploy.serve(
        flow1_deploy,
        flow2_deploy,
        flow3_deploy
    )
```
### Explanation:
1. Define Flows: Each flow (e.g., `flow1`, `flow2`, `flow3`) is defined with a simple print statement for illustration.
2. Deploy Flows: Use `to_deployment` to define deployments for each flow with specific attributes like `cron`, `tags`, `description`, and `version`.
3. Serve Deployments: Use `deploy.serve` to serve `flow1_deploy`, `flow2_deploy`, and `flow3_deploy` simultaneously.
This approach ensures that you can manage multiple flows with different schedules and configurations within the same script.
### References:
- Prefect Deployment Guide
- Prefect Schedules Documentation

Juanes Grimaldos
07/28/2024, 4:52 PM

Marvin
07/28/2024, 4:52 PM
The `.serve` method isn't part of the `deploy` object; it is a method on individual flow objects, and each call to it blocks. To run multiple deployments from the same script, build a deployment per flow with `to_deployment` and pass them all to the top-level `serve` function, as in your snippet. Here’s how you can modify your script to properly deploy multiple flows:
1. Import Required Modules:
```python
from prefect import flow, serve
```
2. Define Your Flows:
```python
@flow(log_prints=True)
def flow1():
    print("This is flow 1!")

@flow(log_prints=True)
def flow2():
    print("This is flow 2!")

@flow(log_prints=True)
def flow3():
    print("This is flow 3!")
```
3. Serve the Deployments Together:
```python
if __name__ == "__main__":
    serve(
        flow1.to_deployment(
            name="Flow 1",
            cron="*/15 * * * *",
            tags=["mlops", "tracking"],
            description="Track model performance - Flow 1",
            version="0.1.0"
        ),
        flow2.to_deployment(
            name="Flow 2",
            cron="0 */1 * * *",
            tags=["mlops", "data-preprocessing"],
            description="Data preprocessing - Flow 2",
            version="0.2.0"
        ),
        flow3.to_deployment(
            name="Flow 3",
            cron="0 0 * * *",
            tags=["mlops", "model-training"],
            description="Model training - Flow 3",
            version="0.3.0"
        ),
    )
```
### Explanation:
1. Define Flows: Each flow (`flow1`, `flow2`, `flow3`) is defined.
2. Serve Deployments: Build a deployment for each flow with `to_deployment()`, setting attributes like `cron`, `tags`, `description`, and `version`, then pass all of them to `serve()`, which runs them in a single blocking process.
This script will start each flow's deployment, and they will run according to their respective schedules.
### References:
- Prefect Deployment Guide
- Prefect Flows Documentation

Juanes Grimaldos
07/28/2024, 5:24 PM

Marvin
07/28/2024, 5:25 PM
You can use Prefect's `wait_for` capability to achieve such a dependency:
1. Import Required Modules:
```python
from prefect import flow, task
from prefect.deployments import Deployment
```
2. Define Flows and Tasks:
```python
@task
def task1():
    # Task for the first workflow
    print("Task 1 executed")

@flow
def workflow():
    task1()
    print("Workflow 1 completed")

@task
def task2():
    # Task for the second workflow
    print("Task 2 executed after workflow 1")

@flow
def data_monitoring():
    task2()
    print("Workflow 2 - Monitoring completed")
```
3. Create Dependent Flow and Deployments:
```python
if __name__ == "__main__":
    # Create and apply the first deployment
    workflow_deploy = Deployment.build_from_flow(
        flow=workflow,
        name='workflow-deployment',
        parameters={},
        version='0.1.0',
        description='Initial workflow deployment.',
        tags=['mlops', 'tracking'],
        schedule={'type': 'cron', 'cron': '*/15 * * * *'},
    )
    workflow_deploy.apply()
    # Create and apply the second deployment (scheduled independently)
    data_monitoring_deploy = Deployment.build_from_flow(
        flow=data_monitoring,
        name='data-monitoring-deployment',
        parameters={},
        version='0.1.0',
        description='Data monitoring deployment dependent on workflow.',
        tags=['evently', 'grafana'],
        schedule={'type': 'cron', 'cron': '*/10 * * * *'},
    )
    data_monitoring_deploy.apply()
```
### Explanation:
1. Define Tasks and Flows: Tasks `task1` and `task2` are defined, and the flows `workflow` and `data_monitoring` contain these tasks.
2. Build and Apply Deployments:
- `workflow_deploy`: First deployment for the `workflow` flow with its own schedule and settings.
- `data_monitoring_deploy`: Deployment for the `data_monitoring` flow, intended to run after the first workflow completes.
### Making Workflows Dependent:
Note that the two deployments above run on independent cron schedules, so nothing yet forces an ordering. To ensure `data_monitoring` only runs after `workflow` is completed, call both from a single parent flow (using plain call order, or `wait_for` between tasks), or trigger the `data_monitoring` deployment from the end of `workflow` with `run_deployment`, instead of relying on the schedules.
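As a structural sketch of the parent-flow pattern (Prefect decorators omitted so the ordering logic stands on its own; in Prefect you would mark `pipeline` with `@flow`, and the two callees from the snippet above become subflows):

```python
events = []

def workflow():
    # First workflow; in Prefect this carries the @flow decorator
    events.append("workflow")

def data_monitoring():
    # Second workflow, which must only start after the first finishes
    events.append("data_monitoring")

def pipeline():
    # Parent "flow": plain Python call order enforces the dependency,
    # so data_monitoring() cannot start before workflow() returns.
    workflow()
    data_monitoring()

pipeline()
print(events)  # ['workflow', 'data_monitoring']
```

You would then deploy and schedule `pipeline` alone, rather than scheduling the two flows independently.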
For additional guides and examples, please refer to:
- Prefect Deployment Guide
- Example Prefect Flows