Wenli Wan
02/04/2021, 9:00 PM
Task with log_stdout=True
2. add a FileHandler to the logger of the Task with the following code:
from logging import FileHandler
from prefect import Flow, Task

class MyTask(Task):
    def __init__(self, test_id, log_stdout=True, **kwargs):
        super().__init__(log_stdout=log_stdout, **kwargs)
        self.test_id = test_id
        # Attach a per-task file handler to this task's logger
        fh = FileHandler(f"../logging_test/test_{test_id}.log")
        self.logger.addHandler(fh)

    def run(self):
        self.logger.info("An info message.")
        print(f'test {self.test_id}')
Then run it as follows:
t1 = MyTask(1, name="test1", log_stdout=False)
t2 = MyTask(2, name="test2")
f = Flow("logging test")
f.add_task(t1)
f.add_task(t2)
f.run()
I found that the stdout from the Task has been redirected to the logger of the TaskRunner instead of the logger of MyTask:
[2021-02-04 18:00:09-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'logging test'
[2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Flow 'logging test': Handling state change from Scheduled to Running
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test1': Starting task run...
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Handling state change from Pending to Running
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Calling task.run() method...
[2021-02-04 18:00:09-0500] INFO - prefect.test1 | An info message from test 1.
test 1
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Handling state change from Running to Success
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test1': Finished task run for task with final state: 'Success'
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test2': Starting task run...
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Handling state change from Pending to Running
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Calling task.run() method...
[2021-02-04 18:00:09-0500] INFO - prefect.test2 | An info message from test 2.
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | test 2
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Handling state change from Running to Success
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test2': Finished task run for task with final state: 'Success'
[2021-02-04 18:00:09-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Flow 'logging test': Handling state change from Running to Success
Is there a way to have the Task stdout logged by the Task logger? I'm trying to keep a log per task that includes the task's stdout.
Wenli Wan
02/04/2021, 11:17 PM
Zanie
Wenli Wan
02/04/2021, 11:24 PM
Zanie
TaskRunner captures stdout and redirects it into the TaskRunner logger:
# Create a stdout redirect if the task has log_stdout enabled
log_context = (
    redirect_stdout(prefect.utilities.logging.RedirectToLog(self.logger))
    if getattr(self.task, "log_stdout", False)
    else nullcontext()
)  # type: AbstractContextManager
So yeah, this is about as expected: you're adding the file handler to task.logger, but the captured stdout goes to the TaskRunner logger. You could try putting the contents of your run method in a with redirect_stdout(prefect.utilities.logging.RedirectToLog(self.logger)) block instead of using the builtin log_stdout.
Zanie
redirect_stdout is from contextlib: from contextlib import redirect_stdout
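A minimal sketch of that suggestion, using only the standard library so it runs without Prefect installed. StdoutToLogger is a hypothetical stand-in for prefect.utilities.logging.RedirectToLog (an assumption, not Prefect's actual code), run_task plays the role of MyTask.run, and the logger name demo.test2 is made up for illustration. In a real task you would presumably pass self.logger and leave log_stdout off, so the redirect happens only once.

```python
import io
import logging
from contextlib import redirect_stdout


class StdoutToLogger:
    """Hypothetical stand-in for prefect.utilities.logging.RedirectToLog."""

    def __init__(self, logger: logging.Logger, level: int = logging.INFO) -> None:
        self.logger = logger
        self.level = level

    def write(self, text: str) -> int:
        # print() may send the message and its trailing newline separately;
        # skip whitespace-only writes so no empty records are logged.
        if text.strip():
            self.logger.log(self.level, text.rstrip())
        return len(text)

    def flush(self) -> None:
        # Nothing is buffered, so there is nothing to flush.
        pass


def run_task(test_id: int, logger: logging.Logger) -> None:
    """Plays the role of MyTask.run(): stdout goes to the task's own logger."""
    with redirect_stdout(StdoutToLogger(logger)):
        print(f"test {test_id}")


# Demo: a per-task logger with its own handler. A StringIO stream is used
# here where the thread attaches a FileHandler.
stream = io.StringIO()
task_logger = logging.getLogger("demo.test2")  # hypothetical logger name
task_logger.setLevel(logging.INFO)
task_logger.propagate = False  # keep the demo output out of parent loggers
task_logger.addHandler(logging.StreamHandler(stream))

run_task(2, task_logger)
captured = stream.getvalue()
```

Because the redirect wraps the body of the run function and targets the task's logger, anything printed there reaches whatever handlers that logger has, including a per-task FileHandler.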
Wenli Wan
02/04/2021, 11:34 PM