Sean Harkins

    1 year ago
    I have a question about my (probably incorrect 😄) understanding of the Prefect logger and its behavior when using a DaskExecutor. When running this example Flow:
    @task
    def say_hello():
        logger = prefect.context.get("logger")
        <http://logger.info|logger.info>("Hello, Cloud")
        return "hello result"
    
    
    with Flow(
        "dask-test-flow",
        storage=storage.S3(
            bucket=storage_bucket
        ),
        run_config=ECSRun(
            image=worker_image,
            labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
            task_definition=definition,
        ),
    ) as flow:
        hello_result = say_hello()
    I am able to view log information (specifically the “Hello, Cloud” info line) directly in Prefect Cloud. But using an identical flow with a DaskExecutor, logging information is sent to my CloudWatch dask-ecs log group and is not available in Prefect Cloud. Is it possible to view all Dask worker logging in Prefect Cloud and, if so, what additional logging config do I need to include?
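    (For reference on the logging-config question: Prefect 0.14.x documents a logging.extra_loggers setting that forwards records from named third-party loggers to Prefect's own log handler. A minimal sketch, assuming the documented environment-variable form and that it must be set before prefect is imported in the worker processes:)
    import os

    # Assumption (per the Prefect 0.14 docs): logging.extra_loggers names
    # third-party loggers whose records Prefect also ships to Prefect Cloud.
    # It must be set in every process that runs tasks, e.g. baked into the
    # Dask worker image, before prefect is imported.
    os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"] = "['dask', 'distributed']"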
Dylan

    1 year ago
    Hi @Sean Harkins! Can you share your Prefect version and executor configuration as well as any networking information about your cluster?
Sean Harkins

    1 year ago
    @Dylan This is the executor configuration:
    definition = yaml.safe_load(
        """
        networkMode: awsvpc
        cpu: 1024
        memory: 2048
        containerDefinitions:
            - name: flow
        """
    )
    definition["executionRoleArn"] = execution_role_output
    
    executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": "ecr/image",
            "vpc": vpc_output,
            "cluster_arn": cluster_output,
            "task_role_arn": task_role_output,
            "execution_role_arn": execution_role_output,
            "security_groups": [
                security_group_output
            ],
            "n_workers": 2,
            "scheduler_cpu": 256,
            "scheduler_mem": 512,
            "worker_cpu": 1024,
            "worker_mem": 2048,
            "scheduler_timeout": "15 minutes",
        },
    )
    And these are the rules used by the cluster security group:
    # Allow inbound traffic to the Dask scheduler (8786) and dashboard (8787),
    # plus all TCP traffic between members of the security group.
    security_group.add_ingress_rule(
        aws_ec2.Peer.any_ipv4(),
        aws_ec2.Port.tcp_range(8786, 8787)
    )
    security_group.add_ingress_rule(
        aws_ec2.Peer.any_ipv6(),
        aws_ec2.Port.tcp_range(8786, 8787)
    )
    security_group.add_ingress_rule(
        security_group,
        aws_ec2.Port.all_tcp()
    )
Dylan

    1 year ago
    And your Prefect version?
Sean Harkins

    1 year ago
    🤦 Sorry 😄 From the image used by the Dask worker nodes:
    RUN pip install prefect[aws]==0.14.13
Dylan

    1 year ago
    Thank you!
    Hey Sean, just to confirm a couple of things:
    1. You see “Hello, Cloud” in every case, correct?
    2. You want Dask worker logging to appear in Prefect Cloud’s UI, correct?
Sean Harkins

    1 year ago
    Without Dask I can see “Hello, Cloud” in Prefect Cloud; with Dask I can only see “Hello, Cloud” in my dask-ecs log group. I would like to have worker logging appear in Prefect Cloud’s UI 👍
Jim Crist-Harif

    1 year ago
    Adding logs from all dask workers to prefect cloud may be extremely noisy (dask logs can be verbose) and is also currently kinda tricky to set up properly. You should always see logs from the prefect loggers (e.g. logs from prefect.context.get("logger")) regardless of execution environment though - if that's not happening, it sounds like a bug. Since you can't see “Hello, Cloud” in the logs, it sounds like that's the case.
    Can you reproduce using a local DaskExecutor rather than something running on Fargate? Switching to executor = DaskExecutor() should be sufficient to check.
Sean Harkins

    1 year ago
    I ran the same flow using the DaskExecutor without a cluster_class:
    import json
    import os
    
    import prefect
    from prefect import Flow, storage, task
    from prefect.run_configs import ECSRun
    import yaml
    import boto3
    from prefect.engine.executors import DaskExecutor
    
    
    identifier = os.environ["IDENTIFIER"]
    project = os.environ["PREFECT_PROJECT"]
    worker_image = os.environ["PREFECT_DASK_WORKER_IMAGE"]
    
    cloudformation = boto3.resource('cloudformation')
    stack = cloudformation.Stack(f"pangeo-forge-aws-bakery-{identifier}")
    execution_role_output = next((
        output for output in stack.outputs
        if output.get("ExportName") == f"prefect-task-execution-role-{identifier}"),
        None)["OutputValue"]
    
    storage_bucket_output = next((
        output for output in stack.outputs
        if output.get("ExportName") == f"prefect-storage-bucket-name-output-{identifier}"),
        None)["OutputValue"]
    
    definition = yaml.safe_load(
        """
        networkMode: awsvpc
        cpu: 1024
        memory: 2048
        containerDefinitions:
            - name: flow
        """
    )
    definition["executionRoleArn"] = execution_role_output
    
    executor = DaskExecutor(
        cluster_kwargs={
            "n_workers": 1,
            "threads_per_worker": 1
        }
    )
    
    
    @task
    def say_hello():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud")
        return "hello result"
    
    
    with Flow(
        "dask-local-test-flow",
        storage=storage.S3(
            bucket=storage_bucket_output
        ),
        run_config=ECSRun(
            image=worker_image,
            labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
            task_definition=definition,
        ),
        executor=executor,
    ) as flow:
        hello_result = say_hello()
    
    flow.register(project_name=project)
    “Hello, Cloud” is not logged to Prefect Cloud. Just for clarification, I am still using ECSRun with an agent running in an ECSCluster. Would you prefer testing this with a local agent and a local (to my machine) DaskExecutor? As a note, because this ECSRun uses an ephemeral TaskDefinition, there is no CloudWatch Log Group and the log statement doesn’t appear to be going anywhere.
    As a bit of follow-up here, my end goal is that after a DaskExecutor run starts for this cluster (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/executors/dask.py#L192) and instantiates the Client, I would like to be able to report the scheduler’s dashboard_link in the Prefect Cloud UI so that people can use it for debugging their Dask distributed execution.
    @Dylan @Jim Crist-Harif Any ideas on this?
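    (One possible sketch for this, assuming DaskExecutor's cluster_class accepts a callable as well as a class path, as the 0.14 docs suggest - make_cluster is a hypothetical factory, not an established Prefect API: it builds the FargateCluster and reports its dashboard_link through the Prefect logger so the URL lands in the flow run's Cloud logs.)
    import prefect
    from dask_cloudprovider.aws import FargateCluster
    from prefect.executors import DaskExecutor


    def make_cluster(**kwargs):
        # Hypothetical factory: create the cluster, then log the scheduler's
        # dashboard URL via Prefect's logger so it appears in the Cloud UI.
        cluster = FargateCluster(**kwargs)
        logger = prefect.context.get("logger")
        if logger is not None:
            logger.info("Dask dashboard available at %s", cluster.dashboard_link)
        return cluster


    executor = DaskExecutor(
        cluster_class=make_cluster,
        cluster_kwargs={"n_workers": 2},  # plus the image/VPC/role kwargs from above
    )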
Jim Crist-Harif

    1 year ago
    I figured out the issue earlier today. In short - this has to do with how our logs are batched and how the dask cluster is shut down. If more than 5 s pass after the last log line written for a task on a worker, things will show up in the UI (our logs are batched in 5 s batches). If a worker is violently shut down before the batch is written, though, those logs will be lost. I think I have a fix for this that I can get up tomorrow. This won't lead to all your logs going missing in a real workflow; there's only a chance for the last lines written on a worker to be dropped.
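    (To make that failure mode concrete, here is a toy sketch - not Prefect's actual handler - of batched logging: records are buffered and shipped on a timer, so anything still buffered when a worker dies before the next flush, or before the atexit hook can run, is dropped.)
    import atexit
    import logging
    import threading
    import time


    class ToyBatchHandler(logging.Handler):
        # Toy illustration only: buffer records and ship them every
        # `interval` seconds, mimicking a 5 s batching log handler.
        def __init__(self, interval=5.0):
            super().__init__()
            self.interval = interval
            self.buffer = []
            # Flush leftovers at interpreter exit. A worker killed hard
            # (e.g. SIGKILL) never runs this hook, so up to the last
            # `interval` seconds of records are lost.
            atexit.register(self.flush)
            threading.Thread(target=self._flush_loop, daemon=True).start()

        def emit(self, record):
            self.buffer.append(self.format(record))

        def flush(self):
            batch, self.buffer = self.buffer, []
            for line in batch:
                print("shipping:", line)  # stand-in for the upload API call

        def _flush_loop(self):
            while True:
                time.sleep(self.interval)
                self.flush()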
Sean Harkins

    1 year ago
    🙇 Thank you for investigating this. My workers appear to exit cleanly, but I am seeing the following issue on the scheduler that I’ll need to check into:
    distributed.scheduler - INFO - End scheduler at 'tcp://10.0.166.48:8786'
    Traceback (most recent call last):
      File "/usr/local/bin/dask-scheduler", line 8, in <module>
        sys.exit(go())
      File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 219, in go
        main()
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 210, in main
        loop.run_sync(run)
      File "/usr/local/lib/python3.8/site-packages/tornado/ioloop.py", line 529, in run_sync
        raise TimeoutError("Operation timed out after %s seconds" % timeout)
    tornado.util.TimeoutError: Operation timed out after None seconds
    Probably unrelated to this though I’m guessing.
    And the gap from my worker’s log statement to distributed’s reaping stray process message is only 2 seconds, so that aligns with what you found. Thanks again.
Jim Crist-Harif

    1 year ago
    Probably unrelated to this though I’m guessing.
    Yeah, we don't run anything on the scheduler itself. The issue is that the worker shuts down without running any of the atexit hooks. I can patch around this in prefect, but it'd be cleaner, I think, to push a fix upstream to distributed. I may do both.
    Pushed up a partial fix to upstream dask. This will fix the issue for local cluster usage. Remote clusters seem to work as intended in my tests already (though I haven't tested on fargate). The distributed codebase at least is already doing the right thing here.
    Update - while the upstream PR did fix things, upon closer investigation the underlying issue was in prefect itself - PR here: https://github.com/PrefectHQ/prefect/pull/4334. Illustrates the importance of ensuring your tests fail before making the fix 😃.
    Fix should be out in the next release. Thanks for raising the issue!
Sean Harkins

    1 year ago
    Thanks again for the extremely quick turnaround.