Nico Neumann
07/07/2023, 8:39 AM
Is there a way (when using prefect deployment run) to watch the logs in the terminal?
In Prefect 1 there was a watch flag: prefect run --watch
but I couldn't find anything similar in Prefect 2. There is prefect flow-run logs, but then I would need to wait until the flow run is finished.
Christopher Boyd
07/07/2023, 2:21 PM
Nico Neumann
07/07/2023, 2:44 PM
import asyncio

from prefect import get_client
from prefect.deployments import run_deployment
from prefect.server.schemas.core import FlowRun
from prefect.server.schemas.filters import LogFilter, LogFilterFlowRunId


async def print_flow_run_logs(flow_run_id: str):
    # Only fetch logs that belong to this flow run.
    log_filter = LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))
    async with get_client() as client:
        flow_run_logs = await client.read_logs(log_filter=log_filter)
    for flow_run_log in flow_run_logs:
        print(flow_run_log.message)


if __name__ == "__main__":
    # With the default timeout, run_deployment waits for the flow run to finish.
    flow_run: FlowRun = run_deployment(name="my-flow")
    asyncio.run(print_flow_run_logs(flow_run_id=flow_run.id))
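To get something closer to the old --watch behaviour, one option is to start the run without blocking and poll for new log lines until the run reaches a terminal state. The sketch below is not a Prefect feature. The names tail_logs, fetch_logs, and is_finished are hypothetical; the two callables stand in for calls such as client.read_logs (with an offset) and client.read_flow_run. It also assumes the run was started so that the call returns immediately, for example run_deployment(..., timeout=0).

```python
import asyncio
from typing import Awaitable, Callable, List


async def tail_logs(
    fetch_logs: Callable[[int], Awaitable[List[str]]],
    is_finished: Callable[[], Awaitable[bool]],
    poll_interval: float = 2.0,
    emit: Callable[[str], None] = print,
) -> int:
    """Emit new log lines until the run is finished; return the line count.

    fetch_logs(offset) is a hypothetical callable returning the log messages
    starting at `offset`, oldest first.
    is_finished() is a hypothetical callable reporting whether the flow run
    has reached a terminal state.
    """
    seen = 0
    while True:
        # Read the state before fetching, so lines written just before the
        # run finished are still picked up by this fetch.
        finished = await is_finished()
        new_lines = await fetch_logs(seen)
        for line in new_lines:
            emit(line)
        seen += len(new_lines)
        if finished:
            if not new_lines:
                return seen
            # Finished but lines are still arriving: fetch again right away.
            continue
        await asyncio.sleep(poll_interval)
```

With Prefect, fetch_logs could wrap client.read_logs(log_filter=..., offset=offset) and return the messages, and is_finished could check the state returned by client.read_flow_run. Logs may reach the API slightly after the run reaches a terminal state, so a short grace period before the final fetch might still be needed.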
Christopher Boyd
07/07/2023, 3:44 PM
Nico Neumann
07/07/2023, 3:48 PM
Christopher Boyd
07/07/2023, 3:52 PM
Nico Neumann
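07/07/2023, 3:53 PM
# Assumed import: TERMINAL_STATES was not imported in the snippet as posted.
from prefect.server.schemas.states import TERMINAL_STATES


async def wait_until_flow_run_reached_terminal_state(flow_run_id: str):
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id=flow_run_id)
            if flow_run.state.type in TERMINAL_STATES:
                return
            # Pause between polls so the loop does not hammer the API.
            await asyncio.sleep(1)


asyncio.run(wait_until_flow_run_reached_terminal_state(flow_run_id=flow_run.id))
asyncio.run(print_flow_run_logs(flow_run_id=flow_run.id))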
Unfortunately this does not help. The logs are still not always fully available. By the time run_deployment returns, the flow run is already in a terminal state. I can see all logs in Prefect Cloud while the flow runs, but when I try to retrieve them, I get far fewer.
I added a 10-second sleep, but that does not seem to be enough. I will try increasing it to a minute.
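One possible explanation for the shorter output, besides timing, is that a single read_logs call may return only one page of results if the API applies a default limit (Prefect 2 has a PREFECT_API_DEFAULT_LIMIT setting; whether that is the cause here is an assumption, not something confirmed in this thread). A minimal sketch of reading every page with limit and offset is below. The names read_all_pages and fetch_page are hypothetical; fetch_page stands in for something like client.read_logs(log_filter=..., limit=limit, offset=offset).

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def read_all_pages(
    fetch_page: Callable[[int, int], Awaitable[List[T]]],
    page_size: int = 200,
) -> List[T]:
    """Collect every item by requesting pages until a short page comes back.

    fetch_page(limit, offset) is a hypothetical callable that returns at
    most `limit` items starting at `offset`.
    """
    items: List[T] = []
    offset = 0
    while True:
        page = await fetch_page(page_size, offset)
        items.extend(page)
        # A page shorter than page_size means there is nothing left to read.
        if len(page) < page_size:
            return items
        offset += len(page)
```

If the number of messages printed by the original snippet is suspiciously close to a round page size, that would point to pagination rather than to logs arriving late.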