Ken Nguyen
03/28/2022, 5:17 PM{'errors': [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}], 'data': None}
When re-running a flow at a later time, it runs successfully. Are there any docs that can provide info on API limits?Anna Geller
03/28/2022, 5:32 PMwait_for_flow_run(stream_logs=True)
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
with Flow("parent_flow") as flow:
child_flow_run_id = create_flow_run(
flow_name="child_flow_name", run_name="custom_run_name"
)
child_flowrunview = wait_for_flow_run(
child_flow_run_id, raise_final_state=True, stream_logs=True
)
FlowRunView
that has a method called `get_logs`:
from prefect.backend.flow_run import FlowRunView
fr_view = FlowRunView(flow_run_id="uuid")
fr_view.get_logs()
Ken Nguyen
03/28/2022, 6:07 PMAnna Geller
03/28/2022, 6:21 PMFlowRunLog(timestamp=DateTime(2022, 3, 15, 10, 3, 15, 776868, tzinfo=Timezone('+00:00')), level=20, message='Submitted for execution: Container ID: 52383f95621d6d2341a4f53cc95de5be63e6031218e6366074cc74adbdcc1859')
I gave you a wrong syntax, here is the right one if you want to try it:
from prefect.backend.flow_run import FlowRunView
flow_run = FlowRunView.from_flow_run_id(
"9549c55f-1afe-45b6-afb3-1b1ed32e7c3f"
)
logs = flow_run.get_logs()
x = logs[0]
x.timestamp # gives you a nice DateTime object
x.message
x.level
Ken Nguyen
03/28/2022, 6:57 PMproduce_logs
parent flow set on a regular schedule, which runs first, then triggers the pull_logs
child flow.
I think I should change my perspective and have the pull_logs
as a parent flow which has a wait_for_flow_run to wait for the produce_logs
child flow to run first.