Greg Desmarais
07/20/2020, 5:59 AM
from datetime import datetime
import prefect
from distributed import get_worker
from prefect import Flow, Parameter, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import FargateTaskEnvironment
from prefect.environments.storage import S3
from constants import AWS_ACCOUNT_NUMBER, VPCS
from util.aws_utils import set_aws_credentials_env, DEFAULT_REGION
# Project helper from util.aws_utils; presumably exports AWS credentials into
# the process environment for boto3 / S3 storage -- confirm in util.aws_utils.
set_aws_credentials_env()
# Prefect API GraphQL endpoint used by this client (e.g. for flow.register()).
# Must be a bare URL string -- no surrounding '<...>' link markup.
prefect.context.config.cloud.graphql = 'http://10.72.112.29:4200/graphql'
@task(log_stdout=True)
def say_hello(t_name):
    """Print a timestamped greeting, then report which Dask worker ran the task.

    Args:
        t_name: Value interpolated into the greeting (in this flow, the
            ``name`` Parameter).

    Returns:
        A string naming the executing Dask worker and its scheduler address.
    """
    # Timestamp is taken immediately before printing, as in the greeting line.
    stamp = datetime.now()
    # flush=True so the line is emitted right away rather than sitting in a buffer.
    print(f'{stamp}: workflow hello {t_name}', flush=True)

    # get_worker() resolves the Dask worker currently executing this task.
    current = get_worker()
    worker_label = current.name
    scheduler_addr = current.scheduler.address
    return f'done on {worker_label}, scheduler at {scheduler_addr}'
name_p = Parameter('name')

with Flow("Dask Cloud Provider Test") as flow:
    # A single iteration today; widen the range to fan out more say_hello tasks.
    for _ in range(1):
        say_hello(name_p)

# Where the flow is stored in S3.
bucket = 'celsius-temp-data'
key = 'datasciences/prefect_flows/dask_cloud_provider_test'
flow.storage = S3(bucket, key=key)

# Container image passed to the Dask FargateCluster, as a bare ECR image URI
# (<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>).
image_name = f'{AWS_ACCOUNT_NUMBER}.dkr.ecr.us-east-1.amazonaws.com/rightsize_99_standard_py37:gdesmarais'

cluster_kwargs = {
    'vpc': VPCS['production'],
    'fargate_use_private_ip': True,
    'image': image_name,
    # Plain (non-f) string: the literal '{uuid}' placeholder is presumably
    # expanded by dask_cloudprovider, not by Python.
    'cluster_name_template': 'dask-test-gdesmarais-{uuid}',
}

executor = DaskExecutor(
    cluster_class='dask_cloudprovider.FargateCluster',
    cluster_kwargs=cluster_kwargs,
)

# Intended as ECS task-definition settings for FargateTaskEnvironment.
# boto3's RegisterTaskDefinition types the task-level 'cpu'/'memory' as
# strings, while 'memoryReservation' exists only on individual container
# definitions, so it is attached to the flow container here.
task_definition_kwargs = {
    'memory': '8192',
    'containerDefinitions': [
        {'name': 'flow-container', 'memoryReservation': 8192},
    ],
    # NOTE(review): Fargate also expects a task-level 'cpu' compatible with
    # this memory size (e.g. '2048', matching the agent's cpu export) -- confirm.
}

flow.environment = FargateTaskEnvironment(
    executor=executor,
    region_name=DEFAULT_REGION,
    **task_definition_kwargs,
)

