Mark Reyes
08/07/2024, 9:01 PMMarvin
08/07/2024, 9:02 PMfrom prefect import flow, Deployment
from prefect.deployments import run_deployment
@flow
def sequential_flow():
# Call the first deployment
run_deployment("first-deployment-name")
# Call the second deployment
run_deployment("second-deployment-name")
if __name__ == "__main__":
sequential_flow()
Parallel:
from prefect import flow
from prefect.deployments import run_deployment
import asyncio
@flow
async def parallel_flow():
# Define async tasks for deployments
task1 = asyncio.create_task(run_deployment("first-deployment-name"))
task2 = asyncio.create_task(run_deployment("second-deployment-name"))
# Run tasks in parallel
await asyncio.gather(task1, task2)
if __name__ == "__main__":
asyncio.run(parallel_flow())
For more detailed examples of Prefect flows, you can refer to these resources:
- LINK: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.py
- LINK: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.pyMark Reyes
08/07/2024, 9:03 PMMarvin
08/07/2024, 9:04 PMrun_deployment
, you can do the following:
python
from prefect import flow
from prefect.deployments import run_deployment
@flow
def sequential_flow_with_params():
# Call the first deployment with parameters
run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"})
# Call the second deployment with parameters
run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"})
if __name__ == "__main__":
sequential_flow_with_params()
For the parallel case:
python
from prefect import flow
from prefect.deployments import run_deployment
import asyncio
@flow
async def parallel_flow_with_params():
# Define async tasks for deployments with parameters
task1 = asyncio.create_task(run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"}))
task2 = asyncio.create_task(run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"}))
# Run tasks in parallel
await asyncio.gather(task1, task2)
if __name__ == "__main__":
asyncio.run(parallel_flow_with_params())
For more detailed information on deployment parameters, you can refer to these Prefect docs.
Additionally, Prefect code examples are available here.Mark Reyes
08/07/2024, 9:33 PMMarvin
08/07/2024, 9:34 PMpython
from prefect import flow, task
from prefect.deployments import run_deployment
import asyncio
@task
def task_1(param1, param2):
print(f"Task 1 with parameters: {param1}, {param2}")
@task
def task_2(paramA, paramB):
print(f"Task 2 with parameters: {paramA}, {paramB}")
@flow
def sequential_flow_with_tasks():
task_list = [
run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"}),
run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"})
]
for task in task_list:
task_1("value1", "value2")
task_2("valueA", "valueB")
@flow
async def parallel_flow_with_tasks():
task_list = [
asyncio.create_task(run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"})),
asyncio.create_task(run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"}))
]
await asyncio.gather(*task_list)
await asyncio.gather(task_1("value1", "value2"), task_2("valueA", "valueB"))
if __name__ == "__main__":
sequential_flow_with_tasks()
asyncio.run(parallel_flow_with_tasks())
In this example:
- task_1
and task_2
are individual tasks that print out received parameters.
- sequential_flow_with_tasks
iterates over a list of deployment tasks, running them sequentially.
- parallel_flow_with_tasks
runs deployments in parallel using asyncio.create_task
and gathers results asynchronously.
For more detailed information on using tasks and their features, you can visit these Prefect docs on tasks.Mark Reyes
08/07/2024, 9:57 PMMarvin
08/07/2024, 9:57 PMGET /deployments
This endpoint retrieves a list of all deployments.
2. Python Code Example:
python
import requests
# Set up the API URL and authentication
api_url = "<https://api.prefect.io/deployments>"
headers = {
"Authorization": "Bearer YOUR_API_TOKEN"
}
# Make a GET request to the Prefect API to list deployments
response = requests.get(api_url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
deployments = response.json()
for deployment in deployments:
print(deployment)
else:
print(f"Failed to retrieve deployments: {response.status_code} - {response.text}")
Replace YOUR_API_TOKEN
with your actual Prefect API token.
For more detailed examples and documentation, you can visit this Prefect Kubernetes documentation. While it provides information on Kubernetes-specific deployments, it often includes more general deployment management techniques relevant to Prefect users.