# prefect-community
k
Hi! One of my flows uses GQL to pull logs from another flow. The log pulling flow is directly triggered by the flow that produces the log to be pulled. I continue to get this error for the log pulling flow:
{'errors': [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}], 'data': None}
When I re-run the flow at a later time, it runs successfully. Are there any docs that can provide info on API limits?
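Since the same query succeeds when re-run later, one possible stopgap is to retry on this specific error with a growing delay. This is only a minimal sketch: `run_query` is a hypothetical zero-argument callable wrapping however the flow currently sends the GraphQL request, and it assumes the error comes back as a dict shaped like the one above rather than as a raised exception.

```python
import time
from typing import Any, Callable, Dict


def is_timeout(response: Dict[str, Any]) -> bool:
    """True if the response carries an 'Operation timed out' error like the one above."""
    errors = response.get("errors") or []
    return any(err.get("message") == "Operation timed out" for err in errors)


def query_with_retry(
    run_query: Callable[[], Dict[str, Any]],  # hypothetical wrapper around the GraphQL call
    max_attempts: int = 5,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call run_query until it stops timing out or max_attempts is reached."""
    response: Dict[str, Any] = {}
    for attempt in range(max_attempts):
        response = run_query()
        if not is_timeout(response):
            return response
        if attempt < max_attempts - 1:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** attempt)
    # Every attempt timed out; hand back the last error response to the caller.
    return response
```

The `sleep` parameter is only there so the delay can be swapped out in tests; the retry counts and delays are arbitrary starting values, not documented API limits.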
a
Before we dive into any technicalities related to GraphQL, I just want to make sure you know that there is a direct way of pulling child flow run logs from a parent flow after you triggered it using:
wait_for_flow_run(stream_logs=True)
here is a full example:
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow") as flow:
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name", run_name="custom_run_name"
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id, raise_final_state=True, stream_logs=True
    )
the way it works is that it uses `FlowRunView`, which has a method called `get_logs`:
from prefect.backend.flow_run import FlowRunView

fr_view = FlowRunView(flow_run_id="uuid")
fr_view.get_logs()
k
Ah I actually did not know that! I have a couple of questions on this: 1. What form would the logs be in if I pull them with the method you mentioned above? My current flow depends on the logs being JSON (which I then convert to a dataframe) 2. When I use flow_name to trigger a flow, is that the same thing as using version_group_id? In other words, does it run the latest version of the flow with that name?
a
the easiest way is to try it out 🙂 the format is:
FlowRunLog(timestamp=DateTime(2022, 3, 15, 10, 3, 15, 776868, tzinfo=Timezone('+00:00')), level=20, message='Submitted for execution: Container ID: 52383f95621d6d2341a4f53cc95de5be63e6031218e6366074cc74adbdcc1859')
I gave you the wrong syntax earlier, here is the right one if you want to try it:
from prefect.backend.flow_run import FlowRunView

flow_run = FlowRunView.from_flow_run_id(
    "9549c55f-1afe-45b6-afb3-1b1ed32e7c3f"
)
logs = flow_run.get_logs()
if you convert it to a dataframe anyway, this structured format could perhaps be even easier to work with, because you get native Python objects from it, e.g.
x = logs[0]
x.timestamp # gives you a nice DateTime object
x.message
x.level
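if you still need JSON, a rough sketch of the conversion could look like the one below. `LogRecord` is a hypothetical stand-in that only mirrors the three fields shown above (`timestamp`, `level`, `message`) so the snippet runs without Prefect, and `logs_to_records` is an illustrative helper name, not part of any library:

```python
import json
import logging
from datetime import datetime, timezone
from typing import List, NamedTuple


class LogRecord(NamedTuple):
    """Hypothetical stand-in with the same three fields as the log entry shown above."""
    timestamp: datetime
    level: int
    message: str


def logs_to_records(logs) -> List[dict]:
    """Turn log objects into plain dicts that json.dumps or a dataframe can consume."""
    return [
        {
            "timestamp": log.timestamp.isoformat(),
            # Map the numeric level (e.g. 20) to its standard name (e.g. "INFO").
            "level": logging.getLevelName(log.level),
            "message": log.message,
        }
        for log in logs
    ]


sample_logs = [
    LogRecord(
        datetime(2022, 3, 15, 10, 3, 15, 776868, tzinfo=timezone.utc),
        20,
        "Submitted for execution",
    ),
]
records = logs_to_records(sample_logs)
print(json.dumps(records, indent=2))
```

the same list of dicts can also be passed straight to `pandas.DataFrame(records)`, which would skip the JSON step entirely.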
how do you trigger your flow? normally when you do that using create_flow_run, it returns the flow run ID, which you can use here directly
k
I initially have a `produce_logs` parent flow set on a regular schedule, which runs first, then triggers the `pull_logs` child flow. I think I should change my perspective and have `pull_logs` as the parent flow, with a wait_for_flow_run that waits for the `produce_logs` child flow to run first.