Sean Harkins
03/28/2021, 6:52 PM
I'm trying to get logging working with a DaskExecutor. When running this example Flow:
@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"

with Flow(
    "dask-test-flow",
    storage=storage.S3(
        bucket=storage_bucket
    ),
    run_config=ECSRun(
        image=worker_image,
        labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
        task_definition=definition,
    ),
) as flow:
    hello_result = say_hello()
I am able to view log information (specifically the "Hello, Cloud" info line) directly in Prefect Cloud. But using an identical flow with a DaskExecutor, logging information is sent to my CloudWatch dask-ecs log group but is not available in Prefect Cloud. Is it possible to view all Dask worker logging in Prefect Cloud and if so, what additional logging config do I need to include?
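One knob worth experimenting with, sketched below under two assumptions that are not confirmed anywhere in this thread: that dask_cloudprovider's FargateCluster accepts an environment mapping applied to its scheduler/worker containers, and that Prefect 0.14 honors the PREFECT__LOGGING__LOG_TO_CLOUD and PREFECT__LOGGING__LEVEL settings on those workers. Illustrative only, not a verified fix.

from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "ecr/image",  # same worker image used for the flow
        # Assumed knob: an `environment` dict passed through to the
        # scheduler and worker containers, so Prefect logging settings
        # reach the dask workers.
        "environment": {
            "PREFECT__LOGGING__LOG_TO_CLOUD": "true",
            "PREFECT__LOGGING__LEVEL": "INFO",
        },
        "n_workers": 2,
    },
)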
Dylan
Sean Harkins
03/29/2021, 4:15 PM
definition = yaml.safe_load(
"""
networkMode: awsvpc
cpu: 1024
memory: 2048
containerDefinitions:
- name: flow
"""
)
definition["executionRoleArn"] = execution_role_output
executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "ecr/image",
        "vpc": vpc_output,
        "cluster_arn": cluster_output,
        "task_role_arn": task_role_output,
        "execution_role_arn": execution_role_output,
        "security_groups": [
            security_group_output
        ],
        "n_workers": 2,
        "scheduler_cpu": 256,
        "scheduler_mem": 512,
        "worker_cpu": 1024,
        "worker_mem": 2048,
        "scheduler_timeout": "15 minutes",
    },
)
And these are the rules used by the cluster security group
security_group.add_ingress_rule(
    aws_ec2.Peer.any_ipv4(),
    aws_ec2.Port.tcp_range(8786, 8787)  # 8786: Dask scheduler, 8787: dashboard
)
security_group.add_ingress_rule(
    aws_ec2.Peer.any_ipv6(),
    aws_ec2.Port.tcp_range(8786, 8787)
)
security_group.add_ingress_rule(
    security_group,  # allow all TCP between members of this security group
    aws_ec2.Port.all_tcp()
)
Dylan
Sean Harkins
03/29/2021, 4:39 PM
RUN pip install prefect[aws]==0.14.13
Dylan
Dylan
Sean Harkins
03/29/2021, 5:34 PM
Worker logging currently only goes to the CloudWatch dask-ecs log group. I would like to have worker logging appear in Prefect Cloud's UI.
Jim Crist-Harif
03/29/2021, 5:50 PM
Task logs should be forwarded to Cloud (as long as you're using prefect.context.get("logger")) regardless of execution environment though - if this is not the case this sounds like a bug. Since you can't see Hello, Cloud in the logs, sounds like that's the case.
Jim Crist-Harif
03/29/2021, 5:51 PM
Can you try this with a local DaskExecutor rather than something running on fargate? Switching to executor = DaskExecutor() should be sufficient to check.
Sean Harkins
03/29/2021, 9:48 PM
I tried with a local DaskExecutor without a cluster_class:
import json
import os

import boto3
import prefect
import yaml
from prefect import Flow, storage, task
from prefect.run_configs import ECSRun
from prefect.engine.executors import DaskExecutor

identifier = os.environ["IDENTIFIER"]
project = os.environ["PREFECT_PROJECT"]
worker_image = os.environ["PREFECT_DASK_WORKER_IMAGE"]

cloudformation = boto3.resource('cloudformation')
stack = cloudformation.Stack(f"pangeo-forge-aws-bakery-{identifier}")

execution_role_output = next((
    output for output in stack.outputs
    if output.get("ExportName") == f"prefect-task-execution-role-{identifier}"),
    None)["OutputValue"]

storage_bucket_output = next((
    output for output in stack.outputs
    if output.get("ExportName") == f"prefect-storage-bucket-name-output-{identifier}"),
    None)["OutputValue"]

definition = yaml.safe_load(
    """
networkMode: awsvpc
cpu: 1024
memory: 2048
containerDefinitions:
  - name: flow
"""
)
definition["executionRoleArn"] = execution_role_output

executor = DaskExecutor(
    cluster_kwargs={
        "n_workers": 1,
        "threads_per_worker": 1
    }
)

@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"

with Flow(
    "dask-local-test-flow",
    storage=storage.S3(
        bucket=storage_bucket_output
    ),
    run_config=ECSRun(
        image=worker_image,
        labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
        task_definition=definition,
    ),
    executor=executor,
) as flow:
    hello_result = say_hello()

flow.register(project_name=project)
"Hello, Cloud" is not logged to Prefect Cloud. Just for clarification, I am still using ECSRun with an agent running in an ECSCluster. Would you prefer testing this with a local agent and local (to my machine) DaskExecutor? As a note, because this ECSRun uses an ephemeral TaskDefinition, there is no CloudWatch Log Group and the log statement doesn't appear to be going anywhere.
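For the missing flow-runner logs specifically, one option (a sketch, not something confirmed in this thread) is to give the container definition an awslogs logConfiguration so the ephemeral task writes to a pre-created CloudWatch log group. The group name and region below are hypothetical, and the execution role would also need CloudWatch Logs permissions.

import yaml

definition = yaml.safe_load(
    """
networkMode: awsvpc
cpu: 1024
memory: 2048
containerDefinitions:
  - name: flow
    logConfiguration:
      logDriver: awslogs
      options:
        awslogs-group: /ecs/prefect-flow-runs  # hypothetical, must exist beforehand
        awslogs-region: us-west-2              # hypothetical region
        awslogs-stream-prefix: prefect
"""
)
# execution role from the CloudFormation stack, as above; it needs
# permission to create log streams and put log events
definition["executionRoleArn"] = execution_role_output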
Sean Harkins
03/29/2021, 9:50 PM
Sean Harkins
03/29/2021, 11:13 PM
One other question: I see where DaskExecutor runs start for this cluster and instantiate the Client (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/executors/dask.py#L192). I would like to be able to report the scheduler's dashboard_link in the Prefect Cloud UI so that people can use it for debugging their Dask distributed execution.
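Nothing in this thread confirms a built-in way to surface that link. A minimal workaround sketch is to log it from a task, assuming the task executes on a dask worker with an active distributed client; the task name here is made up.

import prefect
from prefect import task

@task
def report_dashboard_link():
    logger = prefect.context.get("logger")
    try:
        # Under a DaskExecutor this task runs on a dask worker, so
        # get_client() should return a Client attached to the same scheduler.
        from distributed import get_client
        logger.info("Dask dashboard: %s", get_client().dashboard_link)
    except (ImportError, ValueError):
        # ValueError: no client/worker is active (e.g. a LocalExecutor run)
        logger.info("No Dask client available for this run")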
Sean Harkins
04/01/2021, 1:41 AM
Jim Crist-Harif
04/01/2021, 1:57 AM
Sean Harkins
04/01/2021, 2:04 AM
distributed.scheduler - INFO - End scheduler at 'tcp://10.0.166.48:8786'
Traceback (most recent call last):
  File "/usr/local/bin/dask-scheduler", line 8, in <module>
    sys.exit(go())
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 219, in go
    main()
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_scheduler.py", line 210, in main
    loop.run_sync(run)
  File "/usr/local/lib/python3.8/site-packages/tornado/ioloop.py", line 529, in run_sync
    raise TimeoutError("Operation timed out after %s seconds" % timeout)
tornado.util.TimeoutError: Operation timed out after None seconds
Probably unrelated to this though I'm guessing.
Sean Harkins
04/01/2021, 2:06 AM
The reaping stray process timeout is only 2 seconds, so that aligns with what you found. Thanks again.
Jim Crist-Harif
04/01/2021, 2:11 AM
> Probably unrelated to this though I'm guessing.
Yeah, we don't run anything on the scheduler itself. The issue is that the worker shuts down without running any of the atexit hooks. I can patch around this in prefect, but it'd be cleaner I think to push a fix upstream to distributed. I may do both.
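To make the "patch around this" idea concrete: one conceivable workaround (purely a sketch, not the actual change being described) is a distributed WorkerPlugin whose teardown flushes Prefect's log handlers before the worker container exits, so buffered records get shipped even when atexit hooks never run.

import logging

from distributed import WorkerPlugin

class FlushPrefectLogHandlers(WorkerPlugin):
    # teardown() is called by distributed when the worker shuts down cleanly,
    # giving Prefect's log handlers a chance to flush buffered records.
    def teardown(self, worker):
        for handler in logging.getLogger("prefect").handlers:
            handler.flush()

# Registration would happen on the client connected to the cluster, e.g.:
# client.register_worker_plugin(FlushPrefectLogHandlers())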
Jim Crist-Harif
04/01/2021, 4:31 PM
Jim Crist-Harif
04/01/2021, 7:19 PM
Jim Crist-Harif
04/01/2021, 7:19 PM
Sean Harkins
04/01/2021, 7:20 PM