flow_id = flow.register()
# I'd love for the flow to be populated with more info - e.g. the version (not sure what else)
print(f'Registered flow id: {flow_id}')
# How can I invoke the flow, wait for the results, and see those results?
____ __ _ _ _
| _ \ _ __ ___ / _| ___ ___| |_ / \ __ _ ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __| / _ \ / _` |/ _ \ '_ \| __|
| __/| | | __/ _| __/ (__| |_ / ___ \ (_| | __/ | | | |_
|_| |_| \___|_| \___|\___|\__| /_/ \_\__, |\___|_| |_|\__|
|___/
[2020-07-20 04:44:57,948] INFO - agent | Starting FargateAgent with labels ['s3-flow-storage']
[2020-07-20 04:44:57,948] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
[2020-07-20 04:44:57,948] INFO - agent | Agent connecting to the Prefect API at <http://localhost:4200>
[2020-07-20 04:44:57,985] INFO - agent | Waiting for flow runs...
[2020-07-20 04:46:03,767] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-07-20 04:46:03,809] INFO - agent | Deploying flow run 42e445dd-e0bc-4be4-8566-0e6fd1dbd209
****GSD container_definitions_kwargs: {}
****GSD flow_task_definition_kwargs: {}
****GSD task_definition_name: prefect-task-db2a7788
[2020-07-20 04:46:03,919] ERROR - agent | Logging platform error for flow run 42e445dd-e0bc-4be4-8566-0e6fd1dbd209
Traceback (most recent call last):
File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/agent.py", line 598, in mark_failed
raise exc
File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/agent.py", line 344, in deploy_and_update_flow_run
deployment_info = self.deploy_flow(flow_run)
File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/fargate/agent.py", line 490, in deploy_flow
task_definition_name=task_definition_dict["task_definition_name"],
File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/fargate/agent.py", line 677, in _create_task_definition
**flow_task_definition_kwargs,
File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/botocore/client.py", line 316, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/botocore/client.py", line 626, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.
[2020-07-20 04:46:03,979] ERROR - agent | Error while deploying flow: ClientException("An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.")
Michael Ludwig
07/20/2020, 7:06 AM
… `LocalEnvironment` and add the task definition params to the `FargateAgent`. Not perfect, though, as this applies to all flows. If you find a solution I would be happy to hear it.
Greg Desmarais
07/20/2020, 3:28 PM
export networkConfiguration="{'awsvpcConfiguration': {'assignPublicIp': 'DISABLED', 'subnets': ['subnet-1234', 'subnet-5678', 'subnet-9101112']}}"
# Start a "small" Fargate agent. Per the thread above, the agent reads these
# memory/cpu values as task-definition settings, so they apply to every flow
# run this agent deploys.
export memory=8192 cpu=2048
prefect agent start fargate -v \
    --label s3-flow-storage \
    --label fargate-task \
    --label fargate-size-small
[2020-07-20 15:15:32,220] INFO - agent | Deploying flow run 1c8ba14e-efb3-4ab2-89f9-90a690a7df1b
[2020-07-20 15:15:32,221] DEBUG - agent | Checking for task definition
[2020-07-20 15:15:32,275] DEBUG - agent | Task definition prefect-task-6ef694e7 found
[2020-07-20 15:15:32,275] DEBUG - agent | Running task using task definition prefect-task-6ef694e7
[2020-07-20 15:15:32,439] DEBUG - agent | Querying for flow runs
[2020-07-20 15:15:32,461] DEBUG - agent | No flow runs found
[2020-07-20 15:15:32,461] DEBUG - agent | Next query for flow runs in 0.5 seconds
[2020-07-20 15:15:32,865] DEBUG - agent | Run created for task arn:aws:ecs:us-east-1:386834949250:task/517a0f0d-b167-4bff-a4bc-ab56798f7765
[2020-07-20 15:15:32,902] DEBUG - agent | Completed flow run submission (id: 1c8ba14e-efb3-4ab2-89f9-90a690a7df1b)
[2020-07-20 15:15:32,962] DEBUG - agent | Querying for flow runs
[2020-07-20 15:15:32,983] DEBUG - agent | No flow runs found
[2020-07-20 15:15:32,983] DEBUG - agent | Next query for flow runs in 1.0 seconds
[2020-07-20 15:15:33,984] DEBUG - agent | Querying for flow runs
[2020-07-20 15:15:34,009] DEBUG - agent | No flow runs found
...repeat query/no runs forever...
Maikel Penz
07/21/2020, 7:57 AM
Greg Desmarais
07/21/2020, 8:10 AM