Matt Alhonte
10/04/2023, 12:50 AMdef make_fail(flow, flow_run, state):
print("Failed")
flow_run_ctx = FlowRunContext.get()
print(flow_run_ctx)
print("Flow")
print(flow)
print("Flow run")
@flow(
log_prints=True,
on_completion=[make_fail],
on_failure=[make_fail],
task_runner=ConcurrentTaskRunner(),
result_serializer=JSONSerializer(),
result_storage=S3(bucket_path=bucket_path),
persist_result=True,
)
def my_flow():
result1 = my_task.submit()
result2 = my_task2.submit()
result3 = my_task.submit()
result4 = my_task2.submit(wait_for=[result1])
None of the print
statements show upNate
10/04/2023, 3:17 AMon_completion
hook, you're not actually in the flow run context
log_prints works by patching builtin.print
when you enter a flow run context that has log_prints=True
In [6]: from prefect import flow, task
In [7]: from prefect.context import get_run_context
...:
...: def foo(flow, flow_run, state):
...: print(get_run_context())
...:
In [8]: @flow(on_completion=[foo])
...: def my_flow():
...: pass
...:
In [9]: my_flow()
22:15:44.813 | INFO | prefect.engine - Created flow run 'enormous-tortoise' for flow 'my-flow'
22:15:45.185 | INFO | Flow run 'enormous-tortoise' - Running hook 'foo' in response to entering state 'Completed'
22:15:45.197 | ERROR | Flow run 'enormous-tortoise' - An error was encountered while running hook 'foo'
File "<ipython-input-7-9ac86415b18c>", line 4, in foo
print(get_run_context())
^^^^^^^^^^^^^^^^^
File "/Users/nate/src/open-source/prefect-stuff/prefect/src/prefect/context.py", line 367, in get_run_context
raise MissingContextError(
prefect.exceptions.MissingContextError: No run context available. You are not in a flow or task run context.
Matt Alhonte
10/04/2023, 6:12 PMNate
10/04/2023, 6:13 PMfrom prefect import flow, get_run_logger
from prefect.logging.loggers import flow_run_logger
def log_outside(flow, flow_run, state):
flow_run_logger(flow_run, flow).info("Hello from the failure handler!")
@flow(on_failure=[log_outside])
def failing_flow():
get_run_logger().info("Hello from the flow!")
raise ValueError()
if __name__ == "__main__":
failing_flow()
Matt Alhonte
10/04/2023, 6:14 PMNate
10/04/2023, 6:14 PM