
Malthe Karbo

01/18/2022, 11:03 AM
Hi everyone, I am having some trouble using the DaskExecutor with Fargate mode. I get the following error (after successfully running all flows):
Copy code
RuntimeError: IOLoop is closed
Flow example in thread and pinned versions as well

Anna Geller

01/18/2022, 11:09 AM
Can you move all code blocks to the thread to keep the main channel cleaner? Thanks a lot in advance!

Malthe Karbo

01/18/2022, 11:10 AM
Yes
stack trace:
Copy code
+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| python  | 3.9.5.final.0 | 3.9.9.final.0 | 3.9.9.final.0 |
+---------+---------------+---------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

[2022-01-18 12:00:32+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Traceback (most recent call last):
  File "/usr/lib/python3.9/weakref.py", line 656, in _exitfunc
    f()
  File "/usr/lib/python3.9/weakref.py", line 580, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/home/pathtopackage/.venv/lib/python3.9/site-packages/distributed/utils.py", line 333, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
(the same traceback and RuntimeError are repeated five more times)
flow.py
Copy code
from prefect import Flow, task  # type: ignore
import prefect
from prefect.executors import DaskExecutor  # type: ignore
from dask_cloudprovider.aws import FargateCluster  # <--- I also tried without this, following the example from the docs.
import random
from time import sleep


@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1


@task
def dec(x):
    sleep(random.random() / 10)
    return x - 1


@task
def add(x, y):
    sleep(random.random() / 10)
    return x + y


@task(log_stdout=True)
def list_sum(arr):
    logger = prefect.context.get("logger")  # type: ignore
    logger.info(arr)  # <- This gets printed to CloudWatch logs
    logger.info(sum(arr))  # <- This gets printed to CloudWatch logs
    logger.info("hello world")  # <- This gets printed to CloudWatch logs
    return sum(arr)


def fargate_cluster(n_workers=4, image: str = ""):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(n_workers=n_workers, image=image)


with Flow(
    "daskcloudprovider-example",
    executor=DaskExecutor(
        cluster_class=fargate_cluster,
        cluster_kwargs={"image": "prefecthq/prefect:latest-python3.9", "n_workers": 2},
        debug=True,
    ),
) as flow:
    random.seed(123)
    incs = inc.map(x=range(10))
    decs = dec.map(x=range(10))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
    print("Hello!")

if __name__ == "__main__":

    flow.run()
pyproject.toml:
An interesting observation: if I add a breakpoint after flow.run(), I do reach that breakpoint. The error is only raised after the initial Python process shuts down, so I can't try/except the flow.run() call to escape it. When running this flow on Server or Cloud, using any agent, the flow never completes because of this error (even though all tasks have completed).
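To illustrate what I think is happening (my own minimal sketch, not Prefect internals; Resource and cleanup are just demo names): the error comes out of a weakref finalizer during interpreter shutdown, outside any user frame, which is why a try/except around flow.run() never sees it.
Copy code
import weakref


class Resource:
    pass


def cleanup():
    # runs from weakref._exitfunc during interpreter shutdown, outside any
    # user frame, so nothing in the main script can catch what it raises
    raise RuntimeError("IOLoop is closed")


resource = Resource()
weakref.finalize(resource, cleanup)

try:
    pass  # analogous to flow.run(): completes fine, a breakpoint here is reached
except RuntimeError:
    print("never reached - the error only surfaces during shutdown")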

Anna Geller

01/18/2022, 11:26 AM
In general, within the Flow block you shouldn't do anything but call your tasks to define the DAG structure, so I would move the first and last lines (random.seed(123) and the print) into tasks. Also, when you use log_stdout=True you don't need to grab a logger; you can just use print() and the messages will be logged at the default level INFO. Given that you mentioned all tasks completed successfully, it looks like something goes wrong in the cleanup process. I wonder what happens if you add skip_cleanup=True to your FargateCluster call? Or perhaps your AWS user is missing some permissions to clean up the resources (e.g. the permission to delete the ECS cluster)? You could test that by running with admin user permissions.
Copy code
FargateCluster(n_workers=n_workers, image=prefect.context.image, skip_cleanup=True)
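Putting those suggestions together, a rough, untested sketch of how the tasks and the cluster factory could look (set_seed is just an illustrative task name):
Copy code
from prefect import task
from dask_cloudprovider.aws import FargateCluster


@task(log_stdout=True)
def set_seed():
    # seeding moved out of the Flow block and into a task
    import random
    random.seed(123)
    print("seed set")  # with log_stdout=True, print() is logged at level INFO


@task(log_stdout=True)
def list_sum(arr):
    print(arr)  # no explicit logger needed
    print(sum(arr))
    return sum(arr)


def fargate_cluster(n_workers=4, image: str = ""):
    # skip_cleanup=True only to rule out the cleanup step as the culprit
    return FargateCluster(n_workers=n_workers, image=image, skip_cleanup=True)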

Malthe Karbo

01/18/2022, 11:34 AM
Hi Anna, I have just tried with skip_cleanup=True (I also followed your other suggestions); however, I get the exact same error message (line number references and everything). I am running with admin IAM permissions locally; I am not sure whether the Dask cluster uses the same permissions as I have configured locally via aws configure.

Anna Geller

01/18/2022, 11:43 AM
In that case, perhaps you can set your roles explicitly? You would also need to set these arguments on your FargateCluster class (a sketch follows the permissions listed below):
Copy code
task_role_arn
execution_role_arn
Here is a list of permissions from the docs:
Copy code
execution_role_arn: str (optional)
        The ARN of an existing IAM role to use for ECS execution.

        This ARN must have ``sts:AssumeRole`` allowed for
        ``ecs-tasks.amazonaws.com`` and allow the following permissions:

        - ``ecr:GetAuthorizationToken``
        - ``ecr:BatchCheckLayerAvailability``
        - ``ecr:GetDownloadUrlForLayer``
        - ``ecr:GetRepositoryPolicy``
        - ``ecr:DescribeRepositories``
        - ``ecr:ListImages``
        - ``ecr:DescribeImages``
        - ``ecr:BatchGetImage``
        - ``logs:*``
        - ``ec2:AuthorizeSecurityGroupIngress``
        - ``ec2:Describe*``
        - ``elasticloadbalancing:DeregisterInstancesFromLoadBalancer``
        - ``elasticloadbalancing:DeregisterTargets``
        - ``elasticloadbalancing:Describe*``
        - ``elasticloadbalancing:RegisterInstancesWithLoadBalancer``
        - ``elasticloadbalancing:RegisterTargets``
and for task role:
Copy code
{
  "Statement": [
    {
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:CreateSecurityGroup",
        "ec2:CreateTags",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DeleteSecurityGroup",
        "ecs:CreateCluster",
        "ecs:DescribeTasks",
        "ecs:ListAccountSettings",
        "ecs:RegisterTaskDefinition",
        "ecs:RunTask",
        "ecs:StopTask",
        "ecs:ListClusters",
        "ecs:DescribeClusters",
        "ecs:DeleteCluster",
        "ecs:ListTaskDefinitions",
        "ecs:DescribeTaskDefinition",
        "ecs:DeregisterTaskDefinition",
        "iam:AttachRolePolicy",
        "iam:CreateRole",
        "iam:TagRole",
        "iam:PassRole",
        "iam:DeleteRole",
        "iam:ListRoles",
        "iam:ListRoleTags",
        "iam:ListAttachedRolePolicies",
        "iam:DetachRolePolicy",
        "logs:DescribeLogGroups",
        "logs:GetLogEvents",
        "logs:CreateLogGroup",
        "logs:PutRetentionPolicy"
      ],
      "Effect": "Allow",
      "Resource": [
        "*"
      ]
    }
  ],
  "Version": "2012-10-17"
}
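So your cluster factory could pass the role ARNs explicitly, roughly like this (the ARNs below are placeholders for your own roles carrying the permissions above):
Copy code
from dask_cloudprovider.aws import FargateCluster

# placeholder ARNs - substitute roles with the permissions listed above
TASK_ROLE_ARN = "arn:aws:iam::123456789012:role/dask-task-role"
EXECUTION_ROLE_ARN = "arn:aws:iam::123456789012:role/dask-execution-role"


def fargate_cluster(n_workers=4, image: str = ""):
    return FargateCluster(
        n_workers=n_workers,
        image=image,
        task_role_arn=TASK_ROLE_ARN,
        execution_role_arn=EXECUTION_ROLE_ARN,
    )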

Malthe Karbo

01/18/2022, 11:48 AM
Alright, will give it a go and get back to you
👍 1
Hi @Anna Geller, no luck - I get the exact same errors even though I added two new custom policies and attached them via ARN (and I can see in the ECS console that the correct execution/task roles are set, with the correct policies attached and the trusted identities configured).
Here is the CloudWatch log from the scheduler in the cluster:
Corresponding worker CloudWatch logs:
No change in the local Python stack trace from my previous post.

Anna Geller

01/18/2022, 12:57 PM
Nice work, thanks for sharing the logs! This seems to be related to this Dask issue. The only idea I have right now is to try a lower Python and Dask version, e.g. Python 3.8, to see whether an older version is more stable. I'll ask the team, maybe someone has an idea what might be going on here.
👀 1

Malthe Karbo

01/18/2022, 12:59 PM
Cheers - do you have a pinnable set of versions (dask, dask-cloudprovider, prefect) that I can try with Python 3.8? (I assume I can use the latest 3.8 image from prefecthq on Docker Hub?)

Anna Geller

01/18/2022, 1:04 PM
💯 % correct - the default version from the image should work. For the cluster class, you can use the image from the context:
Copy code
image=prefect.context.image
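For example (just a sketch), the cluster factory could fall back to the image recorded in the Prefect context when no image is passed explicitly:
Copy code
import prefect
from dask_cloudprovider.aws import FargateCluster


def fargate_cluster(n_workers=4, image: str = None):
    # prefect.context.image is populated during the flow run
    return FargateCluster(n_workers=n_workers, image=image or prefect.context.image)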

Malthe Karbo

01/18/2022, 1:23 PM
I am not setting the image anywhere besides the cluster class, so what would this do for me?

Anna Geller

01/18/2022, 1:25 PM
I see. Usually it's taken from the run config or inferred from the Python and Prefect versions used in your local environment.

Malthe Karbo

01/18/2022, 1:25 PM
Gotcha
I have now also tested with Python 3.8.12 (built from source), with the same versions of dask and prefect as above - it still does not work.
👍 1
Also 3.7.12 fails.
What alternatives do I have to try? Sadly, this is quite a limiting factor for us in adopting Prefect; we don't want to manage static clusters (k8s or ECS) for ETL workflows if we can avoid it.
I have also tried custom images where dask, distributed, and dask-cloudprovider are bumped to various versions. No luck.

Anna Geller

01/18/2022, 2:49 PM
Gotcha. Thanks for all the info. I will try to reproduce in my AWS account and get back to you
🙌 1

Kevin Kho

01/18/2022, 2:49 PM
Do you know your distributed version?

Malthe Karbo

01/18/2022, 2:49 PM
I have tried distributed 2021.12.0, 2021.11.0 and 2022.01.1 - with corresponding dask versions as well.
I will create a GitHub issue (with a link to this Slack thread), just in case anyone has the same problem in the future.
👍 1

Anna Geller

01/18/2022, 6:28 PM
@Malthe Karbo I reproduced it on my ECS cluster. In general it technically works from the Prefect side as long as permissions and package dependencies are set properly, but I agree that the cleanup process is a bit weird. I added a long answer to the GitHub issue. Thanks so much for raising the issue and for your excellent write-up of all the steps you took.
🙌 1

Malthe Karbo

01/19/2022, 7:40 AM
Hey, thanks - I will go over it today. One note: I did already try pinning packages, using the following approach (various versions of the base image and of the packages). And as mentioned, I also tried the permission roles as per the documentation.
One follow-up question: does the cluster in ECSRun have to be created manually, or is it created on demand?
When trying to use ECSRun on a cluster I created, flows run from Server never get out of Pending. A task is created, but even though I set the task and execution role ARNs in the ECSRun, the task is executed without a role (a rough sketch of my run config is below).
Note: I squashed all the necessary policies/IAM permissions into prefectTaskRole.
All task definitions show as "inactive" and there is no logging.
It would appear that the task definitions are never activated, and they have no task role. I did manage to get the execution role populated.
OK, I managed to get it running using a deployed service agent (ECS service), following the production template. I get similar results to what you posted on the GitHub issue, @Anna Geller. My main concern now is: how can I get the logs pushed back to the flow's cloud logs? They show up in CloudWatch, but not in the Prefect Cloud logs.
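For reference, roughly what my run config looks like (the ARN is a placeholder; as noted, I squashed everything into prefectTaskRole, and the label is just whatever my agent uses):
Copy code
from prefect.run_configs import ECSRun

run_config = ECSRun(
    image="prefecthq/prefect:latest-python3.9",
    task_role_arn="arn:aws:iam::123456789012:role/prefectTaskRole",       # placeholder ARN
    execution_role_arn="arn:aws:iam::123456789012:role/prefectTaskRole",  # placeholder ARN
    labels=["ecs"],  # whatever label the agent listens on
)
# attached to the flow with: flow.run_config = run_config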

Kevin Kho

01/19/2022, 2:38 PM
You used the template here but don't see logs in the Prefect UI? This is with the DaskExecutor, right?

Malthe Karbo

01/19/2022, 2:46 PM
I see the agent logs, but not dask worker logs. See https://github.com/PrefectHQ/prefect/issues/5332

Malthe Karbo

01/19/2022, 2:50 PM
Yeah, sorry - everything related to ECSRun and the agent is solved. The only outstanding issues are the tear-down of the ECS Dask cluster (https://github.com/PrefectHQ/prefect/issues/5330) and the log aggregation from Dask workers/schedulers, as you outlined yesterday (https://github.com/PrefectHQ/prefect/issues/5332)
👍 1

Kevin Kho

01/19/2022, 2:51 PM
This happens because Dask doesn't have a mechanism to ship logs from the workers to the scheduler (independent of Prefect). You need some kind of external service to ship the logs to a centralized location. The reason it doesn't work is that the logger gets serialized and then sent to the workers; when it gets deserialized, it loses its configuration. I think you may find more people asking about this in the Dask GitHub issues/channels. This is specific to distributed. For the LocalDaskExecutor, which just uses dask, you get the logs from the processes.
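To illustrate the mechanism (this is plain standard-library behaviour, not anything Prefect-specific): a Python Logger serializes only by name, so the handler configuration set up on the client never reaches the worker.
Copy code
import logging

# a logger configured with a handler on the "client" side
client_logger = logging.getLogger("prefect.TaskRunner.demo")
client_logger.addHandler(logging.StreamHandler())

# Logger.__reduce__ records only the logger *name*; the handler added above is
# not part of the payload, so a worker that deserializes it effectively just
# calls logging.getLogger(name) and gets back an unconfigured logger
print(client_logger.__reduce__())
# -> (<function getLogger at 0x...>, ('prefect.TaskRunner.demo',))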

Malthe Karbo

01/19/2022, 2:55 PM
I see, that is unfortunate - this is quite a big hit to the value proposition of using Dask, which seems to be the main advocated way of scaling Prefect.
Even so, I don't think this adds up. How come CloudFlowRun logs are pushed correctly but log statements inside the task are not? I tried logging dict(prefect.context) within my task, and all the information (backend, api_key, ...) is available.

Kevin Kho

01/19/2022, 3:11 PM
It depends on whether the task runs on the worker or on the client.

Malthe Karbo

01/19/2022, 3:12 PM
Task is run on the worker

Kevin Kho

01/19/2022, 3:14 PM
What makes you sure?

Malthe Karbo

01/19/2022, 3:15 PM
The CloudWatch logs.
I can see logs per worker, and I can see the log entries: both those that are pushed to Prefect and those that are only visible in CloudWatch.
I solved it by logging to the TaskRunner logger:
upvote 1

Kevin Kho

01/19/2022, 3:39 PM
Sorry for the late response. I will look more into it, but that is pretty interesting! Thanks for sharing!

Malthe Karbo

01/19/2022, 3:43 PM
Yeah, so using prefect.context.get('logger') does not work, but prefect.utilities.logging.get_logger does (it doesn't need to be the "TaskRunner" logger). I looked at the source code to see how the TaskRunner and FlowRunner loggers are created, and they don't use prefect.context.get('logger') (well, they do, but downstream); they use the utilities module.
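Roughly what the working task looks like (a simplified sketch of my setup, not the exact code):
Copy code
from prefect import task
from prefect.utilities.logging import get_logger


@task
def list_sum(arr):
    # prefect.context.get("logger") did not make it back to Prefect Cloud from
    # a dask worker for me, but this module-level logger does:
    logger = get_logger("TaskRunner")
    logger.info(arr)
    logger.info(sum(arr))
    return sum(arr)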

Kevin Kho

01/19/2022, 8:44 PM
Did you put that TaskRunner logger inside the task or inside the Flow block?

Malthe Karbo

01/19/2022, 8:46 PM
In the task. See the linked GitHub issues for code examples if you want to replicate it.

Anna Geller

01/19/2022, 8:46 PM
@Malthe Karbo can you share a bit more about how you got the logs to show up in the UI? I tried to reproduce it using TaskRunner, MyRunner, and CloudTaskRunner - each of those failed to retrieve the logs from a task (i.e. from a Dask worker node). Only logging something from a module scope or from within the Flow block worked with the approach you shared.
Copy code
logger = prefect.utilities.logging.get_logger("TaskRunner")
logger.info("Sending logs from a module scope")
The only difference I had was that I did not add the cloudwatch_logs_stream_prefix - was it necessary to get the task run logs for you? I somehow don't fully understand it; I would really appreciate it if you could share a Gist or an explanation.

Malthe Karbo

01/19/2022, 8:48 PM
I can perhaps share a gist tomorrow - I am behind schedule because of the issues discussed here, but I will take a look.

Anna Geller

01/19/2022, 8:48 PM
thanks a lot!