Is there a way when starting a flow through the CL...
# ask-community
n
Is there a way, when starting a flow through the CLI (prefect deployment run), to watch the logs in the terminal? In Prefect 1 there was a watch flag, prefect run --watch, but I couldn’t find anything in Prefect 2.
I only found prefect flow-run logs, but then I would need to wait until the flow run is finished.
c
Where are your flows running? Execution streams the logs to Prefect Cloud, and potentially to the worker / agent if you have stream_output set to true, but they won’t stream to your local console if your agent / execution isn’t running there.
n
I use prefect cloud and the agent/flows are running in AWS EKS
I tried to write a Python function that runs the flow and then prints all the logs afterwards. This sometimes works and sometimes shows only part of the logs. I think it takes a while until all logs are saved and available.
import asyncio

from prefect import get_client
from prefect.deployments import run_deployment
from prefect.server.schemas.core import FlowRun
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId


async def print_flow_run_logs(flow_run_id: str):
    log_filter = LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))

    async with get_client() as client:
        flow_run_logs = await client.read_logs(log_filter=log_filter)

    for flow_run_log in flow_run_logs:
        print(flow_run_log.message)


if __name__ == "__main__":
    flow_run: FlowRun = run_deployment(name="my-flow")
    asyncio.run(print_flow_run_logs(flow_run_id=flow_run.id))
Is there a better method to start a flow and stream the prefect cloud logs to the machine where I start the flow?
c
Err, there’s no mechanism that I’m aware of to stream locally; there is no connection made to your local agent.
Objectively speaking, the flow run is in EKS, and it streams to the worker and to Prefect Cloud. Getting the logs locally would take either a push (from the container to your local terminal) or a pull (from your local terminal to the container). You could tail / stream the container or the CloudWatch log group, or query the Prefect REST logs endpoint, but your local terminal never enters the picture for execution.
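The pull option described above could look roughly like the following: a minimal sketch of a polling "tail" that runs on the local machine. It deliberately does not import Prefect. The two callables, fetch_logs(limit, offset) and is_finished(), are hypothetical placeholders for whatever reads the logs endpoint and the flow run state, and the sketch assumes logs come back in a stable timestamp order so that an offset can be used as a cursor.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical callables (not part of any library):
#   fetch_logs(limit, offset) -> next log messages, oldest first
#   is_finished()             -> True once the flow run is in a terminal state
FetchLogs = Callable[[int, int], Awaitable[List[str]]]
IsFinished = Callable[[], Awaitable[bool]]


async def tail_logs(
    fetch_logs: FetchLogs,
    is_finished: IsFinished,
    emit: Callable[[str], None] = print,
    poll_seconds: float = 2.0,
    page_size: int = 200,
) -> int:
    """Print new log lines as they appear; return how many were emitted."""
    offset = 0
    while True:
        # Read the state BEFORE fetching, so one last fetch always happens
        # after the run has been observed as finished.
        finished = await is_finished()

        # Drain every page that is currently available.
        while True:
            page = await fetch_logs(page_size, offset)
            for message in page:
                emit(message)
            offset += len(page)
            if len(page) < page_size:
                break

        if finished:
            return offset
        await asyncio.sleep(poll_seconds)
```

With Prefect 2, fetch_logs could presumably wrap client.read_logs(log_filter=..., limit=limit, offset=offset) and is_finished could wrap a client.read_flow_run(...) state check; treat those call shapes as assumptions to verify against the installed version. Because logs are batched before they reach the API, output from this loop would lag the run slightly.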
n
Thanks for the insights! That seems a little complicated. What about my code that waits until the flow run is finished and then just prints the logs? Is there any way to get that working properly? Then I could at least print the logs after my flow has finished.
c
I think you’d probably need a timer or wait - the logs themselves are batched, in increments of either size or time, based on the settings. With the code being asyncio like that, I think you’d need to wait for the flow run to reach a terminal state before fetching the logs
so you’d need to filter for flow state first to ensure it has completed (or failed or whatever) before attempting to print, because there is a delay in aggregating the logs
it’s near real-time but not exactly
n
Thank you for your advice! I will try to implement a loop that waits until the flow is in a terminal state and only then pull the logs
@Christopher Boyd I added that function:
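async def wait_until_flow_run_reached_terminal_state(flow_run_id: str, poll_seconds: float = 5):
    # Reuse one client and pause between polls instead of busy-looping against the API.
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id=flow_run_id)
            # State.is_final() replaces the TERMINAL_STATES check, whose import was not shown.
            if flow_run.state is not None and flow_run.state.is_final():
                return
            await asyncio.sleep(poll_seconds)

asyncio.run(wait_until_flow_run_reached_terminal_state(flow_run_id=flow_run.id))
asyncio.run(print_flow_run_logs(flow_run_id=flow_run.id))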
Unfortunately it does not help. The logs are still not always fully available. When run_deployment finishes, the flow run is already in a terminal state. I can see all the logs in Prefect Cloud while the flow runs, but when I try to retrieve them, I get far fewer. I added a 10-second sleep, but that does not seem to be enough. I will try increasing it to a minute.
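Two things might explain getting fewer logs than the UI shows, and neither is confirmed for this setup: the logs endpoint may return results in pages, so a single read returns only the first page, and the last batches may still be in flight when the run reaches a terminal state. As an alternative to a fixed sleep, here is a minimal sketch that reads every page and repeats until the total stops growing. fetch_page(limit, offset) is a hypothetical callable standing in for the actual log read, and all parameter names and defaults are illustrative.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical callable: fetch_page(limit, offset) -> log messages, oldest first.
FetchPage = Callable[[int, int], Awaitable[List[str]]]


async def fetch_all(fetch_page: FetchPage, page_size: int = 200) -> List[str]:
    """Read page after page until a short page signals the end."""
    logs: List[str] = []
    while True:
        page = await fetch_page(page_size, len(logs))
        logs.extend(page)
        if len(page) < page_size:
            return logs


async def fetch_until_stable(
    fetch_page: FetchPage,
    page_size: int = 200,
    poll_seconds: float = 2.0,
    stable_polls: int = 3,
    timeout_seconds: float = 120.0,
) -> List[str]:
    """Re-read all logs until the count is unchanged for `stable_polls`
    consecutive reads, or until the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    logs = await fetch_all(fetch_page, page_size)
    unchanged = 0
    while unchanged < stable_polls and loop.time() < deadline:
        await asyncio.sleep(poll_seconds)
        latest = await fetch_all(fetch_page, page_size)
        # Any growth resets the counter: more batches may still arrive.
        unchanged = unchanged + 1 if len(latest) == len(logs) else 0
        logs = latest
    return logs
```

In the script above, fetch_page could presumably be a small wrapper around client.read_logs(log_filter=log_filter, limit=limit, offset=offset); whether limit and offset behave this way should be checked against the installed Prefect version. A stable count is only a heuristic, so a long pause in log delivery could still end the loop early.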