@Marvin create me an async function that will...
# ask-marvin
d
@Marvin create me an async function that will use get_docker_container_logs and stream the logs from the container. Also, the logger (logger = get_run_logger()) should not wait for the entire log output but should print it continuously in the UI
m
To create an async function that streams Docker container logs with Prefect and continuously prints them to the UI, we need to combine fetching Docker logs with Prefect's logging system. Below is an example to achieve this:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def stream_container_logs(container_name: str):
    logger = get_run_logger()

    async for log_line in get_docker_container_logs(container_name=container_name, stream=True):
        logger.info(log_line)

@flow(log_prints=True)
async def docker_logs_flow(container_name: str):
    await stream_container_logs(container_name)

if __name__ == "__main__":
    asyncio.run(docker_logs_flow(container_name="your_container_name"))
```
In the code above:
1. The `stream_container_logs` task uses `get_docker_container_logs` with `stream=True` to fetch logs continuously.
2. The `logger` obtained via `get_run_logger()` logs the incoming log lines.
3. The `docker_logs_flow` flow calls the task and can be run with `asyncio.run`.

You can check out these resources for more details:
- Prefect Docker Containers Documentation: https://docs.prefect.io/latest/integrations/prefect-docker/containers/
- Prefect Logging Guide: https://docs.prefect.io/latest/guides/logs/
- Prefect Async Code Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.py
d
@Marvin so in this example you showed, should I pass as a parameter the name of the container whose logs I want Prefect to stream?
m
Yes, you should pass the container name as a parameter to the `docker_logs_flow` so that Prefect knows which Docker container's logs to stream. Here's how you can pass the container name:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def stream_container_logs(container_name: str):
    logger = get_run_logger()

    async for log_line in get_docker_container_logs(container_name=container_name, stream=True):
        logger.info(log_line)

@flow(log_prints=True)
async def docker_logs_flow(container_name: str):
    await stream_container_logs(container_name)

if __name__ == "__main__":
    # Pass the container name as a parameter to the flow
    container_name = "your_container_name"  # Replace with your specific container name
    asyncio.run(docker_logs_flow(container_name=container_name))
```
In the `docker_logs_flow(container_name)` call, replace `"your_container_name"` with the name of the Docker container you want Prefect to stream logs from. This way, the `docker_logs_flow` will receive the container name and pass it to the `stream_container_logs` task, which then uses it to fetch and stream the logs.
d
@Marvin it failed with the following: prefect.exceptions.ParameterBindError: Error binding parameters for function 'get_docker_container_logs': missing a required argument: 'container_id'. Function 'get_docker_container_logs' has signature 'container_id: str, docker_host: Optional[prefect_docker.host.DockerHost] = None, **logs_kwargs: Dict[str, Any]) -> str' but received args: () and kwargs: ['container_name', 'stream']. 191601.500 | ERROR | Flow run 'pumpkin-deer' - Finished in state Failed("Flow run encountered an exception: ParameterBindError: Error binding parameters for function 'get_docker_container_logs': missing a required argument: 'container_id'.\nFunction 'get_docker_container_logs' has signature 'container_id: str, docker_host: Optional[prefect_docker.host.DockerHost] = None, **logs_kwargs: Dict[str, Any]) -> str' but received args: () and kwargs: ['container_name', 'stream'].") 191601.501 | ERROR | prefect.engine - Engine execution of flow run '5a9fad86-e977-4242-8233-6cefcde4f949' exited with unexpected exception Traceback (most recent call last):
m
The `get_docker_container_logs` function requires the container ID (`container_id`) rather than the container name (`container_name`). Let's update the code to use the container ID:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()

    async for log_line in get_docker_container_logs(container_id=container_id, stream=True):
        logger.info(log_line)

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)

if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In the `docker_logs_flow(container_id)` call, replace `"your_container_id"` with the ID of the Docker container you want Prefect to stream logs from. For more information, you can refer to the Prefect Docker Containers Documentation.
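If you only know the container's name rather than its ID, the `docker` SDK (which `prefect-docker` builds on) can resolve it. Below is a minimal sketch, not part of the prefect-docker API; it assumes the `docker` Python package is installed and a local daemon is reachable via `docker.from_env()`:

```python
import docker

def resolve_container_id(name_or_id: str) -> str:
    """Resolve a container name (or ID) to its full container ID."""
    client = docker.from_env()  # connect to the local Docker daemon
    container = client.containers.get(name_or_id)  # accepts a name or an ID
    return container.id

# Hypothetical usage: container_id = resolve_container_id("my-app-container")
```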
d
@Marvin I get the following error: 192143.027 | ERROR | Task run 'stream_container_logs-0' - Task run failed with exception: TypeError("'async for' requires an object with aiter method, got coroutine") - Retries are exhausted 192143.037 | ERROR | Task run 'stream_container_logs-0' - Finished in state Failed("Task run encountered an exception TypeError: 'async for' requires an object with aiter method, got coroutine") 192143.038 | ERROR | Flow run 'lurking-dragon' - Encountered exception during execution: TypeError("'async for' requires an object with aiter method, got coroutine") async for log_line in get_docker_container_logs(container_id=container_id, stream=True):
m
It seems the issue arises because `get_docker_container_logs` in Prefect is not directly iterable in an `async for` loop like typical async generators. We need to handle the coroutine properly. Below is the revised code with proper handling:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def fetch_container_logs(container_id: str):
    # Fetch logs as a complete coroutine
    logs = await get_docker_container_logs(container_id=container_id, stream=True)
    return logs

@task
def stream_logs(logs: str):
    logger = get_run_logger()
    for log_line in logs.splitlines():
        logger.info(log_line)

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    raw_logs = await fetch_container_logs(container_id)
    stream_logs(raw_logs)

if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
1. The `fetch_container_logs` task fetches all logs as a single coroutine.
2. The `stream_logs` task processes and logs each line.
3. The `docker_logs_flow` flow orchestrates these tasks.

Replace `"your_container_id"` with the actual container ID you want to fetch logs from. This should solve the coroutine handling and logging issues.
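If line-by-line streaming is not strictly required, a simpler variant is to drop `stream=True` entirely. Below is a minimal sketch, assuming the documented signature shown in the error above (`container_id: str, ... -> str`) and a Docker daemon reachable with default settings; it fetches the complete log once and then logs it line by line (a truly continuous approach is sketched at the end of this thread):

```python
import asyncio
from prefect import flow, get_run_logger
from prefect_docker.containers import get_docker_container_logs

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    logger = get_run_logger()
    # Without stream=True the task should return the full log as a single string.
    logs = await get_docker_container_logs(container_id=container_id)
    for line in logs.splitlines():
        logger.info(line)

if __name__ == "__main__":
    asyncio.run(docker_logs_flow(container_id="your_container_id"))
```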
d
@Marvin ERROR | Task run 'get_docker_container_logs-0' - Task run failed with exception: AttributeError("'CancellableStream' object has no attribute 'decode'") - Retries are exhausted ERROR | Task run 'get_docker_container_logs-0' - Finished in state Failed("Task run encountered an exception AttributeError: 'CancellableStream' object has no attribute 'decode'") ERROR | Task run 'fetch_container_logs-0' - Task run failed with exception: AttributeError("'CancellableStream' object has no attribute 'decode'") - Retries are exhausted AttributeError: 'CancellableStream' object has no attribute 'decode' Task run 'fetch_container_logs-0' - Finished in state Failed("Task run encountered an exception AttributeError: 'CancellableStream' object has no attribute 'decode'") ERROR | Flow run 'fluorescent-goldfish' - Encountered exception during execution: AttributeError("'CancellableStream' object has no attribute 'decode'") ERROR | Flow run 'fluorescent-goldfish' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'CancellableStream' object has no attribute 'decode'") ERROR | prefect.engine - Engine execution of flow run '3176fb8c-d5fc-466e-8de4-857d4edc94e6' exited with unexpected exception
m
It looks like the `get_docker_container_logs` function returns a `CancellableStream` object which we need to handle properly. This changes our approach slightly. Let's adjust the function to handle a streaming log correctly using an `async for` loop. Here is the revised code:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()

    async for log_line in (await get_docker_container_logs(container_id=container_id, stream=True)):
        logger.info(log_line)

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)

if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
Points to note:
1. We await the call to `get_docker_container_logs` to get the stream object.
2. Then, we use `async for` to iterate over the stream.

Replace `"your_container_id"` with the actual Docker container ID. This setup should properly stream logs and handle them correctly with the Prefect logger.
d
@Marvin I get this: 193433.998 | ERROR | Flow run 'attentive-rook' - Encountered exception during execution: AttributeError("'CancellableStream' object has no attribute 'decode'") AttributeError: 'CancellableStream' object has no attribute 'decode' 193434.013 | ERROR | Flow run 'attentive-rook' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'CancellableStream' object has no attribute 'decode'") 193434.014 | ERROR | prefect.engine - Engine execution of flow run '39b27857-7e65-49f3-9e66-20b607d8c95c' exited with unexpected exception AttributeError: 'CancellableStream' object has no attribute 'decode'
m
The error indicates that the `CancellableStream` object returned by `get_docker_container_logs` requires specific handling, as it doesn't directly yield strings that can be decoded or logged. To handle the `CancellableStream` properly, we need to iterate over its content correctly; it most likely yields byte chunks which can be converted to strings. Here's the updated code to handle this scenario appropriately:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()

    async for log_chunk in (await get_docker_container_logs(container_id=container_id, stream=True)):
        log_line = log_chunk.decode("utf-8")
        logger.info(log_line)

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)

if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In this version:
1. We decode each `log_chunk` yielded by the `CancellableStream` using `utf-8`.
2. Each decoded line is logged using Prefect's logger.

Make sure to replace `"your_container_id"` with the actual Docker container ID. This should correctly handle the streamed logs and the `CancellableStream` object.
d
In this code:

```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
def stream_logs(logs: list):
    logger = get_run_logger()
    decoded_logs = [log.decode('utf-8') for log in logs]  # Decode the entire list of logs using utf-8
    for decoded_log in decoded_logs:
        # Process the decoded log line
        logger.info(decoded_log)

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    all_logs = await get_docker_container_logs(stream=True, container_id=container_id)  # Use await
    log_rows = [log.decode('utf-8') for log in all_logs]  # Decode each log line using utf-8
    stream_logs(log_rows)  # Process the retrieved logs here

if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "4f523f70fb51"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```

@Marvin help me fix AttributeError: 'CancellableStream' object has no attribute 'decode'
m
The issue you're encountering stems from trying to decode the entire `CancellableStream` object, which isn't directly consumable in a list comprehension or a simple `for` loop. Instead, you need to iterate over the stream as it yields byte strings, handling the `CancellableStream` correctly and decoding each chunk. Here's the corrected version:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs

@task
async def fetch_container_logs(container_id: str):
    logs = []
    async for log_chunk in (await get_docker_container_logs(container_id=container_id, stream=True)):
        logs.append(log_chunk)
    return logs

@task
def stream_logs(logs: list):
    logger = get_run_logger()
    for log in logs:
        decoded_log = log.decode('utf-8')  # Decode using utf-8
        logger.info(decoded_log)

@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    logs = await fetch_container_logs(container_id)  # Gather logs
    stream_logs(logs)  # Process the retrieved logs

if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In this version:
1. `fetch_container_logs`: this task collects all log chunks asynchronously.
2. `stream_logs`: this task decodes each log chunk and logs it.
3. `docker_logs_flow`: this flow coordinates both tasks.

Replace `"your_container_id"` with the actual Docker container ID. This setup ensures that you correctly handle the `CancellableStream` and decode each log chunk properly.
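For reference, here is a different approach that sidesteps the `CancellableStream` issue entirely. The errors above suggest that `get_docker_container_logs` returns a plain `str` and decodes the Docker response internally, so `stream=True` hands it a blocking, synchronous generator of byte chunks that it cannot `.decode()`, and that generator is not an async iterator either, which is why every `async for` variant failed. Below is a minimal sketch that streams logs continuously into the Prefect UI by using the `docker` SDK directly inside a synchronous task; it is not part of the prefect-docker API and assumes the `docker` package is installed and a daemon reachable via `docker.from_env()`:

```python
import docker  # the Docker SDK for Python, which prefect-docker wraps
from prefect import flow, get_run_logger, task

@task
def stream_container_logs(container_id: str) -> None:
    """Tail a container's logs and send each line to the Prefect logger as it arrives."""
    logger = get_run_logger()
    client = docker.from_env()
    container = client.containers.get(container_id)

    # logs(stream=True, follow=True) returns a blocking generator of byte chunks,
    # so we iterate it in a plain (synchronous) task and log line by line.
    for chunk in container.logs(stream=True, follow=True):
        for line in chunk.decode("utf-8", errors="replace").splitlines():
            if line:
                logger.info(line)

@flow(log_prints=True)
def docker_logs_flow(container_id: str) -> None:
    stream_container_logs(container_id)

if __name__ == "__main__":
    docker_logs_flow(container_id="your_container_id")  # Replace with your container ID
```

Because `container.logs(stream=True, follow=True)` yields chunks as Docker produces them, each line reaches the Prefect logger, and therefore the UI, as soon as it is emitted rather than after the container exits.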