jack

11/11/2021, 5:26 PM
Anybody else using
FlowRunView
to access the logs of a flow run? Sometimes it works for me, but other times I keep seeing stale logs.
Here is my example flow:
from time import sleep

from prefect import Flow, task
from prefect.run_configs import LocalRun
import prefect


@task
def do_something():
    for i in range(1000):
        prefect.context.logger.info(f"Logging {i}")
        sleep(2)


FLOW_NAME = "simple example"
with Flow(FLOW_NAME, run_config=LocalRun()) as flow:
    do_something()


flow.register("Prefect_Tutorials")

Kevin Kho

11/11/2021, 5:27 PM
Could you show me how you use the FlowRunView?

jack

11/11/2021, 5:29 PM
Yes, here is how I'm calling the FlowRunView:
import time

from prefect.backend import FlowRunView

# flow_run_id is the ID of a run of the flow registered above
while True:
    output = []
    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
    for log in flow_run.get_latest().get_logs():
        output.append(log.message)

    print("\n  ".join(output))
    time.sleep(1)
This is the output I'm seeing:
Beginning Flow run for 'simple example'
11/11/2021 05:30:36 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:38 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:39 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:41 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:42 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:44 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:45 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:47 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'
11/11/2021 05:30:48 PM Submitted for execution: PID: 34084
  Beginning Flow run for 'simple example'

Kevin Kho

11/11/2021, 5:34 PM
Maybe you can try the watch_flow_run function here. stream_logs is true by default.

Zanie

11/11/2021, 5:34 PM
Yeah, I'd recommend using the watch_flow_run or stream_flow_run_logs functions.
def stream_flow_run_logs(flow_run_id: str) -> None:
    """
    Basic wrapper for `watch_flow_run` to print the logs of the run

    EXPERIMENTAL: This interface is experimental and subject to change
    """
    for log in watch_flow_run(flow_run_id):
        level_name = logging.getLevelName(log.level)
        timestamp = log.timestamp.in_tz(tz="local")
        # Uses `print` instead of the logger to prevent duplicate timestamps
        print(
            f"{timestamp:%H:%M:%S} | {level_name:<7} | {log.message}",
        )
get_logs requires a start timestamp if you don't want to retrieve duplicate logs.
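The start-timestamp idea can be sketched as a polling cursor: remember the newest timestamp seen so far and ask only for entries after it. This is a minimal, self-contained sketch and not Prefect code; `LogRecord`, `poll_new_logs`, and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for whatever call actually returns logs newer than a given start time.

```python
from datetime import datetime, timedelta
from typing import Callable, List, NamedTuple, Optional, Tuple


class LogRecord(NamedTuple):
    """Hypothetical stand-in for a log entry (timestamp plus message)."""
    timestamp: datetime
    message: str


FetchFn = Callable[[Optional[datetime]], List[LogRecord]]


def poll_new_logs(
    fetch: FetchFn, cursor: Optional[datetime]
) -> Tuple[List[LogRecord], Optional[datetime]]:
    """Fetch logs newer than `cursor` and return them with the advanced cursor."""
    new_logs = fetch(cursor)
    if new_logs:
        # Advance the cursor to the newest timestamp seen so far
        cursor = max(log.timestamp for log in new_logs)
    return new_logs, cursor


# In-memory stand-in for the backend; the timestamps are illustrative only
start = datetime(2021, 11, 11, 17, 30, 36)
store = [LogRecord(start + timedelta(seconds=2 * i), f"Logging {i}") for i in range(3)]


def fake_fetch(start_time: Optional[datetime]) -> List[LogRecord]:
    """Assumed behavior: return only entries strictly newer than start_time."""
    return [log for log in store if start_time is None or log.timestamp > start_time]


first, cursor = poll_new_logs(fake_fetch, None)    # everything so far
second, cursor = poll_new_logs(fake_fetch, cursor)  # nothing new yet
store.append(LogRecord(start + timedelta(seconds=6), "Logging 3"))
third, cursor = poll_new_logs(fake_fetch, cursor)   # only the new entry
```

One caveat of a strict greater-than cursor: two entries sharing the exact same timestamp could be split across polls and one of them missed, so a real implementation may want to de-duplicate on something more than the timestamp.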

jack

11/11/2021, 5:35 PM
I don't mind retrieving duplicate logs
I've been avoiding watch_flow_run and stream_flow_run_logs because in some cases I will want to wait until the flow completes before logging its output
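Waiting for completion and using a streaming interface are not necessarily at odds: if the stream is an iterable that ends when the run finishes (an assumption here), it can be drained into a buffer and emitted in one go. A minimal sketch, where `fake_watch` is a hypothetical stand-in for the log stream and not a Prefect function:

```python
from typing import Iterable, Iterator, List


def fake_watch(messages: List[str]) -> Iterator[str]:
    """Hypothetical log stream that is exhausted once the run is done."""
    for message in messages:
        yield message


def collect_until_done(stream: Iterable[str]) -> str:
    """Drain the whole stream first, then return it as one block of text."""
    buffered = list(stream)  # consumes the iterator until it is exhausted
    return "\n".join(buffered)


report = collect_until_done(fake_watch(["Logging 0", "Logging 1", "Logging 2"]))
print(report)
```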

Zanie

11/11/2021, 5:38 PM
I see. Looking at those implementations may still be helpful.
The logs query looks like this:
logs_query = {
    with_args(
        "logs",
        {
            "order_by": {EnumValue("timestamp"): EnumValue("asc")},
            "where": {
                "_and": [
                    {"timestamp": {"_lte": self.updated_at.isoformat()}},
                    (
                        {"timestamp": {"_gt": start_time.isoformat()}}
                        if start_time
                        else {}
                    ),
                ]
            },
        },
    ): {"timestamp": True, "message": True, "level": True}
}

jack

11/11/2021, 5:38 PM
Here is what I'm seeing in the browser:
But from my Python code, I still only get those first two lines each time I query.

Zanie

11/11/2021, 5:39 PM
I see. It seems like updated_at is not correct somehow.
I think what's happening here is that the flow run has not changed state, so updated_at is not updated.
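If that is the cause, the effect of the `_lte` bound in the query can be modeled without Prefect at all. The sketch below is a simplified stand-in for the filter, not the library's code; `visible_logs` is a hypothetical helper and the timestamps and messages are made up for illustration.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Log = Tuple[datetime, str]


def visible_logs(
    logs: List[Log], updated_at: datetime, start_time: Optional[datetime] = None
) -> List[Log]:
    """Keep logs with timestamp <= updated_at and, if given, > start_time."""
    return [
        (ts, msg)
        for ts, msg in logs
        if ts <= updated_at and (start_time is None or ts > start_time)
    ]


# Illustrative data: two entries at submission time, then one every two seconds
submitted = datetime(2021, 11, 11, 17, 30, 36)
logs = [(submitted, "Submitted for execution"), (submitted, "Beginning Flow run")]
logs += [(submitted + timedelta(seconds=2 * (i + 1)), f"Logging {i}") for i in range(3)]

# updated_at frozen at the last state change: later entries are filtered out
stale = visible_logs(logs, updated_at=submitted)

# updated_at moved forward: every entry up to that point is returned
fresh = visible_logs(logs, updated_at=submitted + timedelta(seconds=10))
```

With a frozen upper bound, every poll returns the same two early entries, which matches the repeated output shown earlier in the thread.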

jack

11/11/2021, 5:43 PM
If I wait till the flow completes, then I get a complete set of logs. But in some cases I want to see them incrementally as they come in, instead of all at the end.

Zanie

11/11/2021, 5:44 PM

jack

11/11/2021, 5:50 PM
That looks like it will do it ^^
Thanks for the help. Based on your pull request, I have a method that is doing what I want.

Zanie

11/11/2021, 6:46 PM
Great! I'll polish that up when I get a chance