jack
11/11/2021, 5:26 PMFlowRunView
to access the logs of a flow run?
Sometimes it works for me, but other times I keep seeing stale logs.from time import sleep
from prefect import Flow, task
from prefect.run_configs import LocalRun
import prefect
@task
def do_something():
for i in range(1000):
<http://prefect.context.logger.info|prefect.context.logger.info>(f"Logging {i}")
sleep(2)
FLOW_NAME = "simple example"
with Flow(FLOW_NAME, run_config=LocalRun()) as flow:
do_something()
flow.register("Prefect_Tutorials")
Kevin Kho
11/11/2021, 5:27 PMjack
11/11/2021, 5:29 PMwhile True:
output = []
flow_run = FlowRunView.from_flow_run_id(flow_run_id)
for log in flow_run.get_latest().get_logs():
output.append(log.message)
print("\n ".join(output))
time.sleep(1)
Beginning Flow run for 'simple example'
11/11/2021 05:30:36 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:38 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:39 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:41 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:42 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:44 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:45 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:47 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
11/11/2021 05:30:48 PM Submitted for execution: PID: 34084
Beginning Flow run for 'simple example'
Kevin Kho
11/11/2021, 5:34 PMZanie
11/11/2021, 5:34 PMwatch_flow_run
or the stream_flow_run_logs
functions
def stream_flow_run_logs(flow_run_id: str) -> None:
"""
Basic wrapper for `watch_flow_run` to print the logs of the run
EXPERIMENTAL: This interface is experimental and subject to change
"""
for log in watch_flow_run(flow_run_id):
level_name = logging.getLevelName(log.level)
timestamp = log.timestamp.in_tz(tz="local")
# Uses `print` instead of the logger to prevent duplicate timestamps
print(
f"{timestamp:%H:%M:%S} | {level_name:<7} | {log.message}",
)
get_logs
requires a start timestamp if you don't want to retrieve duplicate logsjack
11/11/2021, 5:35 PMZanie
11/11/2021, 5:38 PMlogs_query = {
with_args(
"logs",
{
"order_by": {EnumValue("timestamp"): EnumValue("asc")},
"where": {
"_and": [
{"timestamp": {"_lte": self.updated_at.isoformat()}},
(
{"timestamp": {"_gt": start_time.isoformat()}}
if start_time
else {}
),
]
},
},
): {"timestamp": True, "message": True, "level": True}
}
jack
11/11/2021, 5:38 PMZanie
11/11/2021, 5:39 PMupdated_at
is not correct somehow.updated_at
is not reset.jack
11/11/2021, 5:43 PMZanie
11/11/2021, 5:44 PMjack
11/11/2021, 5:50 PMZanie
11/11/2021, 6:46 PM