itay livni
05/11/2020, 5:21 PM
DaskCloudProviderEnvironment? I am following the docs here https://docs.prefect.io/orchestration/execution/dask_cloud_provider_environment.html#process and am getting an import error.
prefect.__version__
'0.10.7'
from prefect.environments import DaskCloudProviderEnvironment
cannot import name 'DaskCloudProviderEnvironment' from 'prefect.environments' (.../miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/environments/__init__.py)
Chris White
05/11/2020, 5:26 PM
from prefect.environments.execution.dask.cloud_provider import DaskCloudProviderEnvironment
Apologies for the inconvenience!
Marvin
05/11/2020, 6:15 PM
Joe Schmid
05/11/2020, 6:58 PM
I wrote DaskCloudProviderEnvironment -- happy to try to answer any questions you might have about it. (Also interested to hear your experience. We have a Flow using that Environment that's been running every 15 minutes for the past 2 weeks and has been very stable.)
itay livni
05/11/2020, 7:28 PM
# Tear down the Dask cluster. If you're developing and testing your flow you would
# not do this after each Flow run, but when you're done developing and testing.
How does this get implemented in a Flow where the cluster is not defined? Or should the FargateCluster be defined where the flow run resides?
Joe Schmid
05/11/2020, 7:32 PM
DaskCloudProviderEnvironment will take care of creating all of the Fargate resources you need at the start of each Flow run and will tear them down at the end of each Flow run.
itay livni
05/11/2020, 7:33 PM
Joe Schmid
05/11/2020, 7:33 PM
That's exactly what DaskCloudProviderEnvironment does. What you did interactively to create a FargateCluster (with Dask Cloud Provider), that Environment will now do dynamically for each Flow run.
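(For reference, a minimal sketch of the interactive pattern being automated here, using dask_cloudprovider directly; the sizing kwargs are illustrative, not a recommendation.)
from dask.distributed import Client
from dask_cloudprovider import FargateCluster

# Create a Dask cluster on Fargate by hand -- the same steps
# DaskCloudProviderEnvironment performs around each Flow run.
cluster = FargateCluster(
    n_workers=1,
    scheduler_cpu=1024,
    scheduler_mem=4096,
    worker_cpu=1024,
    worker_mem=4096,
)
client = Client(cluster)
print(client.submit(lambda n: n * 2, 21).result())  # runs on the Fargate workers

# Tear down the Dask cluster when you're done developing and testing.
client.close()
cluster.close()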
itay livni
05/11/2020, 7:58 PM
environment = DaskCloudProviderEnvironment(
provider_class=FargateCluster,
#task_role_arn="...",
execution_role_arn="...",
n_workers=1,
scheduler_cpu=1024,
scheduler_mem=1024,
worker_cpu=1024,
worker_mem=2048,
scheduler_timeout="45 minutes",
)
etl_moc_flow.environment = environment
Joe Schmid
05/11/2020, 8:08 PM
If you don't provide an image kwarg to DaskCloudProviderEnvironment, it will look up the registry URL, image & tag from Cloud (based on the storage for that Flow) and will use that as the Docker image for the Dask cluster in Fargate.
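(For illustration, the two options just described; the image name is a hypothetical placeholder.)
from dask_cloudprovider import FargateCluster
from prefect.environments.execution.dask.cloud_provider import DaskCloudProviderEnvironment

# Option 1: omit image -- the Environment resolves the registry URL,
# image, and tag from the Flow's Docker storage in Prefect Cloud.
environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    n_workers=1,
)

# Option 2: pin an explicit image for the Dask scheduler and workers
# ("my-registry/my-flow:latest" is a hypothetical placeholder).
environment = DaskCloudProviderEnvironment(
    provider_class=FargateCluster,
    n_workers=1,
    image="my-registry/my-flow:latest",
)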
itay livni
05/11/2020, 8:16 PM
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.
The only difference from the docs example is the Docker storage.
with Flow("Dask Cloud Provider Test", environment=environment) as flow:
x = Parameter("x", default=[1, 2, 3])
y = times_two.map(x)
results = get_sum(y)
flow.storage = Docker(python_dependencies=["dask_cloudprovider"])
environment = DaskCloudProviderEnvironment(
provider_class=FargateCluster,
execution_role_arn="arn:aws:iam::xyz/ecsTaskExecutionRole",
an_workers=1,
scheduler_cpu=512,
scheduler_mem=1024,
worker_cpu=512,
worker_mem=1024,
scheduler_timeout="15 minutes"
)
flow.environment = environment
flow.register(project_name="tst-deploy", build=True)
https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RegisterTaskDefinition.html Any guidance or solution to this?
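(For context on the error: RegisterTaskDefinition rejects a container definition that has neither task-level memory nor its own memory/memoryReservation. A minimal sketch of the underlying boto3 call with those settings supplied; the family and image names are illustrative.)
import boto3

ecs = boto3.client("ecs", region_name="us-east-1")

ecs.register_task_definition(
    family="prefect-flow",  # illustrative name
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="256",     # task-level CPU units
    memory="512",  # task-level memory (MiB) -- what the error is asking for
    containerDefinitions=[
        {
            "name": "flow",
            "image": "prefecthq/prefect:0.10.7",  # illustrative image
            "essential": True,
        }
    ],
)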
Joe Schmid
05/11/2020, 11:35 PM
import os
from prefect.agent.fargate import FargateAgent
REGION_NAME = os.getenv("REGION_NAME")
TASK_ROLE_ARN = os.getenv("TASK_ROLE_ARN")
EXECUTION_ROLE_ARN = os.getenv("EXECUTION_ROLE_ARN")
CLUSTER = os.getenv("CLUSTER")
SUBNETS = os.getenv("SUBNETS")
SECURITY_GROUPS = os.getenv("SECURITY_GROUPS")
TASK_CPU = os.getenv("TASK_CPU")
TASK_MEMORY = os.getenv("TASK_MEMORY")
ASSIGN_PUBLIC_IP = os.getenv("ASSIGN_PUBLIC_IP")
EXTERNAL_KWARGS_BUCKET = os.getenv("EXTERNAL_KWARGS_BUCKET")
LAUNCH_TYPE = os.getenv("LAUNCH_TYPE")
kwargs = {"platformVersion": "1.4.0"} if LAUNCH_TYPE == "FARGATE" else {}
agent = FargateAgent(
launch_type=LAUNCH_TYPE,
enable_task_revisions=True,
use_external_kwargs=False,
# external_kwargs_s3_bucket= EXTERNAL_KWARGS_BUCKET,
# external_kwargs_s3_key='prefect-artifacts/kwargs',
labels=os.getenv("LABELS", "fargate").split(","),
region_name=REGION_NAME,
taskRoleArn=TASK_ROLE_ARN,
executionRoleArn=EXECUTION_ROLE_ARN,
cpu=TASK_CPU,
memory=TASK_MEMORY,
cluster=CLUSTER,
networkMode="awsvpc",
env_vars={"AWS_DEFAULT_REGION": REGION_NAME},
networkConfiguration={
"awsvpcConfiguration": {
"subnets": str(SUBNETS).split(","),
"securityGroups": str(SECURITY_GROUPS).split(","),
"assignPublicIp": ASSIGN_PUBLIC_IP,
}
},
containerDefinitions=[
{
"command": [],
"environment": [
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
{"name": "AWS_DEFAULT_REGION", "value": REGION_NAME},
],
"essential": True,
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "ds-prefect-fargate",
"awslogs-region": "us-west-2",
"awslogs-stream-prefix": "ds-prefect-fargate",
},
},
}
],
**kwargs
)
agent.start()
$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export REGION_NAME=us-east-1
$ export cpu=256
$ export memory=512
$ export networkConfiguration="{'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['my_subnet_id'], 'securityGroups': []}}"
$ prefect agent start fargate
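(The CLI form above works because the Fargate agent reads boto3 task kwargs such as cpu, memory, and networkConfiguration from environment variables. A programmatic equivalent, assuming the same exports, is a one-liner.)
from prefect.agent.fargate import FargateAgent

# With the env vars above exported, this mirrors `prefect agent start fargate`.
FargateAgent().start()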