# prefect-server
Slava Shor:
Hello everyone! I am Slava from Emporus. The Prefect documentation has some examples of configurations for AWS ECS, but it doesn’t give a comprehensive recipe for running a workflow on ECS using a Fargate cluster (or maybe I need a Google Premium account to find it 😊). After some trial and error, and by looking at other tutorials online, we seem to have found an almost-working solution. We are trying to use a self-hosted Prefect server on EC2 in AWS, with an `ECSRun` run config and a `DaskExecutor` defined inside the flow definition script, to run the workflow on ECS using a Fargate cluster. We are also building a custom Docker image using Python 3.10. At first we had an issue with Dask compression, but we sorted it out by providing an environment variable:
```
DASK_DISTRIBUTED__COMM__COMPRESSION=zlib
```
Now we are getting a puzzling exception. We didn’t use Conda explicitly at any stage, and yet we see something weird: all of the stack trace is from Python 3.10 except the last line, which is from Python 3.8.
Kevin Kho:
Hi @Slava Shor, can we move the traceback into the thread? I think this has to do with the serialized Flow storage. You built a custom image using Python 3.10; did you add the Flow to that image?
Anna Geller:
If you need more examples of using ECSRun, check out this repo, and for using Dask Cloud Provider for ECS, check this issue.
Slava Shor:
Hi @Anna Geller, we have seen your response on GitHub before and did add `dask-cloudprovider[aws]` to our image.
I am not sure what you mean by moving the traceback into the thread, @Kevin Kho.
Kevin Kho:
Ah sorry, I didn’t complete that sentence. I meant: move the traceback to this thread so we can keep the main channel a bit more compact.
Slava Shor:
Our `Dockerfile` looks like this:
```dockerfile
FROM python:3.10-slim

# Set system locale
ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8
# TODO: Extract hard-coded values
# Set Flow vars
ENV ACCOUNT_ID [censored]
ENV REGION us-east-1
ENV ECR_REPO emporus-prefect
ENV ECR_IMAGE emporus-prefect
ENV ECR_TAG latest
ENV FLOW_NAME ecs-demo
ENV TASK_ROLE Prefect_Task_Role
ENV AGENT_ROLE Prefect_Agent_Role

ENV PREFECT__BACKEND server
ENV PREFECT__CLOUD__AGENT__LABELS ['dev']
ENV PREFECT__CLOUD__API http://[censored]:4200
ENV PREFECT__CLOUD__SEND_FLOW_RUN_LOGS true
ENV PREFECT__CLOUD__USE_LOCAL_SECRETS false
ENV PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS prefect.engine.cloud.CloudFlowRunner
ENV PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS prefect.engine.cloud.CloudTaskRunner
ENV PREFECT__LOGGING__LEVEL DEBUG
ENV PREFECT__LOGGING__LOG_TO_CLOUD true

# Image Labels
LABEL maintainer="[censored]"

RUN apt update && apt install -y gcc git build-essential
COPY dist/*.whl .
RUN python3.10 -m pip install --no-cache-dir -U pip dask-cloudprovider[aws] *.whl
RUN apt purge -y gcc git build-essential && apt clean && apt autoremove -y
RUN rm -rf *.whl /var/lib/apt*
ENV PATH /usr/local/bin/:$PATH
RUN flow-register
```
`flow-register` is a Python package script that does what it says: it registers a flow from inside the Docker image.
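For context, a minimal sketch of what such a registration entry point could look like (the module path, project name, and env var here are assumptions for illustration, not the actual Emporus script):

```python
import os

# Illustrative import; the real module path of the flow is an assumption.
from flows.ecs_demo import flow


def main() -> None:
    # Registers the flow against whatever backend PREFECT__CLOUD__API points at.
    # The PREFECT_PROJECT env var is hypothetical; "default" is a fallback.
    flow.register(project_name=os.environ.get("PREFECT_PROJECT", "default"))


if __name__ == "__main__":
    main()
```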
Stack trace:
```
Unexpected error: ModuleNotFoundError("No module named 'prefect'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine/flow_runner.py", line 542, in get_flow_run_state
    upstream_states = executor.wait(
  File "/usr/local/lib/python3.10/site-packages/prefect/executors/dask.py", line 440, in wait
    return self.client.gather(futures)
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 2146, in gather
    return self.sync(
  File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 309, in sync
    return sync(
  File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 376, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 349, in f
    result = yield future
  File "/usr/local/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 2009, in _gather
    raise exception.with_traceback(traceback)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
ModuleNotFoundError: No module named 'prefect'
```
Kevin Kho:
Can I see then how you defined the Storage in the `Flow`?
Slava Shor:
I noticed that if we register the flow from the machine we use to build it, then it fails to execute inside the container, because it tries to load the flow from the path where it was registered on the Docker host machine (the one used to build the container image).
Kevin Kho:
That sounds like you didn’t define any Storage?
Slava Shor:
```python
import os

from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun
from prefect.storage import S3

FLOW_NAME = os.environ["FLOW_NAME"]
TASK_ROLE = os.environ["TASK_ROLE"]
AGENT_ROLE = os.environ["AGENT_ROLE"]
ACCOUNT_ID = os.environ["ACCOUNT_ID"]
REGION = os.environ["REGION"]
ECR_REPO = os.environ["ECR_REPO"]
ECR_TAG = os.environ["ECR_TAG"]

STORAGE = S3(
    bucket="prefect-datasets",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)

RUN_CONFIG = ECSRun(
    labels=["dev"],
    run_task_kwargs=dict(
        cluster="prefect",
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": [
                    "...",
                    "...",
                ],
                "securityGroups": [
                    "...",
                ],
                "assignPublicIp": "ENABLED",
            }
        },
    ),
    task_definition_arn="arn:aws:ecs:us-east-1:...:task-definition/...",
)

EXECUTOR = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)

with Flow(FLOW_NAME, executor=EXECUTOR, run_config=RUN_CONFIG) as flow:
    ...
```
Kevin Kho:
Is it right that your S3 storage is not attached to the Flow?
Slava Shor:
Perhaps…
Kevin Kho:
It might fix your problem once you do. Not sure but you need to anyway 😅
Slava Shor:
Got it. I am checking now with:
```python
with Flow(FLOW_NAME, executor=EXECUTOR, run_config=RUN_CONFIG, storage=STORAGE) as flow:
    ...
```
I must say, the errors we’ve seen on the way were quite cryptic.
Kevin Kho:
I still don’t know about the `prefect` ModuleNotFound, but if you don’t attach a storage, Prefect uses the default Local storage and saves the file locally, which is why it’s looking on the local machine for the file that contains the Flow.
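In other words, the default being described is roughly equivalent to attaching Local storage explicitly. A sketch, assuming Prefect 1.x defaults:

```python
from prefect.storage import Local

# With no storage attached, the flow effectively gets Local storage, which
# writes the flow to disk on the registering machine (by default under
# ~/.prefect/flows) - a path a remote ECS container cannot see.
flow.storage = Local()
```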
Slava Shor:
That makes sense, @Kevin Kho. I still do not understand why I have to populate the `PREFECT__*` environment variables in the Docker image. Isn’t that something the Agent should take care of?
Kevin Kho:
If you are using an agent that creates a new container, like ECS/Docker/Kubernetes, these spin up a new Python process, so environment variables don’t carry over by default. This is why Prefect exposes the `--env` flag when you spin up an agent: to set environment variables that will be copied over to the flow run. Otherwise, you’d have to define them in the new process (in the container).
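As a sketch of that `--env` flag (the server address is a placeholder; exact options may vary across Prefect 1.x versions):

```bash
prefect agent ecs start \
    --label dev \
    --env PREFECT__BACKEND=server \
    --env PREFECT__CLOUD__API=http://<your-server>:4200
```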
Slava Shor:
So there’s no way to make this process automatic?
BTW @Kevin Kho, if I am building a custom Docker image, can I register the flow outside of the container and have it still work?
Anna Geller:
Yes, you can register the flow independently of when you build the Docker image. Since you’re facing an issue where `prefect` is not found in your container, can you start from a Prefect base image instead of python slim? Using the Prefect base image is recommended for Prefect flows and makes things easier. If you’re looking for an example, the same repo I shared before has one.
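A minimal sketch of that suggestion, assuming a `prefecthq/prefect` tag exists that matches your Prefect version and Python 3.10 (the tag below is illustrative):

```dockerfile
# Start from the Prefect base image, which ships with prefect preinstalled.
FROM prefecthq/prefect:1.2.0-python3.10

# Layer the project wheel and the Dask/ECS extras on top.
COPY dist/*.whl .
RUN pip install --no-cache-dir "dask-cloudprovider[aws]" *.whl
```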
Kevin Kho:
Setting the env variables? It’s automatic if you’re on Prefect Cloud. You ultimately need to configure them to point to a Server if you are using Server, because we won’t know where the Server is hosted. Maybe you can put it in the `config.toml` of the container, but that’s pretty much the same thing, just set differently.
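For reference, the `config.toml` equivalent of the `PREFECT__BACKEND` and `PREFECT__CLOUD__API` variables would look roughly like this (a sketch; the address is a placeholder):

```toml
# ~/.prefect/config.toml inside the flow-run container
backend = "server"

[cloud]
api = "http://<your-server>:4200"
```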
Slava Shor:
So basically, I just need to pass my Prefect server URL to the workflow at runtime, plus my custom variables for the workflow, if I define such. Passing them at the start of the workflow run from the Prefect UI should be sufficient, if I understand correctly?
Yes, @Anna Geller, I have no trouble starting Prefect from my Docker image based on Python 3.10, and I can also run the flow registration.
Anna Geller:
@Slava Shor afaik, you need only those 3 env variables when deploying an agent for Server rather than Cloud https://discourse.prefect.io/t/how-to-set-server-endpoint-when-deploying-a-kubernetes-agent/181
```yaml
env:
  - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
    value: ''
  - name: PREFECT__CLOUD__API
    value: "http://some_ip:4200/graphql" # paste your GraphQL Server endpoint here
  - name: PREFECT__BACKEND
    value: server
```
Slava Shor:
🤔 We did not use `PREFECT__CLOUD__AGENT__AUTH_TOKEN`, as we’re using a self-hosted Prefect server on EC2.
Anna Geller:
That’s fine, but if you set it, it should be set to an empty string 🙂 Tokens are deprecated, so all good.
Slava Shor:
Hm… Weird. After fixing the S3 storage, the ECS task shows an endless loop of the same exception in the logs, and the flow is stuck in a Submitted state:
```
/usr/local/lib/python3.10/site-packages/prefect/utilities/logging.py:126: UserWarning: Failed to write logs with error: HTTPError('413 Client Error: Payload Too Large for url: http://********:4200/'), Pending log length: 3,000,240, Max batch log length: 4,000,000, Queue size: 3,617
```
Kevin Kho:
That’s happening even before the flow run? It looks like you are exceeding the API limit with logging. Did you configure anything on any of the Prefect services?
Slava Shor:
Can you elaborate, @Kevin Kho, or perhaps point me to a specific configuration?
Kevin Kho:
I don’t have anything specific in mind, but maybe something like changing the API limits?
I take that back. Something is weird here, because the pending log length isn’t even at your maximum. Does a Flow off of ECS work?
Slava Shor:
It was working before I configured the Dask executor with this configuration (inside the workflow definition script):
```python
EXECUTOR = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)
```
Kevin Kho:
Oh wow, so ECS works. First time I’ve seen something like this. I suspect it has to do with the adapt kwargs; I find that they tend to cause issues in general.
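One hedged alternative sketch, assuming `FargateCluster` accepts a fixed worker count and a custom image: drop the adaptive scaling and point the Dask workers at the same custom image, so `prefect` is importable on the workers too, rather than dask-cloudprovider’s default conda-based image (which would also explain the Python 3.8 frame in the traceback). The image URI is a placeholder:

```python
from prefect.executors import DaskExecutor

EXECUTOR = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        # Placeholder URI; use the custom image that has prefect installed.
        "image": "<account>.dkr.ecr.us-east-1.amazonaws.com/emporus-prefect:latest",
        # Fixed worker count instead of adapt_kwargs={"maximum": 10}.
        "n_workers": 4,
    },
)
```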
Slava Shor:
Also, @Kevin Kho, there is something weird I do not understand. My `ECSRun` looks like:
```python
RUN_CONFIG = ECSRun(
    labels=["dev"],
    run_task_kwargs=dict(
        cluster="prefect",
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": [
                    "...",
                ],
                "securityGroups": [
                    "...",
                ],
                "assignPublicIp": "ENABLED",
            }
        },
    ),
    task_definition_arn="arn:aws:ecs:...",
)
```
And when we trigger a workflow, there is an ECS task inside the `prefect` ECS cluster, but it seems like the agent spins up another cluster dynamically, with a name like `dask-098fc57b-3`.
Kevin Kho:
The Dask executor creates another pod/container to submit tasks to; it creates a cluster. If you want everything to run in the same container, then you would use the LocalDaskExecutor.
Slava Shor:
Oh… So `LocalDaskExecutor` doesn’t mean running on the local machine?
Kevin Kho:
LocalDask is the same process; it’s equivalent to a multiprocessing pool. DaskExecutor is another machine.
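A minimal sketch of the same-process variant (the scheduler and worker count are illustrative):

```python
from prefect.executors import LocalDaskExecutor

# Parallelizes tasks inside the flow run's own container using a local
# thread or process pool; no separate Dask cluster or container is created.
EXECUTOR = LocalDaskExecutor(scheduler="processes", num_workers=4)
```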
Slava Shor:
We had a successful execution before on a single pod on ECS using the same `DaskExecutor`, just without the configuration I showed before. But we want to benefit from scalability; otherwise it is quite pointless for us.
Is there any recipe for running Prefect workflows on AWS ECS with autoscaling, while using a custom Prefect server hosted on the same ECS in a container, or simply on EC2?
Anna Geller:
We don't have any Server recipes for ECS, but if you want to use AWS EKS instead, we have a helm chart that lets you deploy all server components, and you could even have a Kubernetes agent deployed to the same EKS cluster as part of the helm chart. The only Server tutorial with ECS I've seen is this one: https://towardsdatascience.com/deploying-prefect-server-with-aws-ecs-fargate-and-docker-storage-36f633226c5f - maybe it can still help you.
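For the EKS route, the install would look roughly like this (a sketch, assuming the chart repo URL from the Prefect Server docs of that era; verify against the current docs):

```bash
helm repo add prefecthq https://prefecthq.github.io/server/
helm install my-prefect-server prefecthq/prefect-server
```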
Kevin Kho:
Was busy. This should work; that error is weird. We do have a lot of users on autoscaling (with some instability from Dask at times), but it should not trigger that kind of error.