<@ULVA73B9P> how do I add env variables to the fol...
# marvin-in-the-wild
v
@Marvin how do I add env variables to the following code that I can then read in my docker container: docker_container_block = DockerContainer.load("my-docker-block") container_result = docker_container_block.run()
m
To add environment variables to your
DockerContainer
block, you can set them directly in the DockerContainer initialization. Here is an example of how it can be done:
Copy code
python
from prefect.infrastructure import DockerContainer

# Load your Docker container block
docker_container_block = DockerContainer.load("my-docker-block")

# Add environment variables
docker_container_block.env = {"MY_ENV_VAR": "value", "ANOTHER_ENV_VAR": "another_value"}

# Run the container
container_result = docker_container_block.run()
This code snippet demonstrates how to add environment variables to your Docker container by modifying the
env
attribute of the
DockerContainer
object before running it. For more detailed information, you can refer to these sources: - AWS ECS Integration - Overriding Job Variables If you need more examples or details, feel free to ask.
v
@Marvin that works but then I'm overwriting the env's already configured in the block. How can I add to the existing env?
m
To append new environment variables to the existing ones in your
DockerContainer
block, you can first retrieve the current environment variables and update them with the new ones. Here's how you can do it:
Copy code
python
from prefect.infrastructure import DockerContainer

# Load your Docker container block
docker_container_block = DockerContainer.load("my-docker-block")

# Retrieve the existing environment variables
existing_env = docker_container_block.env or {}

# Define new environment variables to add
new_env = {"MY_ENV_VAR": "value", "ANOTHER_ENV_VAR": "another_value"}

# Update the existing environment variables with the new ones
existing_env.update(new_env)

# Set the updated environment variables back to the Docker container block
docker_container_block.env = existing_env

# Run the container
container_result = docker_container_block.run()
By retrieving the current environment variables using
docker_container_block.env
, updating them with the new ones using
update()
, and then reassigning them back to
docker_container_block.env
, you can effectively add new environment variables without overwriting the existing ones. For more detailed information, you can refer to this source: - Dockerizing Python Applications Feel free to ask if you need further assistance!
v
@Marvin can I run a task from within a task?
m
Yes, in Prefect, you can call a task from within another task. This feature allows you to build more complex workflows by nesting tasks. Here’s an example to illustrate how to do this:
Copy code
python
from prefect import task, flow

@task
def my_task():
    print("Hello, I'm a task")

@task(log_prints=True)
def my_parent_task():
    my_task()

@flow
def my_flow():
    my_parent_task()

# Run the flow
if __name__ == "__main__":
    my_flow()
In this example: -
my_task
is a simple task that prints a message. -
my_parent_task
is another task that calls
my_task()
. -
my_flow
is a flow that orchestrates the execution by calling
my_parent_task()
. By organizing your tasks this way, you can build modular and reusable components in your workflows. For more detailed information, you can refer to these sources: - Tasks Documentation - Submitting Tasks Examples Feel free to ask if you have more questions!
v
@Marvin what is a good way to propagate errors from inside the container to the task so if the container has an error, so does the task? (without using prefect_docker package btw) I'm asking because I have task configured the task with log_prints=True so the docker container out is printed correctly in the UI. The only issue is that everything is printed as INFO severity. Even though in the container I'm pringint everything with {severity_level} | message. So it comes out like this: "SUCCESS | Retrieved 10 jobs from DB" But in the prefect UI this is interpreted as being INFO just like all other messages
m
To propagate errors from inside the container to the Prefect task, you can enhance your task to monitor the standard output and error streams and raise an exception if an error is detected. This approach ensures that the task in Prefect reflects the error status if the container encounters an issue. Here's an example of how to achieve this:
Copy code
python
from prefect import task, flow
import docker

