Thread
#prefect-community

    Nikola Lusic

    1 year ago
    Does the ECSRun configuration combined with LocalDaskExecutor(scheduler="processes", num_workers=4) support parallel execution of mapped tasks? Currently I'm unable to get the ECS task to spawn any additional processes: all tasks run in sequence (first image). When I run the same flow in the local Prefect environment, the tasks all run in parallel (second image). If I use LocalDaskExecutor(scheduler="threads", num_workers=4), the flow tasks are executed in parallel, but a threaded flow only covers part of our use cases.

    ciaran

    1 year ago
    Do you know what compute your ECS containers have? I wonder if they don't have the ability to run 4 processes.
    Also, LocalDaskExecutor will run on the one container, so it won't spin off extra ECS tasks.
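One way to check whether the container can run several processes at once is a standard-library probe that involves neither Prefect nor Dask. The following is a minimal sketch, assuming a Linux container where the "fork" start method is available; `probe_parallelism` and its parameters are hypothetical names chosen for illustration. Note that `os.cpu_count()` may report the host's CPUs rather than the limit set in the ECS task definition.

```python
import multiprocessing as mp
import os
import time


def _sleep_and_report(seconds):
    """Sleep for `seconds`, then return the PID of the worker that ran this call."""
    time.sleep(seconds)
    return os.getpid()


def probe_parallelism(n_tasks=4, workers=4, seconds=1.0):
    """Run n_tasks sleeps on a process pool and report timing and worker PIDs."""
    # Assumption: "fork" exists on this platform (true on Linux, not on Windows).
    ctx = mp.get_context("fork")
    start = time.perf_counter()
    with ctx.Pool(processes=workers) as pool:
        pids = pool.map(_sleep_and_report, [seconds] * n_tasks, chunksize=1)
    elapsed = time.perf_counter() - start
    # sched_getaffinity is Linux-only, so fall back to None elsewhere.
    affinity = len(os.sched_getaffinity(0)) if hasattr(os, "sched_getaffinity") else None
    return {
        "cpu_count": os.cpu_count(),
        "usable_cpus": affinity,
        "worker_pids": sorted(set(pids)),
        "elapsed": elapsed,
        "sequential_estimate": n_tasks * seconds,
    }


if __name__ == "__main__":
    print(probe_parallelism())
```

If `elapsed` comes out close to `sequential_estimate`, the sleeps ran one after another. If it is close to a single sleep, the container can run worker processes in parallel and the cause is more likely in the executor or its dependencies.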

    Nikola Lusic

    1 year ago
    I'm passing a custom task definition, these are the ECS container resources (from the AWS task definition that is generated for the task):
    Task memory (MiB): 8192
    Task CPU (unit): 4096
    I understand it will not spawn multiple ECS tasks, but it should be able to spawn multiple processes within a single container?

    ciaran

    1 year ago
    Yeah, it should. Hmm. Do you get anything useful from the CloudWatch logs?

    Nikola Lusic

    1 year ago
    Unfortunately, I'm not getting any logs from the ECS task on the AWS side, only in the Prefect logs.

    ciaran

    1 year ago
    Ah, how did you deploy it?

    Nikola Lusic

    1 year ago
    Hey @ciaran, got caught in a meeting. This is the test flow:
    from prefect import Flow, task
    from prefect.engine.results import S3Result
    from prefect.executors import LocalDaskExecutor
    from prefect.run_configs import ECSRun
    from prefect.storage import S3
    
    @task
    def process(key):
        import time
        time.sleep(30)
    
    with Flow(
            name="opal_copy_test",
    ) as flow:
        keys = [1,2,3,4]
        process.map(keys)
    
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)
    
    run_config = ECSRun(
        labels=["python3.7"],
        image="<DOCKER_IMAGE>",
    )
    storage = S3(bucket="<S3_BUCKET>")
    result = S3Result(bucket="<S3_BUCKET>")
    flow.run_config = run_config
    flow.storage = storage
    
    if __name__ == '__main__':
        flow.register(project_name='test')
    When registering it for the local environment, the storage, result and run_config parts are removed. Also, I change ~/.prefect/backend.toml to point to the ECS Prefect Server when registering flows to ECS.

    ciaran

    1 year ago
    Sorry, I meant how did you deploy your ECS stuff

    Nikola Lusic

    1 year ago
    I have 2 EC2 instances running Prefect Server and Prefect Agent, and an ECS Cluster as an execution layer.
    server deployment:
    prefect server start
    agent deployment (options.yaml contains the network configuration):
    prefect agent ecs start \
        --task-role-arn=arn:aws:iam::<ACCOUNT_ID>:role/ECSTaskS3ECRRole \
        --execution-role-arn arn:aws:iam::<ACCOUNT_ID>:role/ECSTaskS3ECRRole \
        --log-level INFO \
        --label ecs-dev \
        --label python3.7 \
        --name ecs-test \
        --cluster prefect-cluster \
        --run-task-kwargs /home/ubuntu/prefect-agent/options.yaml \
        --api http://<PREFECT_SERVER_IP>:4200

    Kevin Kho

    1 year ago
    Hi @Nikola Lusic, so processes works but threads doesn’t?

    Nikola Lusic

    1 year ago
    Hey @Kevin Kho,
    threads - works as expected
    processes - runs tasks in sequence instead of in parallel

    Kevin Kho

    1 year ago
    Will ask the team about this
    This is specific to ECS, right? I saw in the previous thread that you downgraded the Dask version and parallelization started working?

    Nikola Lusic

    1 year ago
    Yup, the previous thread was regarding the local execution. The current thread is regarding the ECS deployment.

    Kevin Kho

    1 year ago
    Just making sure: you already downgraded from dask-core==2021.4.0? I think we are exploring this in this issue.
    This could also be Dask deciding that spawning new processes to run the tasks in parallel costs more than the potential gains.

    Nikola Lusic

    1 year ago
    Indeed, it was connected to the issue you mentioned. I had downgraded the Dask version on the Prefect Server, on the Prefect Agent and in the build environment, but the Docker image still had the new Dask version. I've updated the requirements and pinned the Dask version. With the downgraded version, tasks run in parallel.
    Thank you guys!
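The root cause here was a version mismatch between the Docker image and the other environments. A small helper that reports the package versions importable in the running interpreter can make such a mismatch visible. This is a hedged sketch: `runtime_versions` is a hypothetical helper, the default package list is an assumption, and any import failure is simply reported as `None`.

```python
import importlib
import platform


def runtime_versions(packages=("dask", "distributed", "prefect")):
    """Return a dict of versions visible to this interpreter (None if not importable)."""
    versions = {"python": platform.python_version()}
    for name in packages:
        try:
            module = importlib.import_module(name)
        except Exception:
            # Treat any import failure as "not available in this environment".
            versions[name] = None
        else:
            versions[name] = getattr(module, "__version__", "unknown")
    return versions


if __name__ == "__main__":
    print(runtime_versions())
```

Printing or logging this dictionary from code that runs inside the container, and comparing it with the output from the build environment, would show whether the image still carries a different Dask version.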