Hi - Is anyone using `DaskCloudProviderEnvironment...
# prefect-community
i
Hi - Is anyone using `DaskCloudProviderEnvironment`? I am following the docs here https://docs.prefect.io/orchestration/execution/dask_cloud_provider_environment.html#process and am getting an import error.
Copy code
prefect.__version__ 
'0.10.7'

from prefect.environments import DaskCloudProviderEnvironment
ImportError: cannot import name 'DaskCloudProviderEnvironment' from 'prefect.environments' (.../miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/environments/__init__.py)
c
This was an import bug that was identified after 0.10.7 was released - it will be resolved in 0.11.0 coming out in a few days! In the meantime, you can import this environment using:
Copy code
from prefect.environments.execution.dask.cloud_provider import DaskCloudProviderEnvironment
Apologies for the inconvenience!
👍 1
@Marvin archive “Issue importing DaskCloudProviderEnvironment”
j
@itay livni sorry for the trouble! I worked on `DaskCloudProviderEnvironment` -- happy to try to answer any questions you might have about it. (Also interested to hear your experience. We have a Flow using that Environment that's been running every 15 minutes for the past 2 weeks and has been very stable.)
i
@Joe Schmid Using the first example was neat and straightforward. But I am definitely having implementation issues after that 🙂 (1) The documentation says
Copy code
# Tear down the Dask cluster. If you're developing and testing your flow you would
# not do this after each Flow run, but when you're done developing and testing.
How does this get implemented in a `Flow` where the cluster is not defined? Or should the FargateCluster be defined where the flow run resides?
j
@itay livni When using `DaskCloudProviderEnvironment` it will take care of creating all of the Fargate resources you need at the start of each Flow run and will tear them down at the end of each Flow run.
i.e. you don't need to have an ECS/Fargate cluster already -- it will create one dynamically for that Flow run
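Roughly, it does per Flow run what you did by hand in the first docs example (this is only a sketch of the idea, not Prefect's actual implementation; the kwarg value is illustrative):
Copy code
from dask.distributed import Client
from dask_cloudprovider import FargateCluster

# Created dynamically at the start of the Flow run:
cluster = FargateCluster(n_workers=1)  # illustrative kwargs
client = Client(cluster)
# ... the Flow's tasks execute on this temporary cluster ...
client.close()
cluster.close()  # torn down automatically at the end of the Flow run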
i
Yes, and it will then tear it down without me specifying anything?
j
yup
So if you completed the first example successfully, the next step is to take the same kwargs you used with FargateCluster and use them with `DaskCloudProviderEnvironment`. What you did interactively to create a FargateCluster (with Dask Cloud Provider), that Environment will now do dynamically for each Flow run.
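A quick sketch of that mapping (illustrative values; substitute the kwargs you actually used interactively):
Copy code
from dask_cloudprovider import FargateCluster
from prefect.environments.execution.dask.cloud_provider import DaskCloudProviderEnvironment

# What you did interactively in the first example:
cluster = FargateCluster(
    n_workers=1,
    scheduler_cpu=1024,
    scheduler_mem=2048,
    worker_cpu=1024,
    worker_mem=2048,
)

# The same kwargs passed through to the Environment, which will create an
# equivalent cluster for each Flow run:
environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    n_workers=1,
    scheduler_cpu=1024,
    scheduler_mem=2048,
    worker_cpu=1024,
    worker_mem=2048,
)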
i
@Joe Schmid Very cool. (2) Confused about Storage: should the Docker Storage be initialized before calling DaskCloudProviderEnvironment?
Copy code
# Imports assumed from the docs example (on 0.10.7 use the full path shown above):
from dask_cloudprovider import FargateCluster
from prefect.environments.execution.dask.cloud_provider import DaskCloudProviderEnvironment

environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    # task_role_arn="...",
    execution_role_arn="...",
    n_workers=1,
    scheduler_cpu=1024,
    scheduler_mem=1024,
    worker_cpu=1024,
    worker_mem=2048,
    scheduler_timeout="45 minutes",
)

etl_moc_flow.environment = environment
j
@itay livni Right. If you don't provide an `image` kwarg to `DaskCloudProviderEnvironment` it will look up the registry URL, image & tag from Cloud (based on the storage for that Flow) and will use that as the Docker image for the Dask cluster in Fargate.
This is really nice because it guarantees that the Dask cluster has the same dependencies (python modules, etc.) available as the Docker image for the Flow run.
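If you ever do want to pin the image yourself, a sketch (the registry/tag below is hypothetical; `image` is a FargateCluster kwarg the Environment passes through):
Copy code
environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    # Hypothetical registry/tag; if omitted, the image is derived from the
    # Flow's Docker storage as described above.
    image="<registry>/<repo>:<tag>",
    n_workers=1,
)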
i
@Joe Schmid Yes.. There are a lot of nice things about this. When I get it working, it will be perhaps the simplest way to deploy code to the cloud I have seen!
🚀 1
@Joe Schmid - So I am running into this AWS error when I run the flow below.
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.
The only difference is the Docker storage.
Copy code
from prefect import Flow, Parameter
from prefect.environments.execution.dask.cloud_provider import DaskCloudProviderEnvironment
from prefect.environments.storage import Docker
from dask_cloudprovider import FargateCluster

environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    execution_role_arn="arn:aws:iam::xyz/ecsTaskExecutionRole",
    n_workers=1,
    scheduler_cpu=512,
    scheduler_mem=1024,
    worker_cpu=512,
    worker_mem=1024,
    scheduler_timeout="15 minutes",
)

# times_two and get_sum are tasks defined elsewhere
with Flow("Dask Cloud Provider Test", environment=environment) as flow:
    x = Parameter("x", default=[1, 2, 3])
    y = times_two.map(x)
    results = get_sum(y)

flow.storage = Docker(python_dependencies=["dask_cloudprovider"])

flow.register(project_name="tst-deploy", build=True)
https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RegisterTaskDefinition.html Any guidance or solution to this?
j
@itay livni Couple thoughts:
1. That doesn't look like it's related to Dask Cloud Provider (its container wouldn't be named "flow" but would be "dask-scheduler" or "dask-worker")
2. I think it's related to the config for the Fargate agent. Double-check the instructions found here: https://docs.prefect.io/orchestration/agents/fargate.html
I think you're close!
Here's code that we use to start our Fargate agent. It's mostly just reading env vars.
Copy code
import os

from prefect.agent.fargate import FargateAgent

REGION_NAME = os.getenv("REGION_NAME")
TASK_ROLE_ARN = os.getenv("TASK_ROLE_ARN")
EXECUTION_ROLE_ARN = os.getenv("EXECUTION_ROLE_ARN")
CLUSTER = os.getenv("CLUSTER")
SUBNETS = os.getenv("SUBNETS")
SECURITY_GROUPS = os.getenv("SECURITY_GROUPS")
TASK_CPU = os.getenv("TASK_CPU")
TASK_MEMORY = os.getenv("TASK_MEMORY")
ASSIGN_PUBLIC_IP = os.getenv("ASSIGN_PUBLIC_IP")
EXTERNAL_KWARGS_BUCKET = os.getenv("EXTERNAL_KWARGS_BUCKET")
LAUNCH_TYPE = os.getenv("LAUNCH_TYPE")

kwargs = {"platformVersion": "1.4.0"} if LAUNCH_TYPE == "FARGATE" else {}

agent = FargateAgent(
    launch_type=LAUNCH_TYPE,
    enable_task_revisions=True,
    use_external_kwargs=False,
    # external_kwargs_s3_bucket= EXTERNAL_KWARGS_BUCKET,
    # external_kwargs_s3_key='prefect-artifacts/kwargs',
    labels=os.getenv("LABELS", "fargate").split(","),
    region_name=REGION_NAME,
    taskRoleArn=TASK_ROLE_ARN,
    executionRoleArn=EXECUTION_ROLE_ARN,
    cpu=TASK_CPU,
    memory=TASK_MEMORY,
    cluster=CLUSTER,
    networkMode="awsvpc",
    env_vars={"AWS_DEFAULT_REGION": REGION_NAME},
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": str(SUBNETS).split(","),
            "securityGroups": str(SECURITY_GROUPS).split(","),
            "assignPublicIp": ASSIGN_PUBLIC_IP,
        }
    },
    containerDefinitions=[
        {
            "command": [],
            "environment": [
                {"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
                {"name": "AWS_DEFAULT_REGION", "value": REGION_NAME},
            ],
            "essential": True,
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "ds-prefect-fargate",
                    "awslogs-region": "us-west-2",
                    "awslogs-stream-prefix": "ds-prefect-fargate",
                },
            },
        }
    ],
    **kwargs
)
agent.start()
You could also set env vars and just use the prefect command line, e.g.:
Copy code
$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export REGION_NAME=us-east-1
$ export cpu=256
$ export memory=512
$ export networkConfiguration="{'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['my_subnet_id'], 'securityGroups': []}}"

$ prefect agent start fargate