@task(log_prints=True)
def run_container_with_error_handling(image_name: str):
    client = docker.from_env()
    
    try:
        # Run the container
        container = client.containers.run(image_name, stdout=True, stderr=True, detach=True)
        
        # Monitor the logs
        for log in container.logs(stream=True):
            log_message = log.decode("utf-8").strip()
            print(log_message)
            
            # Check for severity in log message and raise an exception if it indicates an error
            if "ERROR" in log_message or "CRITICAL" in log_message or "WARNING" in log_message:
                raise Exception(f"Container log indicates an error: {log_message}")
        
        # Wait for the container to finish
        exit_code = container.wait()
        
        # Check the exit code to handle non-zero exit status
        if exit_code['StatusCode'] != 0:
            raise Exception(f"Container exited with non-zero status code: {exit_code['StatusCode']}")
        
    except docker.errors.ContainerError as e:
        raise Exception(f"Container error: {str(e)}")
    finally:
        container.remove()

@flow
def my_flow():
    run_container_with_error_handling("my-docker-image")

if __name__ == "__main__":
    my_flow()
Explanation: 1. Docker Client Initialization: Initialize a Docker client to interact with the Docker daemon. 2. Run Container: Launch the container in detached mode, capturing both standard output and error streams. 3. Log Monitoring: Stream the container logs and print each log message. If a log message indicates an error (e.g., contains "ERROR", "CRITICAL", or "WARNING"), raise an exception to propagate the error to the Prefect task. 4. Exit Code Checking: Wait for the container to finish and check its exit code. If the exit code is non-zero, raise an exception to indicate a failure. 5. Exception Handling: Wrap the container run logic in a try-except block to handle Docker container errors gracefully. For more detailed information, you can refer to these examples: - Basic Task Example - AWS ECS Worker Integration Feel free to ask if you have more questions!
v
no this is bad because the containers are cleaned up after they run. And my containers run like this: docker_container_block = DockerContainer.load("docker-parser") container_result = docker_container_block.run()
@Marvin no this is bad because the containers are cleaned up after they run. And my containers run like this: docker_container_block = DockerContainer.load("docker-parser") container_result = docker_container_block.run()
m
Given your use case where containers are managed via the
DockerContainer
block, we need a way to capture and propagate errors without modifying how containers are launched and managed. You can use a monitoring loop that checks the logs of the running container and raises an error if specific conditions are met. Here's how you can adapt your Prefect task to handle this:
Copy code
python
from prefect.infrastructure.docker import DockerContainer
from prefect import task, flow
import time

@task(log_prints=True)
def run_and_monitor_container():
    # Load and run the Docker container block
    docker_container_block = DockerContainer.load("docker-parser")
    container_result = docker_container_block.run()
    
    # Extract the container ID from the result
    container_id = container_result.stdout.strip()  # Adjust based on actual DockerContainer output
    
    client = docker.from_env()
    container = client.containers.get(container_id)
    
    try:
        for log in container.logs(stream=True):
            log_message = log.decode("utf-8").strip()
            print(log_message)
            
            # Raise an exception if a log message indicates an error
            if "ERROR" in log_message or "CRITICAL" in log_message or "WARNING" in log_message:
                raise Exception(f"Container log indicates an error: {log_message}")
        
        # Wait for the container to finish
        exit_code = container.wait()
        
        # Check the exit code to handle non-zero exit status
        if exit_code['StatusCode'] != 0:
            raise Exception(f"Container exited with non-zero status code: {exit_code['StatusCode']}")
            
    except docker.errors.ContainerError as e:
        raise Exception(f"Container error: {str(e)}")

@flow
def my_flow():
    run_and_monitor_container()

if __name__ == "__main__":
    my_flow()
### Explanation: 1. Run the Docker Container Block: The container is started using the
DockerContainer
block. 2. Monitor Logs: After the container is started, retrieve the container ID and monitor its logs. 3. Error Detection: Check the container logs for specific error indicators and raise an exception if any are found. 4. Exit Code Handling: Check the container's exit code to determine if it finished successfully or with an error. This should help you propagate errors from the Docker container into your Prefect task while maintaining the use of the
DockerContainer
block without requiring any external cleanup logic. Feel free to adjust based on the actual output format of your specific
DockerContainer
run method. For more details, you can check the Prefect Docker Container Documentation or the associated code examples.