Slava Shor
02/20/2022, 4:19 PM
We set DASK_DISTRIBUTED__COMM__COMPRESSION=zlib and now we're getting a puzzling exception:
We didn’t use Conda explicitly at any stage. And yet, we see something weird: all of the stack trace is from Python 3.10 except the last line, which is from Python 3.8.
Kevin Kho
02/20/2022, 4:21 PM
Anna Geller
02/20/2022, 4:34 PM
Slava Shor
02/20/2022, 4:47 PM
We added dask-cloudprovider[aws] to our image.
Kevin Kho
02/20/2022, 4:50 PM
Slava Shor
02/20/2022, 4:53 PM
Our Dockerfile looks like this:
FROM python:3.10-slim
# Set system locale
ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8
# TODO: Extract hard-coded values
# Set Flow vars
ENV ACCOUNT_ID [censored]
ENV REGION us-east-1
ENV ECR_REPO emporus-prefect
ENV ECR_IMAGE emporus-prefect
ENV ECR_TAG latest
ENV FLOW_NAME ecs-demo
ENV TASK_ROLE Prefect_Task_Role
ENV AGENT_ROLE Prefect_Agent_Role
ENV PREFECT__BACKEND server
ENV PREFECT__CLOUD__AGENT__LABELS ['dev']
ENV PREFECT__CLOUD__API http://[censored]:4200
ENV PREFECT__CLOUD__SEND_FLOW_RUN_LOGS true
ENV PREFECT__CLOUD__USE_LOCAL_SECRETS false
ENV PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS prefect.engine.cloud.CloudFlowRunner
ENV PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS prefect.engine.cloud.CloudTaskRunner
ENV PREFECT__LOGGING__LEVEL DEBUG
ENV PREFECT__LOGGING__LOG_TO_CLOUD true
# Image Labels
LABEL maintainer="[censored]"
RUN apt update && apt install -y gcc git build-essential
COPY dist/*.whl .
RUN python3.10 -m pip install --no-cache-dir -U pip dask-cloudprovider[aws] *.whl
RUN apt purge -y gcc git build-essential && apt clean && apt autoremove -y
RUN rm -rf *.whl /var/lib/apt*
ENV PATH /usr/local/bin/:$PATH
RUN flow-register
flow-register is a Python package script that does what it says: registers a flow from inside the Docker image.
Unexpected error: ModuleNotFoundError("No module named 'prefect'")
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/engine/flow_runner.py", line 542, in get_flow_run_state
upstream_states = executor.wait(
File "/usr/local/lib/python3.10/site-packages/prefect/executors/dask.py", line 440, in wait
return self.client.gather(futures)
File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 2146, in gather
return self.sync(
File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 309, in sync
return sync(
File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 376, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.10/site-packages/distributed/utils.py", line 349, in f
result = yield future
File "/usr/local/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.10/site-packages/distributed/client.py", line 2009, in _gather
raise exception.with_traceback(traceback)
File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
ModuleNotFoundError: No module named 'prefect'
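The /opt/conda/lib/python3.8 frame is the giveaway: unpickling happened on a Dask worker running dask-cloudprovider's default conda-based daskdev/dask image (Python 3.8, without prefect installed), not on the python:3.10-slim flow image. A possible remedy, sketched under the assumption that the flow image above is pushed to ECR (the image URI is a placeholder, not the thread's confirmed fix):
from prefect.executors import DaskExecutor

# Pin the Dask scheduler/worker image to the same image the flow runs in,
# so that prefect and Python 3.10 exist on the workers too.
EXECUTOR = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "<account>.dkr.ecr.us-east-1.amazonaws.com/emporus-prefect:latest",
    },
    adapt_kwargs={"maximum": 10},
)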
Kevin Kho
02/20/2022, 4:56 PM
Can you show how you define the Flow?
Slava Shor
02/20/2022, 4:56 PM
Kevin Kho
02/20/2022, 4:59 PM
Slava Shor
02/20/2022, 4:59 PMFLOW_NAME = os.environ["FLOW_NAME"]
TASK_ROLE = os.environ["TASK_ROLE"]
AGENT_ROLE = os.environ["AGENT_ROLE"]
ACCOUNT_ID = os.environ["ACCOUNT_ID"]
REGION = os.environ["REGION"]
ECR_REPO = os.environ["ECR_REPO"]
ECR_TAG = os.environ["ECR_TAG"]
STORAGE = S3(
bucket="prefect-datasets",
key=f"flows/{FLOW_NAME}.py",
stored_as_script=True,
# this will ensure to upload the Flow script to S3 during registration
local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = ECSRun(
labels=["dev"],
run_task_kwargs=dict(
cluster="prefect",
launchType="FARGATE",
networkConfiguration={
"awsvpcConfiguration": {
"subnets": [
"...",
"...",
],
"securityGroups": [
"...",
],
"assignPublicIp": "ENABLED",
}
},
),
task_definition_arn="arn:aws:ecs:us-east-1:...:task-definition/...",
)
EXECUTOR = DaskExecutor(
cluster_class="dask_cloudprovider.aws.FargateCluster",
adapt_kwargs={"maximum": 10},
)
with Flow(FLOW_NAME, executor=EXECUTOR, run_config=RUN_CONFIG) as flow:
...
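Presumably the flow-register entry point then finishes with something like this sketch (the project name is a hypothetical placeholder):
# Attach the storage so the script is uploaded at registration time,
# then register the flow against the Prefect Server backend.
flow.storage = STORAGE
flow.register(project_name="<project>")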
Kevin Kho
02/20/2022, 5:00 PM
Slava Shor
02/20/2022, 5:01 PM
Kevin Kho
02/20/2022, 5:01 PM
Slava Shor
02/20/2022, 5:02 PM
with Flow(FLOW_NAME, executor=EXECUTOR, run_config=RUN_CONFIG, storage=STORAGE) as flow:
Kevin Kho
02/20/2022, 5:07 PM
I can't say for sure that's what caused the prefect ModuleNotFound, but if you don't attach a storage, Prefect uses the default Local storage and saves the file locally, which is why it's looking on the local machine for the file that contains the Flow.
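A minimal sketch of the behavior described here, assuming Prefect 1.x defaults:
from prefect import Flow
from prefect.storage import S3

# With no storage given, registration falls back to Local storage, which
# records a path on the registering machine; that path does not exist inside
# the Fargate containers that later run the flow.
flow = Flow("ecs-demo")  # storage defaults to Local at registration

# Attaching S3 storage instead uploads the script and tells every container
# where to fetch it from.
flow.storage = S3(
    bucket="prefect-datasets",
    key="flows/ecs-demo.py",
    stored_as_script=True,
    local_script_path="ecs-demo.py",
)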
Slava Shor
02/20/2022, 6:12 PM
We had to add every PREFECT__* environment variable to the Docker image. Isn't it something that the Agent should take care of?
Kevin Kho
02/20/2022, 6:15 PM
You can use the --env flag when you spin up an agent to set environment variables that will be copied over to the Flow run. Otherwise, you'd have to define them on the new process (in the container).
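For example (a sketch, not the thread's confirmed setup): the agent-side form is prefect agent ecs start --env KEY=VALUE --label dev, and the per-flow form sets env on the run config:
from prefect.run_configs import ECSRun

# Values here are injected into the flow-run container at runtime, so they
# don't have to be baked into the Docker image; keys mirror the Dockerfile above.
RUN_CONFIG = ECSRun(
    labels=["dev"],
    env={
        "PREFECT__BACKEND": "server",
        "PREFECT__CLOUD__API": "http://[censored]:4200",
    },
)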
Slava Shor
02/20/2022, 6:58 PM
Anna Geller
02/20/2022, 10:06 PM
Kevin Kho
02/20/2022, 11:03 PM
You could also set these in the config.toml of the container, but that's pretty much the same thing, just set differently.
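A sketch of what that config.toml could contain, mirroring the ENV lines from the Dockerfile above (placed at ~/.prefect/config.toml in the container):
backend = "server"

[cloud]
api = "http://[censored]:4200"
send_flow_run_logs = true

[logging]
level = "DEBUG"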
Slava Shor
02/21/2022, 8:30 AM
Anna Geller
02/21/2022, 9:35 AM
env:
- name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
value: ''
- name: PREFECT__CLOUD__API
value: "<http://some_ip:4200/graphql>" # paste your GraphQL Server endpoint here
- name: PREFECT__BACKEND
value: server
Slava Shor
02/21/2022, 10:31 AM
We don't need PREFECT__CLOUD__AGENT__AUTH_TOKEN, as we're using Prefect Server self-hosted on EC2.
Anna Geller
02/21/2022, 11:14 AM
Slava Shor
02/23/2022, 4:13 PM
/usr/local/lib/python3.10/site-packages/prefect/utilities/logging.py:126: UserWarning: Failed to write logs with error: HTTPError('413 Client Error: Payload Too Large for url: http://********:4200/'), Pending log length: 3,000,240, Max batch log length: 4,000,000, Queue size: 3,617
Kevin Kho
02/23/2022, 4:16 PM
Slava Shor
02/23/2022, 4:21 PM
Kevin Kho
02/23/2022, 4:26 PM
Slava Shor
02/23/2022, 4:37 PM
EXECUTOR = DaskExecutor(
cluster_class="dask_cloudprovider.aws.FargateCluster",
adapt_kwargs={"maximum": 10},
)
Kevin Kho
02/23/2022, 4:44 PM
Slava Shor
02/23/2022, 4:44 PM
Our ECSRun looks like:
RUN_CONFIG = ECSRun(
labels=["dev"],
run_task_kwargs=dict(
cluster="prefect",
launchType="FARGATE",
networkConfiguration={
"awsvpcConfiguration": {
"subnets": [
"...",
],
"securityGroups": [
"...",
],
"assignPublicIp": "ENABLED",
}
},
),
task_definition_arn="arn:aws:ecs:...",
)
And when we trigger a workflow, there is an ECS task inside the prefect ECS cluster, but it seems like the agent spins up another cluster dynamically, named something like dask-098fc57b-3.
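That dask-... cluster is expected with this configuration; a sketch of the two DaskExecutor modes in Prefect 1.x (the scheduler address is a placeholder):
from prefect.executors import DaskExecutor

# With cluster_class, each flow run spins up a fresh, temporary FargateCluster
# (hence the dynamically named dask-... cluster) and tears it down afterwards.
ephemeral = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)

# With address, the flow run attaches to an existing, long-lived Dask cluster.
persistent = DaskExecutor(address="tcp://<scheduler-host>:8786")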
Kevin Kho
02/23/2022, 4:46 PM
Slava Shor
02/23/2022, 4:47 PM
LocalDaskExecutor doesn't mean running on the local machine?
Kevin Kho
02/23/2022, 4:48 PM
Slava Shor
02/23/2022, 4:52 PM
We could use DaskExecutor, just without the configuration I showed before. But we wanted to benefit from scalability; otherwise it is quite pointless for us.
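For contrast, a sketch of LocalDaskExecutor: it parallelizes with a thread or process pool inside the single flow-run container, so there is no second cluster, but also no scaling beyond that one container:
from prefect.executors import LocalDaskExecutor

# Runs tasks on a local thread pool within the flow-run container itself.
EXECUTOR = LocalDaskExecutor(scheduler="threads", num_workers=8)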
Anna Geller
02/23/2022, 5:25 PM
Kevin Kho
02/23/2022, 7:51 PM