Is there a way to get logs from code running withi...
# prefect-community
t
Is there a way to get logs from code running within dask? I know that for "general" use of dask, the logs would have to be pulled from the execution environment such as k8s (as noted here) When I am running code that uses
worker_client
and then submit new tasks to the same cluster (code inside), I see that printout from the first call of the function is logged in prefect and shipped to server, but consecutive calls are not (see log inside). Is there some trick I can use?
The code:
Copy code
def fib(value: int) -> int:
    slow_compute_delay = random.random() * 2
    print(
        f"calculating fib({value}). sleeping {slow_compute_delay} seconds to simulate slow processing"
    )
    time.sleep(slow_compute_delay)
    if value < 2:
        return value
    with worker_client() as client:
        a_future = client.submit(fib, value - 1)
        b_future = client.submit(fib, value - 2)
        a, b = client.gather([a_future, b_future])
        return a + b


@task(log_stdout=True)
def get_data(value1: int, value2: str) -> List[int]:
    print(f"the parameters are value1={value1}, value2={value2}")
    return list(range(value1))


@task(log_stdout=True)
def load_stuff(values: List[int]) -> List[int]:
    print(f"loading stuff - got a list of size {len(values)}")
    return values


@task(log_stdout=True)
def compute_with_dask(values: List[int]) -> int:
    max_value = max(values)
    print(
        f"got a list of size {len(values)} - starting long process to calculate fib({max_value})"
    )
    results = fib(max_value)
    print(f"got results of calculation, the value is {results}")


with Flow("fib-dask-flow") as flow:
    param = Parameter("value1", default=12)
    param2 = Parameter("value2", default="Prefect")
    values = get_data(param, param2)
    loaded = load_stuff(values)
    compute_with_dask(loaded)


flow.storage = Docker(
    registry_url="<http://docker.k8s.nextsilicon.com/repository/container-ns/|docker.k8s.nextsilicon.com/repository/container-ns/>",
    image_name="research-dask-fib",
)

