j

Jared Noynaert

05/25/2022, 7:34 PM
@Kevin Kho, did you ever find out more about https://github.com/PrefectHQ/prefect/issues/5769? This is also impacting us on AKS. Edit: the reason I'm asking is that we will probably have to change our default executor for all flows, just to enable task observability!
k

Kevin Kho

05/25/2022, 7:36 PM
I don’t have an update on that unfortunately besides what’s written in the issue
j

Jared Noynaert

05/25/2022, 7:43 PM
If we can help meaningfully with debugging, let us know. We would definitely prefer not to have to choose between certain execution environments and seeing task logs.
k

Kevin Kho

05/25/2022, 8:04 PM
To be honest, I personally don’t know where to start, because the behavior seems non-deterministic: according to the issue, it worked sometimes.
z

Zanie

05/25/2022, 9:04 PM
Feel free to dig into it! Any information you can add will be helpful.
j

Jared Noynaert

05/26/2022, 3:55 AM
• For us this happens 100% of the time when using LocalDaskExecutor with processes (k8s agent, k8s run)--we get only the CloudFlowRunner state updates in the logs, although the UI definitely displays the task-level info as the flow runs (just not in the logs).
• From a quick test it looks like when we switch to a DaskExecutor (on an ephemeral KubeCluster) we start to see the CloudTaskRunner updates, but won't see the logs from within tasks. We've only tested one example of this so it may be worth validating.
• When we switch to a LocalDaskExecutor with threads, we get everything, including task-level logging output (i.e. the full observability we need).
Kevin I feel you that this is not a very straightforward issue to debug 😅
k

Kevin Kho

05/26/2022, 3:58 PM
This is super mind-boggling to me. DaskExecutor not giving logs is perfectly normal. There is no built-in mechanism for Dask to pass logs from the workers to the scheduler, so you really need to find them in the worker logs instead. The real issue here is just LocalDaskExecutor with processes.
z

Zanie

05/26/2022, 3:59 PM
We’re assigning some additional resources to investigate this
j

Jared Noynaert

05/26/2022, 5:02 PM
To clarify, it was logs via the Prefect logger inside a task that weren't surfacing with the DaskExecutor. There's a layer of indirection there so I won't file an issue unless we come up with a minimal example to reproduce, though
k

Kevin Kho

05/26/2022, 5:49 PM
Yes, that’s right. I don’t think those get surfaced, because the original logger is serialized and passed to the workers. When it’s deserialized, it loses the configuration that sends logs to Cloud.
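A minimal sketch of the general effect described here, using only the Python standard library rather than Prefect or Dask. It assumes Python 3.7+, where a `logging.Logger` pickles as just its name; the logger name `demo.task` and the helper `handler_count_in_child` are hypothetical, and the `StreamHandler` is only a stand-in for a handler that would ship logs to a backend. It illustrates the mechanism and is not a diagnosis of the linked issue.

```python
import logging
import pickle
import subprocess
import sys


def handler_count_in_child(logger_name: str) -> int:
    """Unpickle a logger in a fresh interpreter and count its handlers."""
    logger = logging.getLogger(logger_name)
    payload = pickle.dumps(logger)  # only the logger's name is serialized
    child_code = (
        "import pickle, sys\n"
        "logger = pickle.loads(sys.stdin.buffer.read())\n"
        "print(len(logger.handlers))\n"
    )
    result = subprocess.run(
        [sys.executable, "-c", child_code],
        input=payload,
        capture_output=True,
        check=True,
    )
    return int(result.stdout.decode().strip())


# Configure a handler in the "parent" process only.
parent_logger = logging.getLogger("demo.task")
parent_logger.addHandler(logging.StreamHandler())

parent_handlers = len(parent_logger.handlers)
child_handlers = handler_count_in_child("demo.task")
print(f"parent handlers: {parent_handlers}, child handlers: {child_handlers}")
```

The child process gets a logger with the same name but none of the handlers attached in the parent, so anything logged there goes nowhere unless the worker process configures logging itself.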
j

Jared Noynaert

05/27/2022, 1:56 AM
Is that still going to be a limitation in Orion? E.g. if I execute a flow using a Dask or Ray task runner, would the following send logs to Prefect Cloud? We will definitely test it out when we upgrade, but I was immediately curious, as fully distributed execution seems like exactly when you'd want in-task logging via the UI the most!
@task
def engage_warp_9():
    logger = get_run_logger()
    logger.info("Warp drive engaged")
k

Kevin Kho

05/27/2022, 1:57 AM
I think Dask needs to do some work upstream to be able to support that.
👍 1
z

Zanie

05/27/2022, 5:10 AM
This should be fully functional in 2.0
m

Marko Jamedzija

07/19/2022, 4:19 PM
Hi @Kevin Kho, is there any solution to this today? We were on 0.15.7, and when we updated to 1.2.4 we can’t see any tasks’ logs anymore in the Prefect UI. We are also using LocalDaskExecutor with processes.
k

Kevin Kho

07/19/2022, 4:52 PM
Yeah! I commented in the issue. Use DaskExecutor with a LocalCluster.
m

Marko Jamedzija

07/19/2022, 4:59 PM
Thanks @Kevin Kho, this works! Btw, in which cases (if any) would you prefer this over LocalDaskExecutor with threads? (I saw that using threads also solves the problem.)
k

Kevin Kho

07/19/2022, 5:01 PM
Oh, it does. I think you can specify threads instead of processes for LocalCluster too. I think they should be interchangeable.
m

Marko Jamedzija

07/19/2022, 5:03 PM
So, something like this, e.g.?
DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 4})
k

Kevin Kho

07/19/2022, 5:03 PM
yeah exactly
m

Marko Jamedzija

07/19/2022, 5:04 PM
So it’s better than
LocalDaskExecutor("threads", num_workers = 4)
then?
k

Kevin Kho

07/19/2022, 5:27 PM
I don’t expect it to be a huge difference, but it might be more stable, yep.
m

Marko Jamedzija

07/19/2022, 6:27 PM
Thanks!
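For reference, the executor options discussed in this thread can be written out side by side. This is a sketch that assumes Prefect 1.x (the `prefect.executors` module does not exist in 2.0); the variable names are hypothetical, and `num_workers=4` on the process-based executor is an illustrative value not taken from the thread. The comments on logging behavior only repeat what was reported above and are not guarantees.

```python
# Assumes Prefect 1.x; prefect.executors is not available in Prefect 2.0.
from prefect.executors import DaskExecutor, LocalDaskExecutor

# Reported in this thread as dropping task-level logs.
processes_local = LocalDaskExecutor(scheduler="processes", num_workers=4)

# Reported as working: the thread-based local Dask scheduler.
threads_local = LocalDaskExecutor(scheduler="threads", num_workers=4)

# Reported as working: DaskExecutor, which starts a temporary
# LocalCluster when no address or cluster_class is given.
threads_cluster = DaskExecutor(
    cluster_kwargs={"n_workers": 1, "threads_per_worker": 4}
)

# Attach one of them to an existing flow object, e.g.:
# flow.executor = threads_cluster
```

Per the discussion above, the last two are expected to behave similarly, with the `DaskExecutor` variant possibly being more stable.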