Dijar Vrella
07/22/2024, 4:51 PM

Marvin
07/22/2024, 4:51 PM
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_name: str):
    logger = get_run_logger()
    async for log_line in get_docker_container_logs(container_name=container_name, stream=True):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_name: str):
    await stream_container_logs(container_name)


if __name__ == "__main__":
    asyncio.run(docker_logs_flow(container_name="your_container_name"))
```
In the code above:
1. The `stream_container_logs` task uses `get_docker_container_logs` with `stream=True` to fetch logs continuously.
2. The `logger` obtained via `get_run_logger()` logs the incoming log lines.
3. The `docker_logs_flow` flow calls the task and can be run with `asyncio.run`.
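The snippet above relies on `async for`, which only works when the iterated object is an async iterable, meaning it implements `__aiter__`/`__anext__` the way an async generator does. The stdlib-only sketch below shows that consumer pattern in isolation. `fake_log_source` is a hypothetical async generator standing in for a streaming log source, and the standard `logging` module stands in for Prefect's `get_run_logger()`. The sketch does not show that `get_docker_container_logs` returns such an object; the snippet above simply assumes it does.

```python
import asyncio
import logging
from typing import AsyncIterator, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("docker_logs")  # stand-in for Prefect's get_run_logger()


async def fake_log_source(lines: List[str]) -> AsyncIterator[str]:
    """Hypothetical async generator standing in for a streaming log source."""
    for line in lines:
        await asyncio.sleep(0)  # hand control back to the event loop, like real I/O would
        yield line


async def forward_logs(source: AsyncIterator[str]) -> int:
    """Log every line from an async iterable and return how many were logged."""
    count = 0
    async for line in source:  # valid only because `source` implements __aiter__
        logger.info(line)
        count += 1
    return count


if __name__ == "__main__":
    n = asyncio.run(forward_logs(fake_log_source(["starting", "ready"])))
    print(f"forwarded {n} lines")
```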
You can check out these resources for more details:
- Prefect Docker Containers Documentation: https://docs.prefect.io/latest/integrations/prefect-docker/containers/
- Prefect Logging Guide: https://docs.prefect.io/latest/guides/logs/
- Prefect Async Code Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/async_basic.py

Dijar Vrella
07/22/2024, 5:14 PM

Marvin
07/22/2024, 5:14 PM
… `docker_logs_flow` so that Prefect knows which Docker container's logs to stream. Here's how you can pass the container name:
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_name: str):
    logger = get_run_logger()
    async for log_line in get_docker_container_logs(container_name=container_name, stream=True):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_name: str):
    await stream_container_logs(container_name)


if __name__ == "__main__":
    # Pass the container name as a parameter to the flow
    container_name = "your_container_name"  # Replace with your specific container name
    asyncio.run(docker_logs_flow(container_name=container_name))
```
In the `docker_logs_flow(container_name)` call, replace `"your_container_name"` with the name of the Docker container you want Prefect to stream logs from.
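Instead of editing the placeholder in the script, the container name could also be read from the command line. The following is a minimal stdlib sketch using `argparse`. The `--container` flag and the `parse_container_arg` helper are hypothetical names, not part of Prefect. In the script above, the returned value would be passed on as `docker_logs_flow(container_name=...)`.

```python
import argparse
from typing import List, Optional


def parse_container_arg(argv: Optional[List[str]] = None) -> str:
    """Return the container reference given via the hypothetical --container flag."""
    parser = argparse.ArgumentParser(description="Stream a Docker container's logs via Prefect.")
    parser.add_argument(
        "--container",
        required=True,  # argparse exits with a usage message if it is missing
        help="Name or ID of the Docker container whose logs should be streamed",
    )
    args = parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]
    return args.container
```

For example, `parse_container_arg(["--container", "my-app"])` returns `"my-app"`.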
This way, `docker_logs_flow` will receive the container name and pass it to the `stream_container_logs` task, which then uses it to fetch and stream the logs.

Dijar Vrella
07/22/2024, 5:16 PM

Marvin
07/22/2024, 5:16 PM
The `get_docker_container_logs` function expects the container ID (`container_id`) rather than the container name (`container_name`). Let's update the code to use the container ID:
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()
    async for log_line in get_docker_container_logs(container_id=container_id, stream=True):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In the `docker_logs_flow(container_id)` call, replace `"your_container_id"` with the ID of the Docker container you want Prefect to stream logs from.
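Container IDs printed by `docker ps` are 12-character hexadecimal short forms of the full 64-character ID, whereas names are chosen by the user. As far as I know, the Docker Engine API also accepts a container name wherever it asks for an ID, so passing a name as `container_id` may work as well. That is an assumption you should verify. The hypothetical helper below only guesses which kind of reference a string is, which can help with input validation or clearer log messages. It is a heuristic: a name made up entirely of lowercase hex characters would be misclassified as an ID.

```python
import re

# Full Docker IDs are 64 lowercase hex characters; `docker ps` shows the first 12.
# This heuristic accepts anything from 12 to 64 hex characters as an ID.
_ID_PATTERN = re.compile(r"[0-9a-f]{12,64}")
# Character set Docker allows in container names (optional leading slash).
_NAME_PATTERN = re.compile(r"/?[a-zA-Z0-9][a-zA-Z0-9_.-]+")


def classify_container_ref(ref: str) -> str:
    """Guess whether `ref` is a container ID or a container name (hypothetical helper)."""
    if _ID_PATTERN.fullmatch(ref):
        return "id"
    if _NAME_PATTERN.fullmatch(ref):
        return "name"
    raise ValueError(f"{ref!r} is neither a plausible container ID nor a valid container name")
```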
For more information, you can refer to the Prefect Docker Containers Documentation.

Dijar Vrella
07/22/2024, 5:22 PM

Marvin
07/22/2024, 5:22 PM
The `get_docker_container_logs` task in Prefect is not an async generator, so it cannot be iterated directly in an `async for` loop. We need to await the coroutine and then handle its result.
Below is the revised code with proper handling:
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def fetch_container_logs(container_id: str):
    # Await the task to get the complete log output in one piece
    logs = await get_docker_container_logs(container_id=container_id, stream=True)
    return logs


@task
def stream_logs(logs: str):
    logger = get_run_logger()
    for log_line in logs.splitlines():
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    raw_logs = await fetch_container_logs(container_id)
    stream_logs(raw_logs)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
1. The `fetch_container_logs` task awaits `get_docker_container_logs` and returns all the logs at once.
2. The `stream_logs` task splits the logs into lines and logs each one.
3. The `docker_logs_flow` flow orchestrates these tasks.
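Splitting a finished log string with `str.splitlines()` and logging each line works for any plain text. The error log that follows, however, is raised inside the `get_docker_container_logs` task run itself when `stream=True` is passed. That suggests this fetch-then-split pattern would need the call made without `stream=True`; this is an inference from the log and has not been verified here. The sketch isolates the line-logging step, with the standard `logging` module standing in for `get_run_logger()`. `log_text_lines` is a hypothetical helper that also skips blank lines.

```python
import logging
from typing import List

logger = logging.getLogger("docker_logs")  # stand-in for Prefect's get_run_logger()


def log_text_lines(text: str, log: logging.Logger = logger) -> List[str]:
    """Log each non-blank line of `text` and return the lines that were logged."""
    logged = []
    for line in text.splitlines():  # handles \n, \r\n and other line boundaries
        if line.strip():  # skip blank lines so they don't clutter the run logs
            log.info(line)
            logged.append(line)
    return logged
```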
Replace `"your_container_id"` with the actual container ID you want to fetch logs from. This should solve the coroutine handling and logging issues.

Dijar Vrella
07/22/2024, 5:30 PM

Marvin
07/22/2024, 5:30 PM
19:29:28.871 | ERROR | Task run 'get_docker_container_logs-0' - Task run failed with exception: AttributeError("'CancellableStream' object has no attribute 'decode'") - Retries are exhausted
19:29:28.882 | ERROR | Task run 'get_docker_container_logs-0' - Finished in state Failed("Task run encountered an exception AttributeError: 'CancellableStream' object has no attribute 'decode'")
19:29:28.882 | ERROR | Task run 'fetch_container_logs-0' - Task run failed with exception: AttributeError("'CancellableStream' object has no attribute 'decode'") - Retries are exhausted
AttributeError: 'CancellableStream' object has no attribute 'decode'
19:29:28.893 | ERROR | Task run 'fetch_container_logs-0' - Finished in state Failed("Task run encountered an exception AttributeError: 'CancellableStream' object has no attribute 'decode'")
19:29:28.893 | ERROR | Flow run 'fluorescent-goldfish' - Encountered exception during execution: AttributeError("'CancellableStream' object has no attribute 'decode'")
19:29:28.908 | ERROR | Flow run 'fluorescent-goldfish' - Finished in state Failed("Flow run encountered an exception: AttributeError: 'CancellableStream' object has no attribute 'decode'")
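The log shows the `AttributeError` being raised inside the `get_docker_container_logs-0` task run, before `fetch_container_logs` receives any result. That pattern fits the task calling `.decode()` on whatever the Docker client returned. A single `bytes` blob has that method, but a stream object only yields chunks. The stdlib sketch below reproduces the mismatch using a hypothetical `FakeStream` class in place of docker-py's `CancellableStream`. It illustrates the error only and is not the library's actual code.

```python
from typing import Iterator, List


class FakeStream:
    """Hypothetical stand-in for a streaming response: iterable, but not bytes."""

    def __init__(self, chunks: List[bytes]) -> None:
        self._chunks = chunks

    def __iter__(self) -> Iterator[bytes]:
        return iter(self._chunks)


def decode_whole(result) -> str:
    """Decode a complete bytes result, as a non-streaming caller would."""
    return result.decode()


def decode_streamed(result) -> str:
    """Decode a streamed result chunk by chunk.

    Assumes no multi-byte UTF-8 character is split across two chunks.
    """
    return "".join(chunk.decode("utf-8") for chunk in result)


if __name__ == "__main__":
    stream = FakeStream([b"line 1\n", b"line 2\n"])
    print(decode_whole(b"line 1\nline 2\n"), end="")  # works: bytes has .decode()
    try:
        decode_whole(stream)
    except AttributeError as exc:
        print(f"AttributeError: {exc}")  # same kind of error as in the log above
    print(decode_streamed(stream), end="")
```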
Marvin
07/22/2024, 5:31 PM
The error shows that, with `stream=True`, `get_docker_container_logs` ends up working with a `CancellableStream` object instead of plain bytes, and we need to handle that stream properly. This changes our approach slightly. Let's adjust the code to consume the streaming log with an `async for` loop.
Here is the revised code:
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()
    async for log_line in (await get_docker_container_logs(container_id=container_id, stream=True)):
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
Points to note:
1. We await the call to `get_docker_container_logs` to get the stream object.
2. Then, we use `async for` to iterate over the stream.
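Point 2 holds only if the awaited object is an *async* iterable. As far as I know, docker-py's `CancellableStream` is an ordinary synchronous iterator: it defines `__iter__`/`__next__` but not `__aiter__`. Running `async for` over such an object raises `TypeError`. The stdlib sketch below demonstrates this, with a plain list iterator standing in for the stream. `consume_async` and `consume_sync` are hypothetical helpers.

```python
import asyncio
from typing import Iterable, List


async def consume_async(source) -> List[bytes]:
    """Collect items with `async for`; requires `source` to define __aiter__."""
    items = []
    async for item in source:
        items.append(item)
    return items


def consume_sync(source: Iterable[bytes]) -> List[bytes]:
    """Collect items with a plain `for` loop; works for any synchronous iterable."""
    return [item for item in source]


if __name__ == "__main__":
    sync_stream = iter([b"line 1\n", b"line 2\n"])  # stand-in for a synchronous stream
    try:
        asyncio.run(consume_async(sync_stream))
    except TypeError as exc:
        # Raised before any item is consumed, so sync_stream is still intact below.
        print(f"TypeError: {exc}")
    print(consume_sync(sync_stream))
```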
Replace `"your_container_id"` with the actual Docker container ID. This setup should stream the logs to the Prefect logger.

Dijar Vrella
07/22/2024, 5:35 PM

Marvin
07/22/2024, 5:35 PM
The `CancellableStream` object that `stream=True` produces requires specific handling, because it doesn't directly yield strings that can be logged.
To handle the `CancellableStream` properly, we need to iterate over its content. It most likely yields byte chunks, which can be decoded into strings.
Here's the updated code to handle this scenario appropriately:
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def stream_container_logs(container_id: str):
    logger = get_run_logger()
    async for log_chunk in (await get_docker_container_logs(container_id=container_id, stream=True)):
        log_line = log_chunk.decode("utf-8")
        logger.info(log_line)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    await stream_container_logs(container_id)


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In this version:
1. We decode each `log_chunk` yielded by the `CancellableStream` using UTF-8.
2. Each decoded chunk is logged using Prefect's logger.
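Decoding each chunk on its own assumes that chunk boundaries line up with character and line boundaries. A raw byte stream does not guarantee that. A multi-byte UTF-8 character or a log line can be split across two chunks, and calling `chunk.decode("utf-8")` on the first half then raises `UnicodeDecodeError`. A more defensive, stdlib-only sketch feeds the chunks through an incremental decoder and buffers partial lines. `iter_decoded_lines` is a hypothetical helper, and how a particular Docker stream chunks its output is an assumption you should check.

```python
import codecs
from typing import Iterable, Iterator


def iter_decoded_lines(chunks: Iterable[bytes], encoding: str = "utf-8") -> Iterator[str]:
    """Yield complete text lines from a stream of byte chunks.

    The incremental decoder holds back incomplete multi-byte characters between
    chunks, and `buffer` holds an incomplete line until its newline arrives.
    """
    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        *complete, buffer = buffer.split("\n")  # last piece may be a partial line
        for line in complete:
            yield line.rstrip("\r")  # tolerate Windows-style line endings
    buffer += decoder.decode(b"", final=True)  # flush any leftover bytes
    if buffer:
        yield buffer.rstrip("\r")
```

For example, the chunks `[b"hel", b"lo\nw\xc3", b"\xa9rld\n"]` produce the lines `"hello"` and `"wérld"`, even though the two bytes of `é` (`\xc3\xa9`) arrive in different chunks.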
Make sure to replace `"your_container_id"` with the actual Docker container ID. This should handle the streamed `CancellableStream` output correctly.

Dijar Vrella
07/22/2024, 6:47 PM

Marvin
07/22/2024, 6:48 PM
… `CancellableStream` object, which isn't directly consumable in a list comprehension or in a simple `for` loop. Instead, you need to iterate over the stream as it yields byte strings.
Let's split this into two steps: collecting the chunks from the `CancellableStream` object, and decoding them. Here's the corrected version:
```python
import asyncio

from prefect import flow, get_run_logger, task
from prefect_docker.containers import get_docker_container_logs


@task
async def fetch_container_logs(container_id: str):
    logs = []
    async for log_chunk in (await get_docker_container_logs(container_id=container_id, stream=True)):
        logs.append(log_chunk)
    return logs


@task
def stream_logs(logs: list):
    logger = get_run_logger()
    for log in logs:
        decoded_log = log.decode("utf-8")  # Decode using utf-8
        logger.info(decoded_log)


@flow(log_prints=True)
async def docker_logs_flow(container_id: str):
    logs = await fetch_container_logs(container_id)  # Gather logs
    stream_logs(logs)  # Process the retrieved logs


if __name__ == "__main__":
    # Pass the container ID as a parameter to the flow
    container_id = "your_container_id"  # Replace with your specific container ID
    asyncio.run(docker_logs_flow(container_id=container_id))
```
In this version:
1. `fetch_container_logs`: This task collects all log chunks asynchronously.
2. `stream_logs`: This task decodes each log chunk and logs it.
3. `docker_logs_flow`: This flow coordinates both tasks.
Replace `"your_container_id"` with the actual Docker container ID. This setup should let you handle the `CancellableStream` and decode each log chunk.
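Two points in the exchange above deserve a check before relying on this version. First, it still passes `stream=True` through `get_docker_container_logs`, and that is where the earlier `AttributeError` was raised. Second, `async for` requires an async iterable, while a docker-py `CancellableStream` is, as far as I know, a synchronous iterator whose `next()` blocks until output arrives. One way to bridge the two, sketched here with the standard library only, is to pull items from the synchronous iterator in a worker thread via `asyncio.to_thread` (Python 3.9+) so the event loop stays free. `aiter_in_thread` and `stream_chunks` are hypothetical helpers. The docker-py wiring shown in the comments (`docker.from_env()`, `containers.get()`, `logs(stream=True, follow=True)`) is an assumption about how the stream would be obtained; it does not come from the thread.

```python
import asyncio
from typing import AsyncIterator, Iterable, List, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the synchronous iterator


async def aiter_in_thread(sync_iterable: Iterable[T]) -> AsyncIterator[T]:
    """Expose a blocking, synchronous iterator as an async iterator.

    Each next() call runs in a worker thread, so a slow or blocking stream
    does not stall the event loop.
    """
    iterator = iter(sync_iterable)
    while True:
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


async def stream_chunks(chunks: Iterable[bytes]) -> List[str]:
    """Decode and collect chunks; inside a flow, each line would go to the run logger."""
    lines: List[str] = []
    async for chunk in aiter_in_thread(chunks):
        lines.append(chunk.decode("utf-8", errors="replace").rstrip("\n"))
    return lines


# Hypothetical wiring inside a Prefect task (not executed here; assumes the
# `docker` SDK is installed and the container exists):
#   import docker
#   container = docker.from_env().containers.get(container_id)
#   stream = container.logs(stream=True, follow=True)
#   async for chunk in aiter_in_thread(stream):
#       logger.info(chunk.decode("utf-8", errors="replace").rstrip("\n"))

if __name__ == "__main__":
    print(asyncio.run(stream_chunks([b"line 1\n", b"line 2\n"])))
```

With `follow=True`, the stream stays open for as long as the container runs, so the loop would finish only when the container stops. docker-py's `CancellableStream` also appears to expose a `close()` method that could end it earlier; confirm this against the docker-py documentation.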