<@ULVA73B9P> Basic question, how do I distribute t...
# marvin-in-the-wild
p
@Marvin Basic question, how do I distribute the work across multiple azure container instances? I need a simple flow example that does that.
m
Here's a basic example of how to distribute work across multiple Azure Container Instances using Prefect. It covers setting up the necessary infrastructure and running a simple flow.
### Creating the Infrastructure Block
First, set up the Azure Container Instance credentials and job configuration in your flow:
python
from prefect import flow, get_run_logger
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

@flow
def log_hello_flow(name="Marvin"):
    logger = get_run_logger()
    logger.info(f"{name} said hello!")

if __name__ == "__main__":
    log_hello_flow()
### Using Azure Container Instance as Infrastructure
Create an infrastructure block using the Azure Container Instance:
python
# Infrastructure Block Setup
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

container_instance_job = AzureContainerInstanceJob(
    aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
    resource_group_name="your_resource_group",
    subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Creating the Deployment
You can create the deployment from the CLI:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
Create an Azure Container Instances typed work pool:
sh
prefect work-pool create -t azure-container-instance my-aci-work-pool
### Starting the Worker
Run a worker that pulls jobs from the work pool:
sh
prefect worker start -n my-aci-worker -p my-aci-work-pool
### Running the Flow
Once everything is set up, executing your flow will distribute the work across Azure Container Instances.
Additional resources:
- Prefect Azure Documentation
- Prefect Deployments Guide
If you run into issues, Prefect's Discourse forum and Slack community can be very helpful places to seek support.
p
@Marvin this is just a single piece of work, it cannot be distributed. Modify this example so that each log_hello runs on a separate instance
@task
def log_hello(name="Marvin"):
    import time

    logger = get_run_logger()
    logger.info(f"{name} said hello!")

    time.sleep(100)
    logger.info("Done!")

    return name


@flow
def log_hello_flow():
    names = ["Marvin", "Alice", "Bob"]
    for name in names:
        log_hello(name)
m
To distribute the `log_hello` tasks across multiple Azure Container Instances, you can modify the `log_hello_flow` to use task mapping. This will parallelize the execution of `log_hello` for each name. Here's the modified code:
### Task Definition
python
from prefect import task, flow, get_run_logger
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
import time

@task
def log_hello(name="Marvin"):
    logger = get_run_logger()
    logger.info(f"{name} said hello!")

    time.sleep(100)
    logger.info("Done!")

    return name
### Flow Definition
python
@flow
def log_hello_flow():
    names = ["Marvin", "Alice", "Bob"]
    log_hello.map(names)  # Use .map to distribute tasks

if __name__ == "__main__":
    log_hello_flow()
### Infrastructure Setup
Create an infrastructure block using the Azure Container Instance:
python
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

container_instance_job = AzureContainerInstanceJob(
    aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
    resource_group_name="your_resource_group",
    subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Creating the Deployment
You can create the deployment from the CLI:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
### Work Pool and Worker
Create an Azure Container Instances typed work pool and start a worker:
sh
prefect work-pool create -t azure-container-instance my-aci-work-pool
prefect worker start -n my-aci-worker -p my-aci-work-pool
### Running the Flow
Executing your flow will now distribute each `log_hello` task across separate Azure Container Instances. For more details, check the Prefect Azure Documentation.
p
@Marvin when running a flow on azure container instance, do I need more workers to be able to create more than one instance?
m
When running a flow on Azure Container Instances (ACI) using Prefect, you do not need multiple workers to create multiple instances. A single Azure Container Instance worker can handle multiple flow runs by provisioning multiple containers as needed. Here’s a consolidated step-by-step on how to run tasks in parallel across multiple Azure Container Instances:
### Task and Flow Definition
python
from prefect import task, flow, get_run_logger

@task
def log_hello(name="Marvin"):
    import time
    logger = get_run_logger()
    logger.info(f"{name} said hello!")
    time.sleep(100)
    logger.info("Done!")
    return name

@flow
def log_hello_flow():
    names = ["Marvin", "Alice", "Bob"]
    log_hello.map(names)  # Using .map to parallelize task execution
### Infrastructure Block Setup
Define Azure Container Instance as your infrastructure:
python
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

container_instance_job = AzureContainerInstanceJob(
    aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
    resource_group_name="your_resource_group",
    subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Deployment Creation
Create the deployment from the CLI:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
### Work Pool and Worker
1. Create an Azure Container Instances typed work pool:
sh
    prefect work-pool create -t azure-container-instance my-aci-work-pool
2. Start a worker to handle jobs from the work pool:
sh
    prefect worker start -n my-aci-worker -p my-aci-work-pool
### Running the Flow
Executing the flow will now distribute `log_hello` tasks across separate Azure Container Instances. The single worker will manage the creation and execution of multiple container instances. For more information, refer to Prefect Azure Documentation and Prefect Deployment Guide. If you have any questions or need further assistance, feel free to visit the Prefect Discourse forum or Slack community.
p
@Marvin I followed the setup for ACI from the docs, I have a working work pool, worker and the jobs are starting just fine. I used the `.map` method, and yet I only see one instance starting, which seemingly executes the whole top-level flow and all three tasks.
m
It appears that you're facing an issue where tasks using `.map` are not being distributed to separate Azure Container Instances as expected. Each task should run on its own instance, but instead, they are all running in a single instance. Here's how to address this situation:
1. Confirm Worker Configuration: Ensure that your worker is correctly configured to use the `azure-container-instance` pool.
2. Check Flow Run Configuration: Each task mapped should create a new container. Ensure that the correct infrastructure block is being used.
### Update Code Example with Task Overrides
Here’s an update to your existing setup which includes task overrides, ensuring each task runs in a different container.
python
from prefect import task, flow, get_run_logger

@task
def log_hello(name="Marvin"):
    import time
    logger = get_run_logger()
    logger.info(f"{name} said hello!")
    time.sleep(100)
    logger.info("Done!")
    return name

@flow
def log_hello_flow():
    names = ["Marvin", "Alice", "Bob"]
    # Map tasks with retries or other options if needed
    log_hello.map(names)

if __name__ == "__main__":
    log_hello_flow()
### Infrastructure Block Setup
Ensure you properly set up the infrastructure block:
python
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

container_instance_job = AzureContainerInstanceJob(
    aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
    resource_group_name="your_resource_group",
    subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Additional Considerations
- Ensure worker is correctly processing tasks: Verify that the worker is configured to execute each task separately. Multiple workers are not necessary but should handle scheduling correctly.
- Concurrency and Scaling within ACI: Make sure your Azure subscription and resource group settings allow for multiple concurrent container instances.
### Useful Documentation and Links
- Prefect Azure Documentation
- Prefect Deployment Guide
If these suggestions do not resolve your issue, consider checking the Prefect Discourse or Slack community for additional support.
p
@Marvin I used the push work pool with provision-infra:
prefect work-pool create --type azure-container-instance:push --provision-infra my-aci-pool
Do I still have to set up the `aci-dev` container instance job? If yes, why?
m
If you have used the `push` work pool with `--provision-infra`, you do not need to manually set up the `aci-dev` container instance job. The push-based work pool with infrastructure provisioning will handle the creation and management of Azure Container Instances for you. Here's what happens when you use `--provision-infra`:
1. Automatic Infrastructure Management: This option tells Prefect to automatically provision the necessary infrastructure for executing your flow tasks. It handles creating and managing the Azure Container Instances without the need for additional manual setup.
2. Simplified Configuration: By using a push work pool with infrastructure provisioning, you've simplified the setup process, ensuring that tasks are distributed across multiple instances as required.
### Example Workflow with Push Work Pool
Let's ensure your setup is coherent with the push work pool:
1. Create a Work Pool:
sh
    prefect work-pool create --type azure-container-instance:push --provision-infra my-aci-pool
2. Modify Task and Flow: Since the infrastructure provisioning is handled automatically, you just need to focus on writing your tasks and flows.
python
    from prefect import task, flow, get_run_logger

    @task
    def log_hello(name="Marvin"):
        import time
        logger = get_run_logger()
        logger.info(f"{name} said hello!")
        time.sleep(100)
        logger.info("Done!")
        return name

    @flow
    def log_hello_flow():
        names = ["Marvin", "Alice", "Bob"]
        log_hello.map(names)  # Using .map to distribute tasks

    if __name__ == "__main__":
        log_hello_flow()
3. Deploy the Flow: Deploy the flow using the configured work pool:
sh
    prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev --pool my-aci-pool
4. Start the Worker: Start an Azure Container Instance worker to pull jobs from the work pool:
sh
    prefect worker start -n my-aci-worker -p my-aci-pool
### Verification
1. Check Work Pool: Ensure your work pool is correctly set up and associated with the deployment.
2. Monitor Task Execution: Verify that tasks are being executed on separate Azure Container Instances by checking the Prefect UI or logs.
By using the push-based work pool with `--provision-infra`, you leverage Prefect's capability to manage infrastructure automatically, reducing manual setup and complexity. This should resolve your concern about manually setting up the `aci-dev` container instance job.
k
ignore marvin here
to distribute work, you'll need to make multiple deployments, one for each script you want to be computed independently
then you can do
from prefect.deployments import run_deployment
from a parent flow, call `run_deployment` to run another deployment's code in a different container instance and wait for it to finish
you can run many in parallel by placing that call inside a task and calling `.submit` on the task function many times with different parameters
p
Thanks for the suggestion, I will check it out.
so kind of like a flow of deployments
k
exactly
p
so what's the practical difference between `.run_deployment` and `AzureContainerInstanceJob(...).run()`? I guess the flow in the deployment can be more complicated, while an `AzureContainerInstanceJob` is for simpler tasks?
k
that ACI job class is being deprecated
it's affiliated with the old deployment style that involved agents and infra blocks
p
thanks for all the help, I wouldn't be able to figure it out from the docs alone, they are super confusing 😉
k
I hear ya. there are big docs improvements on the way 🙂