Marc Lipoff
04/27/2022, 1:52 PM
`Creating a new Dask cluster with None.fargate_cluster...`. I'm running on prefect==0.15.6. My code is attached. The trouble is that it just hangs and I have no clue what it's "trying" to do.
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(
        n_workers=n_workers,
        image=prefect.context.image,
        cluster_arn="arn:aws:ecs:us-west-2:524279393077:cluster/staging-prefect-cluster"
    )
with Flow(
    flow_name,
    **get_configs(
        file_path=os.path.abspath(sys.argv[0]),
        flow_name=flow_name,
        schedule=sched,
        executor=DaskExecutor(
            cluster_class=fargate_cluster,
            cluster_kwargs={"n_workers": 4}
        ),
        run_config=ECSRun(
            env={
                "PREFECT__LOGGING__LEVEL": "DEBUG",
                "PREFECT__LOGGING__EXTRA_LOGGERS": "['boto3', 'paramiko', 'awswrangler', 'pandas']",
            },
            cpu="4 vcpu",
            memory="30 GB",
            labels=['v2']
        ),
    )
) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
Kevin Kho
`prefect.context.image`? How does it get supplied?
Marc Lipoff
04/27/2022, 1:56 PM
Kevin Kho
Marc Lipoff
04/27/2022, 1:59 PM
Kevin Kho
`prefecthq/prefect` for a quick test since that is public
Marc Lipoff
04/27/2022, 2:01 PM
xxx.dkr.ecr.us-west-2.amazonaws.com/staging-prefect-example-dask-cluster:2022-04-27t13-58-56-475868-00-00
Kevin Kho
`FargateCluster` is a subclass of `ECSCluster`, which takes a `cloudwatch_logs_group` parameter. I think we need to try supplying a logs group so we can get some error tracking in CloudWatch.
Marc Lipoff
04/27/2022, 2:13 PM
`fargate_cluster` function to this:
def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(
        n_workers=n_workers,
        image="prefecthq/prefect",  # prefect.context.image,
        cluster_arn="arn:aws:ecs:us-west-2:xxx:cluster/staging-prefect-cluster",
        cloudwatch_logs_group="/fargate/service/staging-prefect-agent"
    )
That specified log group already exists (it happens to be the one that the agent writes to). ... And no logs for the flow show up.
Kevin Kho
`cluster_kwargs` instead? No idea if it’ll make a difference
Marc Lipoff
04/27/2022, 2:20 PM
State Message:
Unexpected error: TypeError("fargate_cluster() got an unexpected keyword argument 'cloudwatch_logs_group'")
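The TypeError above is consistent with how the executor builds the cluster: as far as I understand Prefect 0.15's `DaskExecutor`, it calls `cluster_class(**cluster_kwargs)`, so any key placed in `cluster_kwargs` has to be accepted by the `fargate_cluster` wrapper itself. A minimal sketch of that behavior and one possible fix follows. `FakeFargateCluster` and the two wrapper names are hypothetical stand-ins so the snippet runs without AWS access; a real wrapper would return `dask_cloudprovider.aws.FargateCluster` instead.

```python
class FakeFargateCluster:
    """Hypothetical stand-in for FargateCluster; it only records its kwargs."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


def fargate_cluster_strict(n_workers=4):
    """Same shape as the wrapper in the thread: only n_workers is accepted."""
    return FakeFargateCluster(n_workers=n_workers)


def fargate_cluster_forwarding(n_workers=4, **kwargs):
    """Variant that forwards any extra keyword arguments to the cluster."""
    return FakeFargateCluster(n_workers=n_workers, **kwargs)


cluster_kwargs = {
    "n_workers": 4,
    "cloudwatch_logs_group": "/fargate/service/staging-prefect-agent",
}

# Calling the strict wrapper with the extra key fails, which mirrors the
# "unexpected keyword argument" state message.
try:
    fargate_cluster_strict(**cluster_kwargs)
    strict_error = None
except TypeError as exc:
    strict_error = str(exc)

# The forwarding wrapper passes the extra key through to the cluster class.
cluster = fargate_cluster_forwarding(**cluster_kwargs)
```

Forwarding `**kwargs` keeps the wrapper's own defaults (image, cluster ARN) in one place while still letting `cluster_kwargs` carry options such as `cloudwatch_logs_group`.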
Kevin Kho
from dask_cloudprovider.aws import FargateCluster
cluster = FargateCluster()
and then use your kwargs? This is rough.
Marc Lipoff
04/27/2022, 2:30 PM
`def fargate_cluster()` with
def fargate_cluster():
    return FargateCluster()
is that right?
Kevin Kho
Marc Lipoff
04/27/2022, 2:34 PM
Python 3.9.7 (default, Feb 4 2022, 10:07:17)
[Clang 12.0.0 (clang-1200.0.32.28)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster()
and it just hangs...
... so what is it doing?
Kevin Kho
Marc Lipoff
04/27/2022, 2:38 PM
Kevin Kho
Marc Lipoff
04/27/2022, 2:53 PM
DEBUG:botocore.endpoint:Making request for OperationModel(name=DescribeTaskDefinition) with params: {'url_path': '/', 'query_string': '', 'method': 'POST', 'headers': {'X-Amz-Target': 'AmazonEC2ContainerServiceV20141113.DescribeTaskDefinition', 'Content-Type': 'application/x-amz-json-1.1', 'User-Agent': 'Botocore/1.24.21 Python/3.9.7 Darwin/19.6.0'}, 'body': b'{"taskDefinition": "arn:aws:ecs:us-west-2:xxx:task-definition/dev-webserver:202", "include": ["TAGS"]}', 'url': 'https://ecs.us-west-2.amazonaws.com/', 'context': {'client_region': 'us-west-2', 'client_config': <aiobotocore.config.AioConfig object at 0x112993d60>, 'has_streaming_input': False, 'auth_type': None}}
And well, we have tons of task definitions -- bc we keep all the versions in our ECR repo.
Kevin Kho
Marc Lipoff
04/27/2022, 3:05 PM
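The `DEBUG:botocore.endpoint:...` line quoted at 2:53 PM has the `LEVEL:logger:message` shape that Python's `logging.basicConfig` produces by default. Below is a minimal sketch of one way to get such output while `FargateCluster()` appears to hang, assuming the standard `logging` module is what produced it (the thread does not show the exact setup). `start_cluster` is a hypothetical helper; it is not called here because it needs dask-cloudprovider and AWS credentials.

```python
import logging

# The default basicConfig format is "LEVEL:logger_name:message".
logging.basicConfig(level=logging.DEBUG)

# Set the AWS client loggers explicitly in case the root logger was
# already configured elsewhere (basicConfig is then a no-op).
for name in ("botocore", "aiobotocore"):
    logging.getLogger(name).setLevel(logging.DEBUG)


def start_cluster():
    """Hypothetical helper: create the cluster with debug logging active."""
    # Imported lazily so this sketch runs without dask-cloudprovider installed.
    from dask_cloudprovider.aws import FargateCluster

    return FargateCluster()
```

With this in place, each AWS request made during cluster startup is logged, which is how a long run of repeated `DescribeTaskDefinition` calls like the one quoted would become visible.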