# ask-community
Sean Harkins:
I have a question about my (probably incorrect 😄) understanding of the Prefect logger and its behavior when using a `DaskExecutor`. When running this example Flow:
```python
@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"


with Flow(
    "dask-test-flow",
    storage=storage.S3(
        bucket=storage_bucket
    ),
    run_config=ECSRun(
        image=worker_image,
        labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
        task_definition=definition,
    ),
) as flow:
    hello_result = say_hello()
```
I am able to view log information (specifically the “Hello, Cloud” info line) directly in Prefect Cloud. But using an identical flow with a `DaskExecutor`, logging information is sent to my CloudWatch `dask-ecs` log group but is not available in Prefect Cloud. Is it possible to view all Dask worker logging in Prefect Cloud and, if so, what additional logging config do I need to include?
Dylan:
Hi @Sean Harkins! Can you share your Prefect version and executor configuration as well as any networking information about your cluster?
Sean Harkins:
@Dylan This is the executor configuration
```python
definition = yaml.safe_load(
    """
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    containerDefinitions:
        - name: flow
    """
)
definition["executionRoleArn"] = execution_role_output

executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "ecr/image",
        "vpc": vpc_output,
        "cluster_arn": cluster_output,
        "task_role_arn": task_role_output,
        "execution_role_arn": execution_role_output,
        "security_groups": [
            security_group_output
        ],
        "n_workers": 2,
        "scheduler_cpu": 256,
        "scheduler_mem": 512,
        "worker_cpu": 1024,
        "worker_mem": 2048,
        "scheduler_timeout": "15 minutes",
    },
)
```
And these are the rules used by the cluster security group
```python
security_group.add_ingress_rule(
    aws_ec2.Peer.any_ipv4(),
    aws_ec2.Port.tcp_range(8786, 8787)
)
security_group.add_ingress_rule(
    aws_ec2.Peer.any_ipv6(),
    aws_ec2.Port.tcp_range(8786, 8787)
)
security_group.add_ingress_rule(
    security_group,
    aws_ec2.Port.all_tcp()
)
```
Dylan:
And your Prefect version?
Sean Harkins:
🤦 Sorry 😄 From the image used by the Dask worker nodes
```dockerfile
RUN pip install prefect[aws]==0.14.13
```
Dylan:
Thank you!
Hey Sean, just to confirm a couple of things: 1. You see “Hello, Cloud” in every case, correct? 2. You want Dask worker logging to appear in Prefect Cloud’s UI, correct?
Sean Harkins:
Without Dask I can see “Hello, Cloud” in Prefect Cloud; with Dask I can only see “Hello, Cloud” in my `dask-ecs` log group. I would like to have worker logging appear in Prefect Cloud’s UI 👍
Jim Crist-Harif:
Adding logs from all dask workers to prefect cloud may be extremely noisy (dask logs can be verbose) and is also kinda tricky to set up properly right now. You should always see logs from the prefect loggers (e.g. logs from `prefect.context.get("logger")`) regardless of execution environment though; if that's not the case, it sounds like a bug. Since you can't see `Hello, Cloud` in the logs, it sounds like that's what's happening.
Can you reproduce using a local `DaskExecutor` rather than something running on Fargate? Switching to `executor = DaskExecutor()` should be sufficient to check.
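(For reference, one way people forward dask/distributed worker output is Prefect 0.14's `extra_loggers` logging setting, which routes named third-party loggers through Prefect's handlers. Below is a rough sketch only; wiring the variable in via `FargateCluster`'s `environment` kwarg and `ECSRun`'s `env` kwarg is an assumption about this particular setup, not something confirmed in the thread.)

```python
# Rough sketch only: forward "dask" / "distributed" logger output through
# Prefect's logging setup via the documented extra_loggers setting.
# Passing it to the cluster via FargateCluster's `environment` kwarg and to the
# flow-runner container via ECSRun's `env` kwarg is an assumption here.
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun

EXTRA_LOGGERS = "['dask', 'distributed']"  # parsed by Prefect as a list of logger names

executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "ecr/image",
        "n_workers": 2,
        # make the workers' Prefect logging config match the flow runner's
        "environment": {"PREFECT__LOGGING__EXTRA_LOGGERS": EXTRA_LOGGERS},
    },
)

run_config = ECSRun(
    image="ecr/image",
    # the flow-runner container needs the setting as well
    env={"PREFECT__LOGGING__EXTRA_LOGGERS": EXTRA_LOGGERS},
)
```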
Sean Harkins:
I ran the same flow using the `DaskExecutor` without a `cluster_class`:
```python
import json
import os

import prefect
from prefect import Flow, storage, task
from prefect.run_configs import ECSRun
import yaml
import boto3
from prefect.engine.executors import DaskExecutor


identifier = os.environ["IDENTIFIER"]
project = os.environ["PREFECT_PROJECT"]
worker_image = os.environ["PREFECT_DASK_WORKER_IMAGE"]

cloudformation = boto3.resource('cloudformation')
stack = cloudformation.Stack(f"pangeo-forge-aws-bakery-{identifier}")
execution_role_output = next((
    output for output in stack.outputs
    if output.get("ExportName") == f"prefect-task-execution-role-{identifier}"),
    None)["OutputValue"]

storage_bucket_output = next((
    output for output in stack.outputs
    if output.get("ExportName") == f"prefect-storage-bucket-name-output-{identifier}"),
    None)["OutputValue"]

definition = yaml.safe_load(
    """
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    containerDefinitions:
        - name: flow
    """
)
definition["executionRoleArn"] = execution_role_output

executor = DaskExecutor(
    cluster_kwargs={
        "n_workers": 1,
        "threads_per_worker": 1
    }
)


@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"


with Flow(
    "dask-local-test-flow",
    storage=storage.S3(
        bucket=storage_bucket_output
    ),
    run_config=ECSRun(
        image=worker_image,
        labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
        task_definition=definition,
    ),
    executor=executor,
) as flow:
    hello_result = say_hello()

flow.register(project_name=project)
```
“Hello, Cloud” is not logged to Prefect Cloud. Just for clarification, I am still using `ECSRun` with an agent running in an ECS cluster. Would you prefer testing this with a local agent and a local (to my machine) `DaskExecutor`? As a note, because this `ECSRun` uses an ephemeral task definition, there is no CloudWatch log group and the log statement doesn’t appear to be going anywhere.
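(A fully local variant of that check might look something like the sketch below, assuming a local agent started with `prefect agent local start` and default local storage; `LocalRun` and the project name are illustrative rather than taken from the setup above.)

```python
# Minimal local-repro sketch (assumes `prefect agent local start` is running and
# the flow is registered; LocalRun and the project name are illustrative).
import prefect
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import LocalRun


@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"


with Flow(
    "dask-local-repro",
    run_config=LocalRun(),
    executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 1}),
) as flow:
    say_hello()

flow.register(project_name="my-project")
```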
As a bit of follow-up here, my end goal is that after a `DaskExecutor` run starts for this cluster (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/executors/dask.py#L192) and instantiates the `Client`, I would like to be able to report the scheduler’s `dashboard_link` in the Prefect Cloud UI so that people can use it for debugging their Dask distributed execution.
@Dylan @Jim Crist-Harif Any ideas on this?
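(One pattern that could surface this today, sketched below, is passing a callable as `cluster_class` so the `dashboard_link` is written into the flow-run logs when the cluster comes up. This is not a built-in Prefect feature; the fallback logger name and the trimmed-down kwargs are hypothetical.)

```python
# Hypothetical sketch, not a built-in Prefect feature: wrap cluster creation in a
# callable so the scheduler's dashboard link is written to the flow-run logs.
import prefect
from dask_cloudprovider.aws import FargateCluster
from prefect.executors import DaskExecutor
from prefect.utilities.logging import get_logger


def make_cluster(**kwargs):
    cluster = FargateCluster(**kwargs)
    # Fall back to a prefect-namespaced logger if "logger" isn't in context here;
    # either way the record propagates through Prefect's log handlers.
    logger = prefect.context.get("logger") or get_logger("dask-cluster")
    logger.info("Dask dashboard available at %s", cluster.dashboard_link)
    return cluster


executor = DaskExecutor(
    cluster_class=make_cluster,
    cluster_kwargs={"image": "ecr/image", "n_workers": 2},  # trimmed-down kwargs
)
```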
Jim Crist-Harif:
I figured out the issue earlier today. In short, this has to do with how our logs are batched and how the dask cluster is shut down. If more than 5 s pass after the last log line written for a task on a worker, then things will show up in the UI (our logs are batched in 5 s batches). If a worker is violently shut down before the batch is written, though, then those logs are lost. I think I have a fix for this that I can get up tomorrow, but this won’t lead to all your logs being missing in a real workflow; there’s only a chance (and only a chance) for the last lines written on a worker to be dropped.
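(To make the failure mode concrete: the toy handler below, which is not Prefect's actual implementation, batches records and flushes every few seconds. Anything still sitting in the buffer when a worker is killed hard, before the next flush or the `atexit` hook can run, is simply lost.)

```python
# Toy illustration of the failure mode only; NOT Prefect's actual cloud handler.
import atexit
import logging
import threading
import time


class BatchedHandler(logging.Handler):
    """Buffers log records and ships them in batches every few seconds."""

    def __init__(self, flush_interval=5.0):
        super().__init__()
        self.flush_interval = flush_interval
        self.buffer = []
        self._buffer_lock = threading.Lock()
        threading.Thread(target=self._flush_loop, daemon=True).start()
        atexit.register(self.flush)  # only runs on a *clean* interpreter exit

    def emit(self, record):
        with self._buffer_lock:
            self.buffer.append(self.format(record))

    def _flush_loop(self):
        while True:
            time.sleep(self.flush_interval)
            self.flush()

    def flush(self):
        with self._buffer_lock:
            batch, self.buffer = self.buffer, []
        if batch:
            print("shipping batch:", batch)  # stand-in for the real API call


# If a worker process is killed before the next flush (and atexit never runs),
# whatever is still sitting in `self.buffer` never gets shipped anywhere.
```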
Sean Harkins:
🙇 Thank you for investigating this. My workers appear to exit cleanly, but I am seeing the following issue on the scheduler that I’ll need to check into:
```
distributed.scheduler - INFO - End scheduler at 'tcp://10.0.166.48:8786'
Traceback (most recent call last):
  File "/usr/local/bin/dask-scheduler", line 8, in <module>
    sys.exit(go())
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 219, in go
    main()
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 210, in main
    loop.run_sync(run)
  File "/usr/local/lib/python3.8/site-packages/tornado/ioloop.py", line 529, in run_sync
    raise TimeoutError("Operation timed out after %s seconds" % timeout)
tornado.util.TimeoutError: Operation timed out after None seconds
```
Probably unrelated to this though I’m guessing.
And the timestamp from my worker log statement to distributed’s `reaping stray process` is only 2 seconds, so that aligns with what you found. Thanks again.
Jim Crist-Harif:
> Probably unrelated to this though I’m guessing.
Yeah, we don't run anything on the scheduler itself. The issue is that the worker shuts down without running any of its atexit hooks. I can patch around this in prefect, but it'd be cleaner, I think, to push a fix upstream to distributed. I may do both.
Pushed up a partial fix to upstream dask. This will fix the issue for local cluster usage. Remote clusters seem to work as intended in my tests already (though I haven't tested on fargate). The distributed codebase at least is already doing the right thing here.
Update - while the upstream PR did fix things, upon closer investigation the underlying issue was in prefect itself - PR here: https://github.com/PrefectHQ/prefect/pull/4334. Illustrates the importance of ensuring your tests fail before making the fix :).
Fix should be out in the next release. Thanks for raising the issue!
Sean Harkins:
Thanks again for the extremely quick turnaround.