Does the `ECSRun` configuration combined with `Loc...
# ask-community
n
Does the `ECSRun` configuration combined with `LocalDaskExecutor(scheduler="processes", num_workers=4)` support parallel execution of mapped tasks? Currently I'm unable to get the ECS task to spawn any additional processes: all tasks run in sequence (first image). When running the same flow in the local Prefect environment, the tasks all run in parallel (second image). If I use `LocalDaskExecutor(scheduler="threads", num_workers=4)`, the flow tasks are executed in parallel, but a threaded flow only covers part of our use cases.
c
Do you know what compute your ECS containers have? I wonder if they don't have the ability to run 4 processes.
Also, `LocalDaskExecutor` will run on the one container, so it won't spin off extra ECS tasks.
n
I'm passing a custom task definition, these are the ECS container resources (from the AWS task definition that is generated for the task):
Task memory (MiB): 8192
Task CPU (unit): 4096
I understand it will not spawn multiple ECS tasks, but it should be able to spawn multiple processes within a single container?
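As a rough sanity check on the compute question, the sketch below converts the task-level CPU value to vCPUs (ECS uses 1024 CPU units per vCPU) and reports how many CPUs a Python process can see from inside the container. The helper names `vcpus_from_units` and `visible_cpus` are made up for illustration. The visible count may reflect the host's cores rather than the task's CPU limit, so treat it as a hint only.

```python
import os

# ECS expresses CPU in units; 1024 units correspond to 1 vCPU.
ECS_CPU_UNITS_PER_VCPU = 1024


def vcpus_from_units(cpu_units: int) -> float:
    """Convert an ECS task-level CPU value (in units) to vCPUs."""
    return cpu_units / ECS_CPU_UNITS_PER_VCPU


def visible_cpus() -> int:
    """Return how many CPUs this process is allowed to run on."""
    if hasattr(os, "sched_getaffinity"):  # available on Linux only
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


if __name__ == "__main__":
    # 4096 is the "Task CPU (unit)" value quoted in the thread.
    print("vCPUs requested by the task definition:", vcpus_from_units(4096))
    print("CPUs visible inside the container:", visible_cpus())
```

With the values above, the task definition asks for 4 vCPUs, which matches `num_workers=4`.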
c
Yeah, it should. Hmm. Do you get anything useful from the CloudWatch logs?
n
Unfortunately I'm not getting any logs from the ECS task on the AWS side, only in the Prefect logs.
c
Ah, how did you deploy it?
n
Hey @ciaran, got caught in a meeting. This is the test flow:
from prefect import Flow, task
from prefect.engine.results import S3Result
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import ECSRun
from prefect.storage import S3

@task
def process(key):
    import time
    time.sleep(30)

with Flow(
        name="opal_copy_test",
) as flow:
    keys = [1,2,3,4]
    process.map(keys)

flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)

run_config = ECSRun(
    labels=["python3.7"],
    image="<DOCKER_IMAGE>",
)
storage = S3(bucket="<S3_BUCKET>")
result = S3Result(bucket="<S3_BUCKET>")
flow.run_config = run_config
flow.storage = storage
flow.result = result  # attach the result; otherwise the S3Result above is never used

if __name__ == '__main__':
    flow.register(project_name='test')
When registering it for the local environment, the storage, result and run_config parts are removed. Also, I change `~/.prefect/backend.toml` to point to the ECS Prefect Server when registering flows to ECS.
c
Sorry, I meant how did you deploy your ECS stuff
n
I have 2 EC2 instances running Prefect Server and Prefect Agent, and an ECS Cluster as an execution layer.
server deployment:
prefect server start
agent deployment (`options.yaml` contains the network configuration):
prefect agent ecs start \
    --task-role-arn=arn:aws:iam::<ACCOUNT_ID>:role/ECSTaskS3ECRRole \
    --execution-role-arn arn:aws:iam::<ACCOUNT_ID>:role/ECSTaskS3ECRRole \
    --log-level INFO \
    --label ecs-dev \
    --label python3.7 \
    --name ecs-test \
    --cluster prefect-cluster \
    --run-task-kwargs /home/ubuntu/prefect-agent/options.yaml \
    --api http://<PREFECT_SERVER_IP>:4200
k
Hi @Nikola Lusic, so `processes` works but `threads` doesn’t?
n
Hey @Kevin Kho,
`threads` - works as expected
`processes` - runs tasks in sequence instead of in parallel
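One way to confirm "in sequence" versus "in parallel" without relying on the run-schedule images is to have each mapped task report its process ID and its start and end times, then check how many runs overlapped. This is a minimal, stdlib-only sketch. `probe`, `max_concurrency` and `distinct_pids` are hypothetical helpers, not part of Prefect or Dask, and `probe` merely stands in for the body of `process` from the test flow.

```python
import os
import time
from typing import List, Tuple

Record = Tuple[int, float, float]  # (pid, start time, end time)


def probe(key, seconds: float = 30.0) -> Record:
    """Stand-in for the task body: sleep, then report who ran it and when."""
    start = time.time()
    time.sleep(seconds)
    return os.getpid(), start, time.time()


def max_concurrency(records: List[Record]) -> int:
    """Largest number of records whose [start, end) intervals overlap."""
    events = []
    for _pid, start, end in records:
        events.append((start, 1))
        events.append((end, -1))
    # At equal timestamps, process ends (-1) before starts (+1) so that
    # back-to-back runs are not counted as overlapping.
    events.sort(key=lambda e: (e[0], e[1]))
    running = peak = 0
    for _timestamp, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


def distinct_pids(records: List[Record]) -> int:
    """Number of different worker processes that produced the records."""
    return len({pid for pid, _start, _end in records})
```

If the mapped task returned such a record and a downstream task collected the list, a peak of 1 with a single PID would indicate sequential execution, while a peak of 4 across several PIDs would indicate that the `processes` scheduler is actually fanning out.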
k
Will ask the team about this
This is specific to ECS right? I saw in the previous thread where you downgraded the Dask version and parallelization started working?
n
Yup, the previous thread was regarding the local execution. The current thread is regarding the ECS deployment.
k
Just making sure, you already downgraded from dask-core==2021.4.0? I think we are exploring this in this issue
This could also be Dask deciding that spawning a new process to run the tasks in parallel costs more than the potential gains.
n
Indeed, it was connected to the issue you mentioned. I had downgraded the Dask version in Prefect Server, Prefect Agent and the build environment, but the Docker image still had the new Dask version. I've updated the requirements and pinned the Dask version. With the downgraded version, tasks are running in parallel.
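Since the mismatch was inside the Docker image rather than on the server or agent, a quick check run in the image itself (for example at the top of the flow file) can show which Dask version the flow really executes with. This is a sketch under assumptions: the function names are hypothetical, the distribution is assumed to be installed under the name `dask`, and 2021.4.0 is only the version named in this thread; other versions may or may not behave the same way.

```python
# importlib.metadata needs Python 3.8+; on 3.7 the `importlib_metadata`
# backport offers the same API.
from importlib import metadata
from typing import Optional

# The only version this thread names as problematic (assumption: exact match).
REPORTED_PROBLEM_VERSION = "2021.4.0"


def installed_version(package: str = "dask") -> Optional[str]:
    """Return the installed version of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def matches_reported_version(version: Optional[str]) -> bool:
    """True if `version` is the one reported as problematic in the thread."""
    return version == REPORTED_PROBLEM_VERSION


if __name__ == "__main__":
    version = installed_version()
    print("dask version in this environment:", version)
    if matches_reported_version(version):
        print("This is the version reported to run mapped tasks sequentially.")
```

Running the same check on the agent host and inside the image makes it easy to spot an image that was built before the requirements were updated.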
Thank you guys!