Malthe Karbo
01/18/2022, 11:03 AMRuntimeError: IOLoop is closedAnna Geller
Malthe Karbo
01/18/2022, 11:10 AMMalthe Karbo
01/18/2022, 11:11 AM+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| python  | 3.9.5.final.0 | 3.9.9.final.0 | 3.9.9.final.0 |
+---------+---------------+---------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
[2022-01-18 12:00:32+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closedMalthe Karbo
01/18/2022, 11:11 AMfrom prefect import Flow, task  # type: ignore
import prefect
from prefect.executors import DaskExecutor  # type: ignore
from dask_cloudprovider.aws import FargateCluster # <--- I also tried without this following the example from docs.
import random
from time import sleep
@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1
@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1
@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y
@task(log_stdout=True)
def list_sum(arr):
    logger = prefect.context.get("logger")  # type: ignore
    <http://logger.info|logger.info>(arr) <- This gets printed to cloudwatch logs
    <http://logger.info|logger.info>(sum(arr)) <- This gets printed to cloudwatch logs
    <http://logger.info|logger.info>("hello world") # <- This gets printed to cloudwatch logs
    return sum(arr)
def fargate_cluster(n_workers=4, image: str = ""):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=image)
with Flow(
    "daskcloudprovider-example",
    executor=DaskExecutor(
        cluster_class=fargate_cluster,
        cluster_kwargs={"image": "prefecthq/prefect:latest-python3.9", "n_workers": 2},
        debug=True,
    ),
) as flow:
    random.seed(123)
    incs = inc.map(x=range(10))
    decs = dec.map(x=range(10))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
    print("Hello!")
if __name__ == "__main__":
    flow.run()Malthe Karbo
01/18/2022, 11:12 AMMalthe Karbo
01/18/2022, 11:22 AMAnna Geller
skip_cleanup=TrueFargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)Malthe Karbo
01/18/2022, 11:34 AMskip_cleanup=Trueaws configureAnna Geller
task_role_arn
execution_role_arnexecution_role_arn: str (optional)
        The ARN of an existing IAM role to use for ECS execution.
        This ARN must have ``sts:AssumeRole`` allowed for
        ``<http://ecs-tasks.amazonaws.com|ecs-tasks.amazonaws.com>`` and allow the following permissions:
        - ``ecr:GetAuthorizationToken``
        - ``ecr:BatchCheckLayerAvailability``
        - ``ecr:GetDownloadUrlForLayer``
        - ``ecr:GetRepositoryPolicy``
        - ``ecr:DescribeRepositories``
        - ``ecr:ListImages``
        - ``ecr:DescribeImages``
        - ``ecr:BatchGetImage``
        - ``logs:*``
        - ``ec2:AuthorizeSecurityGroupIngress``
        - ``ec2:Describe*``
        - ``elasticloadbalancing:DeregisterInstancesFromLoadBalancer``
        - ``elasticloadbalancing:DeregisterTargets``
        - ``elasticloadbalancing:Describe*``
        - ``elasticloadbalancing:RegisterInstancesWithLoadBalancer``
        - ``elasticloadbalancing:RegisterTargets``{
  "Statement": [
    {
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:CreateSecurityGroup",
        "ec2:CreateTags",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DeleteSecurityGroup",
        "ecs:CreateCluster",
        "ecs:DescribeTasks",
        "ecs:ListAccountSettings",
        "ecs:RegisterTaskDefinition",
        "ecs:RunTask",
        "ecs:StopTask",
        "ecs:ListClusters",
        "ecs:DescribeClusters",
        "ecs:DeleteCluster",
        "ecs:ListTaskDefinitions",
        "ecs:DescribeTaskDefinition",
        "ecs:DeregisterTaskDefinition",
        "iam:AttachRolePolicy",
        "iam:CreateRole",
        "iam:TagRole",
        "iam:PassRole",
        "iam:DeleteRole",
        "iam:ListRoles",
        "iam:ListRoleTags",
        "iam:ListAttachedRolePolicies",
        "iam:DetachRolePolicy",
        "logs:DescribeLogGroups",
        "logs:GetLogEvents",
        "logs:CreateLogGroup",
        "logs:PutRetentionPolicy"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ]
    }
  ],
  "Version": "2012-10-17"
}Malthe Karbo
01/18/2022, 11:48 AMMalthe Karbo
01/18/2022, 12:43 PMMalthe Karbo
01/18/2022, 12:45 PMMalthe Karbo
01/18/2022, 12:47 PMMalthe Karbo
01/18/2022, 12:48 PMAnna Geller
Malthe Karbo
01/18/2022, 12:59 PMAnna Geller
image=prefect.context.imageMalthe Karbo
01/18/2022, 1:23 PMAnna Geller
Malthe Karbo
01/18/2022, 1:25 PMMalthe Karbo
01/18/2022, 1:37 PMMalthe Karbo
01/18/2022, 1:52 PMMalthe Karbo
01/18/2022, 1:55 PMMalthe Karbo
01/18/2022, 2:15 PMAnna Geller
Kevin Kho
distributedMalthe Karbo
01/18/2022, 2:49 PMMalthe Karbo
01/18/2022, 2:50 PMMalthe Karbo
01/18/2022, 2:57 PMAnna Geller
Malthe Karbo
01/19/2022, 7:40 AMMalthe Karbo
01/19/2022, 7:43 AMMalthe Karbo
01/19/2022, 8:34 AMMalthe Karbo
01/19/2022, 8:35 AMMalthe Karbo
01/19/2022, 8:47 AMMalthe Karbo
01/19/2022, 9:46 AMMalthe Karbo
01/19/2022, 12:08 PMKevin Kho
Malthe Karbo
01/19/2022, 2:46 PMAnna Geller
Malthe Karbo
01/19/2022, 2:50 PMKevin Kho
distributedLocalDaskExecutorMalthe Karbo
01/19/2022, 2:55 PMMalthe Karbo
01/19/2022, 3:10 PMKevin Kho
Malthe Karbo
01/19/2022, 3:12 PMKevin Kho
Malthe Karbo
01/19/2022, 3:15 PMMalthe Karbo
01/19/2022, 3:15 PMMalthe Karbo
01/19/2022, 3:30 PMKevin Kho
Malthe Karbo
01/19/2022, 3:43 PMMalthe Karbo
01/19/2022, 3:44 PMKevin Kho
taskFlowMalthe Karbo
01/19/2022, 8:46 PMAnna Geller
logger = prefect.utilities.logging.get_logger("TaskRunner")
<http://logger.info|logger.info>("Sending logs from a module scope")Anna Geller
cloudwatch_logs_stream_prefixMalthe Karbo
01/19/2022, 8:48 PMAnna Geller
