
Tim-Oliver

11/18/2022, 11:03 AM
Hi, I am testing logging from
DaskTaskRunner
workers with the current main-branch version of Prefect, which works now locally thanks to some very recent changes (thanks a lot 💐). However, I am having troubles getting the logs from tasks which are running on a
DaskTaskRunner
which uses
dask_jobqueue.SLURMCluster
. The logs from tasks are written into the slurm-output file, but not propagated back to the flow log or the cloud UI. Happy to test some things if it would be helpful.
And if I should describe this issue somewhere else, please let me know.

Tim Galvin

11/18/2022, 11:13 AM
Typically, how long are the tasks that you are running? I had a weird situation in some test code I was toying with. When I threw in a
os.system("sleep 5")
right before my flow finished I got the logs I was missing.

Tim-Oliver

11/18/2022, 12:08 PM
Interesting! With
os.system("sleep 5")
the logs get through. Will try a real long running computation and see if the logs appear or if I always need a
os.system("sleep 5")
. Thanks for the pointer!
Alright, it looks like it works as soon as the task runs for approximately 1 sec, i.e. actually does something and doesn't just call
logger.info(...)
. Then I don't even need the sleep.

Tim Galvin

11/18/2022, 1:51 PM
My suspicion is that there is some type of event loop in the process running the main flow that regularly polls the workers for new logs. When the flow completes, this loop does not have time to call the worker's get_logs method before it closes. The sleep is just a tricksie way of giving it that time.

Zanie

11/18/2022, 5:35 PM
We actually don’t poll the workers for logs, each task worker process runs a log handler in a background thread and streams logs to the API in batches.
🙏 1
If the process has a super-short lifetime it may not finish sending logs before exiting. The threads are daemons and won’t block the main thread from exiting.
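A minimal sketch (not Prefect's actual code) of the behavior described above: a daemon thread that "sends" batches in the background will not keep the process alive, so a short-lived process can exit before the batch goes out unless something waits for it.

```python
import threading
import time

sent_batches = []

def background_sender():
    """Simulates a log handler batching records before sending them to an API."""
    time.sleep(0.2)  # pretend we are waiting to fill a batch
    sent_batches.append("batch sent")

# daemon=True means this thread will NOT block the main thread from exiting,
# mirroring the daemon log-handler threads described above.
sender = threading.Thread(target=background_sender, daemon=True)
sender.start()

# Immediately after start, nothing has been sent yet:
assert sent_batches == []

# Waiting (the role the "sleep hack" played) gives the thread time to flush:
sender.join()
assert sent_batches == ["batch sent"]
```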

Tim Galvin

11/19/2022, 3:39 AM
Boom -- that makes sense to me! Is there perhaps a better-supported workaround than my cheeky
os.system("sleep 4")
approach? If I recall correctly I did try the more pythonic
time.sleep(3)
type approach, but I feel like it caused some other strange error I was not pleased with. Would a sleep command block all threads from running code, or just the thread running the time.sleep?
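To the question above: `time.sleep` blocks only the calling thread (it releases the GIL while sleeping), so a background log-handler thread would keep running. A quick sketch demonstrating this:

```python
import threading
import time

events = []

def worker():
    # Runs concurrently while the main thread sleeps.
    events.append("worker ran")

t = threading.Thread(target=worker)
t.start()

# time.sleep blocks only the main thread; the worker thread proceeds.
time.sleep(0.2)
t.join()
assert events == ["worker ran"]
```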

Zanie

11/19/2022, 5:03 AM
We can probably force the worker to flush all the queued logs when a task finishes. We definitely do that for flows, but that doesn't work once the tasks are running on workers in a remote process. Guess we didn't catch it because logging with Dask was broken 🙃
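The "flush queued logs when a task finishes" idea can be sketched with the standard library's queue-based logging pattern (this is an illustration of the concept, not Prefect's actual implementation): an explicit stop/flush drains the queue before the process exits, instead of relying on a daemon thread that may be killed mid-batch.

```python
import logging
import logging.handlers
import queue

records = []

class ListHandler(logging.Handler):
    """Captures log messages so we can check what was actually delivered."""
    def emit(self, record):
        records.append(record.getMessage())

# Records are queued by the logger and drained by a background listener thread.
log_queue = queue.Queue()
listener = logging.handlers.QueueListener(log_queue, ListHandler())
listener.start()

logger = logging.getLogger("task-demo")
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.setLevel(logging.INFO)

logger.info("I am logging from a task.")

# The key step: an explicit stop() drains all queued records before exit,
# so nothing is lost even if the process ends immediately afterwards.
listener.stop()
assert records == ["I am logging from a task."]
```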

Tim Galvin

11/19/2022, 6:27 AM
That would be the way to fix this for sure I think! Thanks a lot! I really appreciate how active the Prefect developers are - really a pleasure :)

Zanie

11/19/2022, 5:45 PM
🙂 We do our best! Here’s a patch if you want to try it out https://github.com/PrefectHQ/prefect/compare/task-run-log-flush

Tim Galvin

11/20/2022, 6:00 AM
Thanks for that Michael - you are a genius! I can try to fire up a test of this shortly.

Tim-Oliver

11/21/2022, 7:28 AM
I can not agree more with Tim, such a pleasure to have this level of interactivity and support. Thank you! Will give it a try as well :)
Tested with this flow:
from prefect import flow, task, get_run_logger

# `runner` is the DaskTaskRunner backed by dask_jobqueue.SLURMCluster from earlier in the thread

@task()
def a_task(i):
    logger = get_run_logger()
    logger.info(f"I am logging from a task. {i}")

    logger.info("Done.")


@flow(
    name="Logger Test",
    task_runner=runner,
)
def flow():
    logger = get_run_logger()
    logger.info("Flow starts!")

    a_task.map(range(3))

    a_task.submit(4)

    logger.info("Flow ends!")
And all logs appear as expected. (Previously this only worked with the sleep command in place.)

Zanie

11/21/2022, 4:25 PM
Great! I’ll get that moved over to a PR 🙂
🙏 1
:party-parrot: 1
Thank you!

Tim Galvin

11/22/2022, 1:11 AM
Awesome -- huge thanks Tim, and an even larger thanks to Michael!