<@ULVA73B9P> example of flow calling two deploymen...
# ask-marvin
m
@Marvin example of flow calling two deployments sequentially. another example flow calling two deployments in parallel
m
Here are examples of flows calling two deployments: Sequentially:
from prefect import flow
from prefect.deployments import run_deployment

@flow
def sequential_flow():
    # Call the first deployment (placeholder name; run_deployment expects
    # "flow-name/deployment-name" or a deployment ID)
    run_deployment("first-deployment-name")
    
    # Call the second deployment
    run_deployment("second-deployment-name")

if __name__ == "__main__":
    sequential_flow()
Parallel:
from prefect import flow
from prefect.deployments import run_deployment
import asyncio

@flow
async def parallel_flow():
    # Define async tasks for deployments
    task1 = asyncio.create_task(run_deployment("first-deployment-name"))
    task2 = asyncio.create_task(run_deployment("second-deployment-name"))

    # Run tasks in parallel
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    asyncio.run(parallel_flow())
For more detailed examples of Prefect flows, you can refer to these resources:
- https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.py
- https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.py
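To see the difference between the two flows above without a Prefect server, here is a minimal standard-library sketch. `fake_run_deployment` is a hypothetical stand-in that only sleeps; it is not part of Prefect, and the 0.1 second delay is an arbitrary assumption.

```python
import asyncio
import time


async def fake_run_deployment(name: str, delay: float = 0.1) -> str:
    """Hypothetical stand-in for run_deployment: waits, then reports the name."""
    await asyncio.sleep(delay)
    return f"{name} finished"


async def run_sequentially(names: list[str]) -> list[str]:
    # Each await completes before the next call starts.
    return [await fake_run_deployment(name) for name in names]


async def run_in_parallel(names: list[str]) -> list[str]:
    # gather schedules all coroutines together and keeps the input order.
    return await asyncio.gather(*(fake_run_deployment(name) for name in names))


def timed(coro_fn, names):
    """Run a coroutine function and return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = asyncio.run(coro_fn(names))
    return results, time.perf_counter() - start
```

Calling `timed(run_sequentially, names)` and `timed(run_in_parallel, names)` with the same two names should give the same results, with the parallel version taking roughly the time of one call instead of two.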
m
@Marvin pass in deployment params to run_deployment
m
To pass parameters to a deployment using run_deployment, you can do the following:
from prefect import flow
from prefect.deployments import run_deployment

@flow
def sequential_flow_with_params():
    # Call the first deployment with parameters
    run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"})
    
    # Call the second deployment with parameters
    run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"})

if __name__ == "__main__":
    sequential_flow_with_params()
For the parallel case:
from prefect import flow
from prefect.deployments import run_deployment
import asyncio

@flow
async def parallel_flow_with_params():
    # Define async tasks for deployments with parameters
    task1 = asyncio.create_task(run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"}))
    task2 = asyncio.create_task(run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"}))

    # Run tasks in parallel
    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    asyncio.run(parallel_flow_with_params())
For more detailed information on deployment parameters, you can refer to these Prefect docs. Additionally, Prefect code examples are available here.
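Since the parameters dictionary is sent to the Prefect API, it can help to check that it contains only plain JSON values before calling run_deployment. The sketch below is an optional pre-flight check, not part of Prefect; `check_parameters` is a hypothetical helper name, and the round-trip rule is an assumption about what you want to allow.

```python
import json


def check_parameters(parameters: dict) -> dict:
    """Return parameters unchanged if they survive a JSON round trip."""
    try:
        encoded = json.dumps(parameters)
    except TypeError as exc:
        # Raised for values such as sets or arbitrary objects.
        raise ValueError(f"parameters are not JSON-serializable: {exc}") from exc
    # Tuples and non-string keys encode fine but come back changed.
    if json.loads(encoded) != parameters:
        raise ValueError("parameters change shape when encoded as JSON")
    return parameters
```

It could then be used as `run_deployment("first-deployment-name", parameters=check_parameters({"param1": "value1"}))`.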
m
@Marvin update example to use list of tasks
m
Here is your updated example, incorporating a list of tasks within the flow:
from prefect import flow, task
from prefect.deployments import run_deployment
import asyncio

@task
def task_1(param1, param2):
    print(f"Task 1 with parameters: {param1}, {param2}")

@task
def task_2(paramA, paramB):
    print(f"Task 2 with parameters: {paramA}, {paramB}")

@flow
def sequential_flow_with_tasks():
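    # (name, parameters) pairs for the deployments to trigger, in order
    deployment_list = [
        ("first-deployment-name", {"param1": "value1", "param2": "value2"}),
        ("second-deployment-name", {"paramA": "valueA", "paramB": "valueB"}),
    ]
    # By default, run_deployment waits for each flow run to finish before returning
    for name, parameters in deployment_list:
        run_deployment(name, parameters=parameters)
    # Run the local tasks once, after the deployments
    task_1("value1", "value2")
    task_2("valueA", "valueB")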

@flow
async def parallel_flow_with_tasks():
    task_list = [
        asyncio.create_task(run_deployment("first-deployment-name", parameters={"param1": "value1", "param2": "value2"})),
        asyncio.create_task(run_deployment("second-deployment-name", parameters={"paramA": "valueA", "paramB": "valueB"}))
    ]
    await asyncio.gather(*task_list)
    # task_1 and task_2 are synchronous tasks, so call them directly; asyncio.gather only accepts awaitables
    task_1("value1", "value2")
    task_2("valueA", "valueB")

if __name__ == "__main__":
    sequential_flow_with_tasks()
    asyncio.run(parallel_flow_with_tasks())
In this example:
- task_1 and task_2 are individual tasks that print out received parameters.
- sequential_flow_with_tasks iterates over a list of deployment tasks, running them sequentially.
- parallel_flow_with_tasks runs deployments in parallel using asyncio.create_task and gathers results asynchronously.
For more detailed information on using tasks and their features, you can visit these Prefect docs on tasks.
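If the list of deployments grows, you may not want to start all of them at once. Here is a minimal sketch that caps concurrency with asyncio.Semaphore. `run_with_limit` and its `runner` argument are hypothetical names; `runner` is assumed to be any coroutine function that takes a name and a parameters dict.

```python
import asyncio


async def run_with_limit(specs, runner, max_concurrent=2):
    """Run runner(name, parameters) for every spec, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(name, parameters):
        # Only max_concurrent callers can hold the semaphore at once.
        async with semaphore:
            return await runner(name, parameters)

    # return_exceptions=True keeps one failure from hiding the other results.
    return await asyncio.gather(
        *(run_one(name, parameters) for name, parameters in specs),
        return_exceptions=True,
    )
```

Inside an async flow, `runner` could be a thin wrapper around run_deployment that forwards the name and parameters. Results come back in the same order as `specs`, with exception objects in place of any runs that failed.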
m
@Marvin rest api list deployments
m
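To list deployments using the Prefect REST API, send a POST request to the deployments filter endpoint. Here's an example of how to do so:
1. Endpoint:
POST /deployments/filter
This endpoint returns the deployments that match the filters in the request body; an empty body applies no filters.
2. Python Code Example:
import requests

# Set up the API base URL and authentication.
# Prefect Cloud: https://api.prefect.cloud/api/accounts/YOUR_ACCOUNT_ID/workspaces/YOUR_WORKSPACE_ID
# Self-hosted server (default): http://127.0.0.1:4200/api
api_url = "https://api.prefect.cloud/api/accounts/YOUR_ACCOUNT_ID/workspaces/YOUR_WORKSPACE_ID"
headers = {
    "Authorization": "Bearer YOUR_API_TOKEN"
}

# Make a POST request to the Prefect API to list deployments (no filters)
response = requests.post(f"{api_url}/deployments/filter", headers=headers, json={})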

# Check if the request was successful
if response.status_code == 200:
    deployments = response.json()
    for deployment in deployments:
        print(deployment)
else:
    print(f"Failed to retrieve deployments: {response.status_code} - {response.text}")
Replace YOUR_API_TOKEN with your actual Prefect API token. For more detailed examples and documentation, you can visit this Prefect Kubernetes documentation. While it provides information on Kubernetes-specific deployments, it often includes more general deployment management techniques relevant to Prefect users.
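A single request may not return every deployment if the server caps the page size. Below is a minimal paging sketch, assuming the list endpoint accepts `offset` and `limit` in its JSON body. `list_all_deployments` and `fetch_page` are hypothetical names, and `fetch_page` is passed in so the loop can be tried without a live API.

```python
from typing import Callable, Dict, List


def list_all_deployments(
    fetch_page: Callable[[int, int], List[Dict]], page_size: int = 100
) -> List[Dict]:
    """Collect every deployment by requesting pages until a short page comes back."""
    deployments: List[Dict] = []
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        deployments.extend(page)
        # A page shorter than page_size means there is nothing left to fetch.
        if len(page) < page_size:
            return deployments
        offset += page_size


# One possible fetch_page, assuming requests plus your own api_url and headers
# (both are placeholders, and the body fields are an assumption to verify):
#
#   def fetch_page(offset, limit):
#       response = requests.post(
#           f"{api_url}/deployments/filter",
#           headers=headers,
#           json={"offset": offset, "limit": limit},
#       )
#       response.raise_for_status()
#       return response.json()
```

Keeping the HTTP call separate from the loop also makes it easy to swap in a different client later.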