flow.executor = DaskExecutor()
The logs (collected with kubectl, or actually, k9s) - note how
calculating fib(11)...
is captured by prefect but consecutive calls are not...
Copy code
│ [2022-03-20 10:10:39+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'fib-dask-flow'                                                                      │
│ [2022-03-20 10:10:39+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...                                    |
| [2022-03-20 10:10:41+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at <http://127.0.0.1:8787/status>                                                |
| /usr/local/lib/python3.9/site-packages/distributed/scheduler.py:5670: UserWarning: Scheduler already contains a plugin with name worker-status; overwriting.            |
|   warnings.warn(                                                                                                                                                        |
| [2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value1': Starting task run...                                                                         |
| [2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value2': Starting task run...                                                                         |
| [2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value1': Finished task run for task with final state: 'Success'                                       |
| [2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'value2': Finished task run for task with final state: 'Success'                                       |
| [2022-03-20 10:10:41+0000] INFO - prefect.CloudTaskRunner | Task 'get_data': Starting task run...                                                                       |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | the parameters are value1=12, value2=Prefect                                                                |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'get_data': Finished task run for task with final state: 'Success'                                     |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'load_stuff': Starting task run...                                                                     |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | loading stuff - got a list of size 12                                                                       |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'load_stuff': Finished task run for task with final state: 'Success'                                   |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | Task 'compute_with_dask': Starting task run...                                                              |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | got a list of size 12 - starting long process to calculate fib(11)                                          |
| [2022-03-20 10:10:42+0000] INFO - prefect.CloudTaskRunner | calculating fib(11). sleeping 0.5530870039737996 seconds to simulate slow processing                        |
| [2022-03-20 10:10:48+0000] INFO - prefect.CloudTaskRunner | got results of calculation, the value is 89                                                                 |
| [2022-03-20 10:10:48+0000] INFO - prefect.CloudTaskRunner | Task 'compute_with_dask': Finished task run for task with final state: 'Success'                            |
| calculating fib(1). sleeping 1.1147617720297223 seconds to simulate slow processing                                                                                     |
| calculating fib(0). sleeping 1.6729680086778969 seconds to simulate slow processing                                                                                     |
| calculating fib(6). sleeping 0.7344708098646031 seconds to simulate slow processing                                                                                     |
| calculating fib(5). sleeping 1.185138104797178 seconds to simulate slow processing                                                                                      |
| calculating fib(3). sleeping 1.1508838185913755 seconds to simulate slow processing                                                                                     |
| calculating fib(7). sleeping 1.3828636184701388 seconds to simulate slow processing                                                                                     |
| calculating fib(8). sleeping 0.8941380123847236 seconds to simulate slow processing                                                                                     |
| calculating fib(10). sleeping 1.6048135240274324 seconds to simulate slow processing                                                                                    |
| calculating fib(4). sleeping 0.15924167691710855 seconds to simulate slow processing                                                                                    |
| calculating fib(9). sleeping 0.5762516282451282 seconds to simulate slow processing                                                                                     |
| calculating fib(2). sleeping 0.5941398434837817 seconds to simulate slow processing                                                                                     |
| [2022-03-20 10:10:48+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded                                                             |
| Stream closed EOF for research/prefect-job-5d1873a9--1-z78p5 (flow)
a
@Tomer Cagan this is a known issue - we explained it here incl. some things you may try as a workaround https://discourse.prefect.io/t/why-are-the-dask-logs-not-being-shown-in-the-prefect-ui/75
t
I didn't find the discourse (have to get used to search there before I ask) but did find the github issue you mention... I was still wondering since the first log message does get collected so I thought maybe there's a way to collect the other ones. I assumed, since I am running with a DaskExecutor, my tasks are executed on that cluster and since prefect is able to collect the first print statement, it would be able to somehow collect the others... I tried to find this in the agent's code but not sure that's the right way to look for it...
In case I am running with
flow.executor = DaskExecutor(cluster_class=KubeCluster())
, is there a way for me to access that
KubeCluster
instance? If I can somehow get it from within the flow I can save the content of the log somehow (the goal is to have it easily accessible from within the UI). I guess that with a resource manager it would somehow be possible but currently I am hoping to run on the same cluster that is spawned by prefect...
a
as explained on Discourse, it's not currently possible to view the logs directly in the Prefect UI, you need some extra log aggregation service and view Dask logs there. It's a known issue in
distributed
repo
t
I understand what is in the Discourse. Was wondering if there's some stuff that can be done. If for example I can have access to the
KubeCluster
instance, I can possibly call
get_logs()
in a consecutive task and write it to stdout so it is picked up by Prefect.. thus, I was asking about it so that I can possibly find some way to do it 🙂
a
I see. Calling
KubeCluster().get_logs()
from a consecutive task won't work since the consecutive task would also run on a Dask worker. It's a known issue and I think there is no way to do it directly when using
DaskExecutor
. But if you don't use
DaskExecutor
and you instead operate with Dask via resource manager and via Dask client, calling
KubeCluster().get_logs()
in a resource manager cleanup task could potentially work. I'm mentioning this option because of our previous threads about it. If you try that, I would appreciate it if you could share whether this worked
t
I am confused as to your first remark in contrast to what I am seeing in the logs. From what I understood, and as your statement seems to confirm, tasks are running in dask when using
DaskExecutor
. Hence, I assume, the
compute_with_dask
task from my code is running on one of the workers and when it calls
fib
, print statements from both are captured by prefect (lines with date are those captured by Prefect). Is that's wrong? If not, why it works for the futures (calls) that Prefect
submit
directly and not the others? I am still exploring how to use Prefect + Dask and as soon as I will - I hope to share how we used it...
a
I think I know what you mean, somehow you may get lucky to get some logs to show up in the UI, but this is not a behavior you should rely on. The general recommendation from us is: if you want to get Dask logs reliably, use some external service that the logs are sent to (e.g. a log aggregation service, a database, or even some shared file system) since we don't have any proven and tested solutions other than that. I would be totally curious and happy to chat more if you figure out some better solution
t
I have implemented some code with
KubeCluster
that calls
get_logs
function. I implemented this as a standalone code but similar thing can probably be done by getting the cluster from a client retrieved using
worker_client
. While this works, it does not get logs written to STDOUT. It just get logs written to the worker logger. It is probably possible to write to the same log by calling
logging.getLogger("distributed.worker")
- but for a cluster deployment, it might be better to use proper infrastructure (ELK / Loki+Promtail / Syslog) to collect / ship the logs. I will report more as I progress with this - maybe we can define a flow to enable getting the logs - like setting it up properly and then collecting the logs inside a task at the end of the flow. This will be a dask-centric solution though...
a
This will be a dask-centric solution though.
That's totally valid if you use Dask with many flows. I agree that using something like the ELK stack is better for cluster deployment as it can more reliably get logs directly on the infrastructure level rather than on a flow level. I would be curious to test it out if you can share later how you did it 🙌
t
I will try to make a more complete example and share it. I also need to move it into Prefect context (right now playing with dask only) I verified that it's possible to write to the worker log with the snippet above (though again, not sure that's a good idea 😉)
a
I'm not sure either, it could be that logs from all loggers that use standard Python logging will show up on the worker the same way, the issue is how to get all those logs before e.g. worker dies or the cluster gets shut down
t
Yeah - workers can come up and down so getting everything at the end is if-fy... another thought that crossed my mind was to somehow process it in a different thread but that's more complex and kind of need to be integrated on a "lower" level. These loopholes brings me to rethink about the question I raised above regarding some STDOUT printouts do appearing in the prefect logs - I guess the first call to my
fib
function is somehow caught even though, supposedly, it runs inside a dask task...
a
I'm no distributed expert, maybe @Kevin Kho has an idea on whether STDOUT printouts are supposed to be logged in Dask workers
k
I wouldn’t know more, this is always tricky. The Dask community does have a Discourse that you can try posting in? I think your
fib
task might be running on the Client instead of the Workers?
upvote 1
t
if it would be running on the client, it would probably show in the log (client -- prefect). I will keep playing with this and if I find anything interesting, report back
👍 1