Paweł Biernat
05/29/2024, 10:54 AMMarvin
05/29/2024, 10:55 AMpython
from prefect import flow, get_run_logger
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
@flow
def log_hello_flow(name="Marvin"):
logger = get_run_logger()
<http://logger.info|logger.info>(f"{name} said hello!")
if __name__ == "__main__":
log_hello_flow()
### Using Azure Container Instance as Infrastructure
Create an infrastructure block using the Azure Container Instance:
python
# Infrastructure Block Setup
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
container_instance_job = AzureContainerInstanceJob(
aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
resource_group_name="your_resource_group",
subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Creating the Deployment
You can create the deployment from the CLI:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
Create an Azure Container Instances typed work pool:
sh
prefect work-pool create -t azure-container-instance my-aci-work-pool
### Starting the Worker
Run a worker that pulls jobs from the work pool:
sh
prefect worker start -n my-aci-worker -p my-aci-work-pool
### Running the Flow
Once everything is set up, executing your flow will distribute the work across Azure Container Instances.
Additional resources:
- Prefect Azure Documentation
- Prefect Deployments Guide
If you run into issues, Prefect's discourse forums and Slack community can be very helpful places to seek support.Paweł Biernat
05/29/2024, 10:57 AM@task
def log_hello(name="Marvin"):
import time
logger = get_run_logger()
logger.info(f"{name} said hello!")
time.sleep(100)
logger.info("Done!")
return name
@flow
def log_hello_flow():
names = ["Marvin", "Alice", "Bob"]
for name in names:
log_hello(name)
Marvin
05/29/2024, 10:59 AMlog_hello
tasks across multiple Azure Container Instances, you can modify the log_hello_flow
to use task mapping. This will parallelize the execution of log_hello
for each name.
Here's the modified code:
### Task Definition
python
from prefect import task, flow, get_run_logger
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
import time
@task
def log_hello(name="Marvin"):
logger = get_run_logger()
<http://logger.info|logger.info>(f"{name} said hello!")
time.sleep(100)
<http://logger.info|logger.info>("Done!")
return name
### Flow Definition
python
@flow
def log_hello_flow():
names = ["Marvin", "Alice", "Bob"]
log_hello.map(names) # Use .map to distribute tasks
if __name__ == "__main__":
log_hello_flow()
### Infrastructure Setup
Create an infrastructure block using the Azure Container Instance:
python
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
container_instance_job = AzureContainerInstanceJob(
aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
resource_group_name="your_resource_group",
subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Creating the Deployment
You can create the deployment from the CLI:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
### Work Pool and Worker
Create an Azure Container Instances typed work pool and start a worker:
sh
prefect work-pool create -t azure-container-instance my-aci-work-pool
prefect worker start -n my-aci-worker -p my-aci-work-pool
### Running the Flow
Executing your flow will now distribute each log_hello
task across separate Azure Container Instances.
For more details, check the Prefect Azure Documentation.Paweł Biernat
05/29/2024, 11:07 AMMarvin
05/29/2024, 11:07 AMpython
from prefect import task, flow, get_run_logger
@task
def log_hello(name="Marvin"):
import time
logger = get_run_logger()
<http://logger.info|logger.info>(f"{name} said hello!")
time.sleep(100)
<http://logger.info|logger.info>("Done!")
return name
@flow
def log_hello_flow():
names = ["Marvin", "Alice", "Bob"]
log_hello.map(names) # Using .map to parallelize task execution
### Infrastructure Block Setup
Define Azure Container Instance as your infrastructure:
python
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
container_instance_job = AzureContainerInstanceJob(
aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
resource_group_name="your_resource_group",
subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Deployment Creation
Create the deployment from the CLI:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
### Work Pool and Worker
1. Create an Azure Container Instances typed work pool:
sh
prefect work-pool create -t azure-container-instance my-aci-work-pool
2. Start a worker to handle jobs from the work pool:
sh
prefect worker start -n my-aci-worker -p my-aci-work-pool
### Running the Flow
Executing the flow will now distribute log_hello
tasks across separate Azure Container Instances. The single worker will manage the creation and execution of multiple container instances.
For more information, refer to Prefect Azure Documentation and Prefect Deployment Guide. If you have any questions or need further assistance, feel free to visit the Prefect Discourse forum or Slack community.Paweł Biernat
05/29/2024, 11:13 AM.map
method, and yet I only see one instance starting, and seemingly executing all the top level flow and all three tasks.Marvin
05/29/2024, 11:13 AM.map
are not being distributed to separate Azure Container Instances as expected. Each task should run on its own instance, but instead, they are all running in a single instance.
Here's how to address this situation:
1. Confirm Worker Configuration: Ensure that your worker is correctly configured to use the azure-container-instance
pool.
2. Check Flow Run Configuration: Each task mapped should create a new container. Ensure that the correct infrastructure block is being used.
### Update Code Example with Task Overrides
Here’s an update to your existing setup which includes task overrides, ensuring each task runs in a different container.
python
from prefect import task, flow, get_run_logger
from prefect.tasks import create_task_run
@task
def log_hello(name="Marvin"):
import time
logger = get_run_logger()
<http://logger.info|logger.info>(f"{name} said hello!")
time.sleep(100)
<http://logger.info|logger.info>("Done!")
return name
@flow
def log_hello_flow():
names = ["Marvin", "Alice", "Bob"]
# Map tasks with retries or other options if needed
log_hello.map(names)
if __name__ == "__main__":
log_hello_flow()
### Infrastructure Block Setup
Ensure you properly set up the infrastructure block:
python
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob
container_instance_job = AzureContainerInstanceJob(
aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
resource_group_name="your_resource_group",
subscription_id="YOUR_AZURE_SUBSCRIPTION_ID",
)
container_instance_job.save("aci-dev")
### Additional Considerations
- Ensure worker is correctly processing tasks: Verify that the worker is configured to execute each task separately. Multiple workers are not necessary but should handle scheduling correctly.
- Concurrency and Scaling within ACI: Make sure your Azure subscription and resource group settings allow for multiple concurrent container instances.
### Useful Documentation and Links
- Prefect Azure Documentation
- Prefect Deployment Guide
If these suggestions do not resolve your issue, consider checking the Prefect Discourse or Slack community for additional support.Paweł Biernat
05/29/2024, 11:16 AMprefect work-pool create --type azure-container-instance:push --provision-infra my-aci-pool
Do I still have to set up the aci-dev
container instance job? If yes, why?Marvin
05/29/2024, 11:16 AMpush
work pool with --provision-infra
, you do not need to manually set up the aci-dev
container instance job. The push-based work pool with infrastructure provisioning will handle the creation and management of Azure Container Instances for you.
Here's what happens when you use `--provision-infra`:
1. Automatic Infrastructure Management: This option tells Prefect to automatically provision the necessary infrastructure for executing your flow tasks. It handles creating and managing the Azure Container Instances without the need for additional manual setup.
2. Simplified Configuration: By using a push work pool with infrastructure provisioning, you've simplified the setup process, ensuring that tasks are distributed across multiple instances as required.
### Example Workflow with Push Work Pool
Let's ensure your setup is coherent with the push work pool:
1. Create a Work Pool:
sh
prefect work-pool create --type azure-container-instance:push --provision-infra my-aci-pool
2. Modify Task and Flow:
Since the infrastructure provisioning is handled automatically, you just need to focus on writing your tasks and flows.
python
from prefect import task, flow, get_run_logger
@task
def log_hello(name="Marvin"):
import time
logger = get_run_logger()
<http://logger.info|logger.info>(f"{name} said hello!")
time.sleep(100)
<http://logger.info|logger.info>("Done!")
return name
@flow
def log_hello_flow():
names = ["Marvin", "Alice", "Bob"]
log_hello.map(names) # Using .map to distribute tasks
if __name__ == "__main__":
log_hello_flow()
3. Deploy the Flow:
Deploy the flow using the configured work pool:
sh
prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev --pool my-aci-pool
4. Start the Worker:
Start an Azure Container Instance worker to pull jobs from the work pool:
sh
prefect worker start -n my-aci-worker -p my-aci-pool
### Verification
1. Check Work Pool: Ensure your work pool is correctly set up and associated with the deployment.
2. Monitor Task Execution: Verify that tasks are being executed on separate Azure Container Instances by checking the Prefect UI or logs.
By using the push-based work pool with --provision-infra
, you leverage Prefect's capability to manage infrastructure automatically, reducing manual setup and complexity. This should resolve your concern about manually setting up the aci-dev
container instance job.Kevin Grismore
05/29/2024, 11:24 AMKevin Grismore
05/29/2024, 11:24 AMKevin Grismore
05/29/2024, 11:25 AMfrom prefect.deployments import run_deployment
Kevin Grismore
05/29/2024, 11:26 AMrun_deployment
to run another deployment's code in a different container instance and wait for it to finishKevin Grismore
05/29/2024, 11:26 AM.submit
on the task function many times with different parametersKevin Grismore
05/29/2024, 11:27 AMPaweł Biernat
05/29/2024, 11:28 AMPaweł Biernat
05/29/2024, 11:29 AMKevin Grismore
05/29/2024, 11:29 AMPaweł Biernat
05/29/2024, 11:31 AM.run_deployment
and AzureContainerInstanceJob(...).run()
? I guess the flow in the deployment can be more complicated, while a AzureContainerInstanceJob
is for simpler tasks?Paweł Biernat
05/29/2024, 11:32 AMKevin Grismore
05/29/2024, 11:32 AMKevin Grismore
05/29/2024, 11:33 AMPaweł Biernat
05/29/2024, 11:35 AMKevin Grismore
05/29/2024, 2:35 PM