Dijar Vrella
07/22/2024, 4:51 PM

Marvin
07/22/2024, 4:51 PM
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_name: str):
    logger = get_run_logger()
    async for log_line in get_docker_container_logs(container_name=container_name, stream=True):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_name: str):
    await stream_container_logs(container_name)


if __name__ == "__main__":
    asyncio.run(docker_logs_flow(container_name="your_container_name"))
```
In the code above:
1. The stream_container_logs task uses get_docker_container_logs with stream=True to fetch logs continuously.
2. The logger obtained via get_run_logger() logs the incoming log lines.
3. The docker_logs_flow flow calls the task and can be run with asyncio.run.
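`async for` only accepts objects that implement `__aiter__`. As far as I know, the Docker SDK's log stream is an ordinary blocking generator, so it cannot be consumed with `async for` directly. A minimal sketch of one workaround, assuming Python 3.9+ for `asyncio.to_thread`: fetch each item in a worker thread so waiting for the next chunk does not block the event loop. `iterate_in_thread` and `collect_decoded` are hypothetical helpers, and a plain generator stands in for the Docker stream so the example runs without Docker.

```python
import asyncio
from typing import AsyncIterator, Iterable, TypeVar

T = TypeVar("T")

# Sentinel returned by next() once the blocking iterator is exhausted.
_DONE = object()


async def iterate_in_thread(blocking: Iterable[T]) -> AsyncIterator[T]:
    """Expose a blocking iterable as an async iterator.

    Each next() call runs in a worker thread via asyncio.to_thread, so waiting
    for the next item (for example, the next log chunk) does not block the
    event loop.
    """
    iterator = iter(blocking)
    while True:
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            return
        yield item


async def collect_decoded(chunks: Iterable[bytes]) -> list:
    """Consume byte chunks with `async for` and decode each one as UTF-8."""
    received = []
    async for chunk in iterate_in_thread(chunks):
        received.append(chunk.decode("utf-8"))
    return received


if __name__ == "__main__":
    # A plain generator stands in for a blocking Docker log stream.
    fake_stream = (c for c in [b"line one\n", b"line two\n"])
    print(asyncio.run(collect_decoded(fake_stream)))
```

If the consumer stops early on a stream that never ends, the worker thread can stay blocked inside `next()` until the source produces another item.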
You can check out these resources for more details:
- Prefect Docker Containers Documentation: https://docs.prefect.io/latest/integrations/prefect-docker/containers/
- Prefect Logging Guide: https://docs.prefect.io/latest/guides/logs/
- Prefect Async Code Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.py

Dijar Vrella
07/22/2024, 5:14 PM

Marvin
07/22/2024, 5:14 PM
docker_logs_flow so that Prefect knows which Docker container's logs to stream. Here's how you can pass the container name:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_name: str):
    logger = get_run_logger()
    async for log_line in get_docker_container_logs(container_name=container_name, stream=True):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_name: str):
    await stream_container_logs(container_name)


if __name__ == "__main__":
    # Pass the container name as a parameter to the flow
    container_name = "your_container_name"  # Replace with your specific container name
    asyncio.run(docker_logs_flow(container_name=container_name))
```
In the docker_logs_flow(container_name) call, replace "your_container_name" with the name of the Docker container you want Prefect to stream logs from.
This way, the docker_logs_flow will receive the container name and pass it to the stream_container_logs task, which then uses it to fetch and stream the logs.

Dijar Vrella
07/22/2024, 5:16 PM

Marvin
07/22/2024, 5:16 PM
get_docker_container_logs function requires the container ID (container_id) rather than the container name (container_name). Let's update the code to use the container ID:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()
    async for log_line in get_docker_container_logs(container_id=container_id, stream=True):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In the docker_logs_flow(container_id) call, replace "your_container_id" with the ID of the Docker container you want Prefect to stream logs from.
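If you only know the container's name, one option is to look up its ID first. In the Docker SDK for Python (the `docker` package), `client.containers.get()` accepts either a container name or an ID, and the returned object's `.id` is the full ID. A minimal sketch under that assumption; `resolve_container_id` and `make_fake_client` are hypothetical helpers, and the fake client only exists so the example runs without a Docker daemon.

```python
from types import SimpleNamespace


def resolve_container_id(client, name_or_id: str) -> str:
    """Return the full container ID for a container name or (short) ID.

    Assumes a docker-py style client, e.g. docker.from_env(), whose
    containers.get() accepts either a name or an ID.
    """
    return client.containers.get(name_or_id).id


def make_fake_client(containers_by_key: dict) -> SimpleNamespace:
    """Hypothetical stand-in exposing client.containers.get(key) -> object with .id."""
    return SimpleNamespace(containers=SimpleNamespace(get=containers_by_key.__getitem__))


if __name__ == "__main__":
    # Real usage (requires a running Docker daemon and `pip install docker`):
    #   import docker
    #   print(resolve_container_id(docker.from_env(), "your_container_name"))
    web = SimpleNamespace(id="abc123def456")
    fake = make_fake_client({"web": web, "abc123": web})
    print(resolve_container_id(fake, "web"))     # look up by name
    print(resolve_container_id(fake, "abc123"))  # look up by short ID
```

The resolved ID can then be passed as `container_id` to `docker_logs_flow`.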
For more information, you can refer to the Prefect Docker Containers Documentation.

Dijar Vrella
07/22/2024, 5:22 PM

Marvin
07/22/2024, 5:22 PM
get_docker_container_logs in Prefect is not directly iterable in an async for loop like typical async generators. We need to handle the coroutine properly.
Below is the revised code with proper handling:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def fetch_container_logs(container_id: str):
    # Fetch logs as a complete coroutine
    logs = await get_docker_container_logs(container_id=container_id, stream=True)
    return logs


@task
def stream_logs(logs: str):
    logger = get_run_logger()
    for log_line in logs.splitlines():
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    raw_logs = await fetch_container_logs(container_id)
    stream_logs(raw_logs)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
1. The fetch_container_logs task fetches all logs in a single awaited call.
2. The stream_logs task splits the logs into lines and logs each one.
3. The docker_logs_flow flow orchestrates these tasks.
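The reply relies on the difference between a coroutine, which produces a single value when awaited, and an asynchronous iterable, which `async for` consumes item by item. A stdlib-only illustration; `fetch_text` is a hypothetical stand-in for any `async def` that returns a whole block of text, not the `prefect_docker` function.

```python
import asyncio


async def fetch_text() -> str:
    """Hypothetical coroutine function standing in for an awaited log fetch."""
    await asyncio.sleep(0)
    return "first line\nsecond line"


async def demo():
    # Calling an `async def` function returns a coroutine object, not an iterator.
    coro = fetch_text()
    iter_error = None
    try:
        # Coroutines have no __aiter__, so `async for` raises TypeError here.
        async for _ in coro:
            pass
    except TypeError as exc:
        iter_error = exc
    finally:
        coro.close()  # never awaited; close it to avoid a RuntimeWarning

    # Awaiting runs the coroutine to completion and returns its single result.
    text = await fetch_text()
    return iter_error, text.splitlines()


if __name__ == "__main__":
    error, lines = asyncio.run(demo())
    print(type(error).__name__, lines)
```

Whether the awaited get_docker_container_logs call actually returns text depends on its arguments; the error log that follows in this thread shows it failing with `stream=True`.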
Replace "your_container_id" with the actual container ID you want to fetch logs from. This should solve the coroutine handling and logging issues.

Dijar Vrella
07/22/2024, 5:30 PM

Marvin
07/22/2024, 5:30 PM
```
19:29:28.871 | ERROR | Task run 'get_docker_container_logs-0' - Task run failed with exception: AttributeError("'CancellableStream' object has no attribute 'decode'") - Retries are exhausted
19:29:28.882 | ERROR | Task run 'get_docker_container_logs-0' - Finished in state Failed("Task run encountered an exception AttributeError: 'CancellableStream' object has no attribute 'decode'")
19:29:28.882 | ERROR | Task run 'fetch_container_logs-0' - Task run failed with exception: AttributeError("'CancellableStream' object has no attribute 'decode'") - Retries are exhausted
AttributeError: 'CancellableStream' object has no attribute 'decode'
19:29:28.893 | ERROR | Task run 'fetch_container_logs-0' - Finished in state Failed("Task run encountered an exception AttributeError: 'CancellableStream' object has no attribute 'decode'")
19:29:28.893 | ERROR | Flow run 'fluorescent-goldfish' - Encountered exception during execution: AttributeError("'CancellableStream' object has no attribute 'decode'")
19:29:28.908 | ERROR | Flow run 'fluorescent-goldfish' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'CancellableStream' object has no attribute 'decode'")
```
Marvin
07/22/2024, 5:31 PM
get_docker_container_logs function returns a CancellableStream object which we need to handle properly. This changes our approach slightly. Let's adjust the function to handle a streaming log correctly using an async for loop.
Here is the revised code:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()
    async for log_line in (await get_docker_container_logs(container_id=container_id, stream=True)):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
Points to note:
1. We await the call to get_docker_container_logs to get the stream object.
2. Then, we use async for to iterate over the stream.
Replace "your_container_id" with the actual Docker container ID. This setup should properly stream logs and handle them correctly with the Prefect logger.

Dijar Vrella
07/22/2024, 5:35 PM

Marvin
07/22/2024, 5:35 PM
CancellableStream object returned by get_docker_container_logs requires specific handling, as it doesn't directly yield strings that can be decoded or logged.
To properly handle the CancellableStream, we need to iterate over its content correctly. The CancellableStream most likely yields byte chunks which can be converted to strings.
Here's the updated code to handle this scenario appropriately:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()
    async for log_chunk in (await get_docker_container_logs(container_id=container_id, stream=True)):
        log_line = log_chunk.decode("utf-8")
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In this version:
1. We decode each log_chunk yielded by the CancellableStream using utf-8.
2. Each decoded line is logged using Prefect's logger.
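Decoding each chunk on its own assumes every chunk ends on a character boundary and holds whole lines. If the stream yields arbitrary byte chunks (I have not checked how Docker splits them), a multi-byte UTF-8 character split across two chunks would make `.decode("utf-8")` raise `UnicodeDecodeError`, and a single chunk could contain several lines or only part of one. A minimal, more defensive sketch using the standard library's incremental decoder; `decode_log_lines` is a hypothetical helper.

```python
import codecs
from typing import Iterable, Iterator


def decode_log_lines(chunks: Iterable[bytes], encoding: str = "utf-8") -> Iterator[str]:
    """Turn arbitrary byte chunks into complete text lines.

    Chunk boundaries need not align with line breaks or with multi-byte
    characters, so decoding is incremental and partial lines are buffered.
    """
    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    pending = ""
    for chunk in chunks:
        pending += decoder.decode(chunk)
        # Emit every complete line; keep the trailing partial line buffered.
        *complete, pending = pending.split("\n")
        for line in complete:
            yield line.rstrip("\r")
    # Flush bytes still held by the decoder, then any unterminated last line.
    pending += decoder.decode(b"", final=True)
    if pending:
        yield pending.rstrip("\r")


if __name__ == "__main__":
    # "é" is b"\xc3\xa9"; here it is split across the first two chunks.
    chunks = [b"caf\xc3", b"\xa9 ok\nsec", b"ond\n", b"tail"]
    for line in decode_log_lines(chunks):
        print(line)
```

Each yielded string could then be passed to `logger.info` in place of the per-chunk decode.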
Make sure to replace "your_container_id" with the actual Docker container ID. This should correctly handle the streaming logs and handle CancellableStream objects.

Dijar Vrella
07/22/2024, 6:47 PM

Marvin
07/22/2024, 6:48 PM
CancellableStream object, which isn't directly consumable in a list comprehension or in a simple for loop. Instead, you need to iterate over the stream as it yields byte strings.
Let's break it down into correctly handling the CancellableStream object and decoding the chunks properly. Here's the corrected version:
```python
import asyncio
from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def fetch_container_logs(container_id: str):
    logs = []
    async for log_chunk in (await get_docker_container_logs(container_id=container_id, stream=True)):
        logs.append(log_chunk)
    return logs


@task
def stream_logs(logs: list):
    logger = get_run_logger()
    for log in logs:
        decoded_log = log.decode('utf-8')  # Decode using utf-8
        logger.info(decoded_log)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    logs = await fetch_container_logs(container_id)  # Gather logs
    stream_logs(logs)  # Process the retrieved logs


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In this version:
1. `fetch_container_logs`: This task collects all log chunks asynchronously.
2. `stream_logs`: This task decodes each log chunk and logs it.
3. `docker_logs_flow`: This flow coordinates both tasks.
Replace "your_container_id" with the actual Docker container ID. This setup ensures that you correctly handle the CancellableStream and decode each log chunk properly.
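One caveat on this last version: it still calls get_docker_container_logs with `stream=True`, and the error log earlier in this thread shows that task itself failing with `AttributeError: 'CancellableStream' object has no attribute 'decode'`, so the loop never receives a stream. Separately, collecting every chunk before logging means nothing is logged until the stream ends, which for a followed stream is when the container stops. A possible workaround, sketched under assumptions: call the Docker SDK for Python (`docker` package) directly, where `Container.logs(stream=True, follow=True)` returns a blocking generator of byte chunks, and consume it with a plain `for` loop rather than `async for`. `follow_container_logs` and `make_fake_client` are hypothetical names; the fake client lets the sketch run without a Docker daemon, and the Prefect wiring is shown only as an untested comment.

```python
from types import SimpleNamespace
from typing import Callable


def follow_container_logs(container_id: str, on_line: Callable[[str], None], client=None) -> int:
    """Follow a container's logs and call on_line once per complete text line.

    Assumes the Docker SDK for Python: with stream=True, Container.logs()
    returns a blocking generator of byte chunks. With follow=True the loop
    only ends when the container stops. Returns the number of lines handled.
    """
    if client is None:
        import docker  # imported lazily so the offline demo needs no daemon

        client = docker.from_env()
    container = client.containers.get(container_id)
    count = 0
    pending = b""
    for chunk in container.logs(stream=True, follow=True):
        pending += chunk
        # Splitting bytes on b"\n" is safe for UTF-8: 0x0A never occurs inside a
        # multi-byte character. Keep the trailing partial line for the next chunk.
        *lines, pending = pending.split(b"\n")
        for raw in lines:
            on_line(raw.decode("utf-8", errors="replace"))
            count += 1
    if pending:  # final line without a trailing newline
        on_line(pending.decode("utf-8", errors="replace"))
        count += 1
    return count


def make_fake_client(chunks):
    """Hypothetical offline stand-in shaped like docker.from_env() for this call path."""
    container = SimpleNamespace(logs=lambda **kwargs: iter(chunks))
    return SimpleNamespace(containers=SimpleNamespace(get=lambda container_id: container))


# Possible Prefect wiring (untested sketch, assumes Prefect 2.x):
#     @task
#     def stream_container_logs(container_id: str) -> int:
#         logger = get_run_logger()
#         return follow_container_logs(container_id, logger.info)
# From plain asyncio code, asyncio.to_thread(follow_container_logs, container_id, on_line)
# runs the blocking loop in a worker thread.

if __name__ == "__main__":
    follow_container_logs("demo", print, client=make_fake_client([b"hello\nwor", b"ld\n"]))
```

Splitting on raw bytes before decoding keeps this sketch short; the trade-off is that it assumes UTF-8 (or another encoding in which the newline byte never appears inside a character).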