Malthe Karbo
01/18/2022, 11:03 AM
RuntimeError: IOLoop is closed
Flow example in thread and pinned versions as well
Anna Geller
Malthe Karbo
01/18/2022, 11:10 AM
+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| python  | 3.9.5.final.0 | 3.9.9.final.0 | 3.9.9.final.0 |
+---------+---------------+---------------+---------------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2022-01-18 12:00:32+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
from prefect import Flow, task  # type: ignore
import prefect
from prefect.executors import DaskExecutor  # type: ignore
from dask_cloudprovider.aws import FargateCluster  # <--- I also tried without this following the example from docs.
import random
from time import sleep


@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1


@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1


@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y


@task(log_stdout=True)
def list_sum(arr):
    logger = prefect.context.get("logger")  # type: ignore
    logger.info(arr)  # <- This gets printed to cloudwatch logs
    logger.info(sum(arr))  # <- This gets printed to cloudwatch logs
    logger.info("hello world")  # <- This gets printed to cloudwatch logs
    return sum(arr)


def fargate_cluster(n_workers=4, image: str = ""):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=image)


with Flow(
    "daskcloudprovider-example",
    executor=DaskExecutor(
        cluster_class=fargate_cluster,
        cluster_kwargs={"image": "prefecthq/prefect:latest-python3.9", "n_workers": 2},
        debug=True,
    ),
) as flow:
    random.seed(123)
    incs = inc.map(x=range(10))
    decs = dec.map(x=range(10))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
    print("Hello!")

if __name__ == "__main__":
    flow.run()
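The version table in the log above reports the client on Python 3.9.5 while the scheduler and workers run 3.9.9. As a minimal sketch of how such a mismatch could be detected programmatically, the helper below compares versions per package across the three roles. The name `find_version_mismatches` and the shape of the input dict are assumptions made for illustration; they are not the structure that distributed returns. On a live cluster, `Client.get_versions(check=True)` in distributed should perform a comparable check.

```python
from typing import Dict, List


def find_version_mismatches(versions: Dict[str, Dict[str, str]]) -> List[str]:
    """Return the packages whose version differs between client, scheduler and workers.

    `versions` maps a package name to a {role: version} dict. This layout is a
    hand-written simplification, not the format produced by distributed.
    """
    mismatched = []
    for package, by_role in versions.items():
        # More than one distinct version string means the roles disagree.
        if len(set(by_role.values())) > 1:
            mismatched.append(package)
    return sorted(mismatched)


# Values copied from the version table in the log output.
reported = {
    "python": {
        "client": "3.9.5.final.0",
        "scheduler": "3.9.9.final.0",
        "workers": "3.9.9.final.0",
    },
}

mismatches = find_version_mismatches(reported)
print(mismatches)
```

Running the flow from an environment that matches the image's Python version would make this list empty; whether that has any bearing on the `IOLoop is closed` error is not established here.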
Anna Geller
Can you try adding skip_cleanup=True to your FargateCluster class? Or perhaps your AWS user doesn’t have the permissions to clean up the resources (e.g. the delete ECS cluster permission)? You could test this by running it with admin user permissions.
FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
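One way to thread `skip_cleanup` through the factory function from the flow example is sketched below. The helper `fargate_cluster_kwargs` is a hypothetical name introduced here, and the default of `skip_cleanup=True` simply mirrors the suggestion above. `FargateCluster` is imported lazily so that the keyword-building part can be tried without dask_cloudprovider installed.

```python
from typing import Any, Dict


def fargate_cluster_kwargs(
    image: str, n_workers: int = 2, skip_cleanup: bool = True, **extra: Any
) -> Dict[str, Any]:
    """Build the keyword arguments passed on to FargateCluster (hypothetical helper)."""
    kwargs: Dict[str, Any] = {
        "image": image,
        "n_workers": n_workers,
        "skip_cleanup": skip_cleanup,
    }
    # Any further FargateCluster options (for example role ARNs) pass through unchanged.
    kwargs.update(extra)
    return kwargs


def fargate_cluster(**kwargs: Any):
    """Cluster factory suitable for DaskExecutor(cluster_class=...)."""
    # Imported here so the helper above works without dask_cloudprovider.
    from dask_cloudprovider.aws import FargateCluster

    return FargateCluster(**fargate_cluster_kwargs(**kwargs))


cluster_kwargs = fargate_cluster_kwargs(image="prefecthq/prefect:latest-python3.9")
print(cluster_kwargs)
```

With this in place, the executor from the flow example could keep `cluster_class=fargate_cluster` and `cluster_kwargs={"image": "prefecthq/prefect:latest-python3.9", "n_workers": 2}` unchanged.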
Malthe Karbo
01/18/2022, 11:34 AM
I added skip_cleanup=True (I also followed your other suggestions); however, I receive the exact same error message (line number references and everything).
I am running with admin IAM permissions locally, but I am not sure whether the Dask cluster uses the same permissions that I configured locally via aws configure.
Anna Geller
task_role_arn
execution_role_arn
Here is a list of permissions from the docs:
execution_role_arn: str (optional)
    The ARN of an existing IAM role to use for ECS execution.
    This ARN must have ``sts:AssumeRole`` allowed for
    ``ecs-tasks.amazonaws.com`` and allow the following permissions:
    - ``ecr:GetAuthorizationToken``
    - ``ecr:BatchCheckLayerAvailability``
    - ``ecr:GetDownloadUrlForLayer``
    - ``ecr:GetRepositoryPolicy``
    - ``ecr:DescribeRepositories``
    - ``ecr:ListImages``
    - ``ecr:DescribeImages``
    - ``ecr:BatchGetImage``
    - ``logs:*``
    - ``ec2:AuthorizeSecurityGroupIngress``
    - ``ec2:Describe*``
    - ``elasticloadbalancing:DeregisterInstancesFromLoadBalancer``
    - ``elasticloadbalancing:DeregisterTargets``
    - ``elasticloadbalancing:Describe*``
    - ``elasticloadbalancing:RegisterInstancesWithLoadBalancer``
    - ``elasticloadbalancing:RegisterTargets``
and for task role:
{
    "Statement": [
        {
            "Action": [
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DeleteSecurityGroup",
                "ecs:CreateCluster",
                "ecs:DescribeTasks",
                "ecs:ListAccountSettings",
                "ecs:RegisterTaskDefinition",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:ListClusters",
                "ecs:DescribeClusters",
                "ecs:DeleteCluster",
                "ecs:ListTaskDefinitions",
                "ecs:DescribeTaskDefinition",
                "ecs:DeregisterTaskDefinition",
                "iam:AttachRolePolicy",
                "iam:CreateRole",
                "iam:TagRole",
                "iam:PassRole",
                "iam:DeleteRole",
                "iam:ListRoles",
                "iam:ListRoleTags",
                "iam:ListAttachedRolePolicies",
                "iam:DetachRolePolicy",
                "logs:DescribeLogGroups",
                "logs:GetLogEvents",
                "logs:CreateLogGroup",
                "logs:PutRetentionPolicy"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ]
        }
    ],
    "Version": "2012-10-17"
}
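To compare an existing role against the action lists above, a rough check along the following lines may help. It is only a sketch: `allowed_actions` and `missing_actions` are hypothetical helpers, the sample policy is deliberately incomplete, and the matching ignores `Deny` statements, `Resource` and `Condition`, so it is no substitute for the IAM policy simulator.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Set


def allowed_actions(policy: dict) -> Set[str]:
    """Collect the Action entries of every Allow statement in a policy document."""
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may appear unwrapped
        statements = [statements]
    actions: Set[str] = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        entries = statement.get("Action", [])
        if isinstance(entries, str):
            entries = [entries]
        actions.update(entries)
    return actions


def missing_actions(policy: dict, required: Iterable[str]) -> List[str]:
    """Return the required actions that no Allow pattern in the policy covers."""
    # IAM action names are case-insensitive and may contain wildcards such as logs:*.
    granted = [pattern.lower() for pattern in allowed_actions(policy)]
    return sorted(
        action
        for action in required
        if not any(fnmatchcase(action.lower(), pattern) for pattern in granted)
    )


# A deliberately incomplete example policy (not taken from the thread).
sample_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["ecs:DeleteCluster", "ecs:StopTask", "logs:*"],
            "Resource": ["*"],
        }
    ],
}

# A few of the cleanup-related actions from the task role list above.
required = [
    "ecs:DeleteCluster",
    "ec2:DeleteSecurityGroup",
    "iam:DeleteRole",
    "logs:CreateLogGroup",
]

missing = missing_actions(sample_policy, required)
print(missing)
```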
Malthe Karbo
01/18/2022, 11:48 AM
Anna Geller
Malthe Karbo
01/18/2022, 12:59 PM
Anna Geller
image=prefect.context.image
Malthe Karbo
01/18/2022, 1:23 PM
Anna Geller
Malthe Karbo
01/18/2022, 1:25 PM
Anna Geller
Kevin Kho
distributed version?
Malthe Karbo
01/18/2022, 2:49 PM
Anna Geller
Malthe Karbo
01/19/2022, 7:40 AM
Kevin Kho
Malthe Karbo
01/19/2022, 2:46 PM
Anna Geller
Malthe Karbo
01/19/2022, 2:50 PM
Kevin Kho
distributed. For LocalDaskExecutor, which just uses Dask, you get the logs from the processes.
Malthe Karbo
01/19/2022, 2:55 PM
Kevin Kho
Malthe Karbo
01/19/2022, 3:12 PM
Kevin Kho
Malthe Karbo
01/19/2022, 3:15 PM
Kevin Kho
Malthe Karbo
01/19/2022, 3:43 PM
Kevin Kho
task or inside the Flow block?
Malthe Karbo
01/19/2022, 8:46 PM
Anna Geller
logger = prefect.utilities.logging.get_logger("TaskRunner")
logger.info("Sending logs from a module scope")
cloudwatch_logs_stream_prefix - was it necessary to get task run logs for you? I don’t fully understand it; I would really appreciate it if you could share a Gist or explain.
Malthe Karbo
01/19/2022, 8:48 PM
Anna Geller