
Greg Desmarais

07/20/2020, 5:59 AM
I am trying to register a flow with Prefect Server using a FargateTaskEnvironment and a DaskExecutor, and I can successfully register the flow with my server instance. I can also run the flow with a parameter through the UI. The agent attached to the Prefect server picks up the run and flow and starts to process it. However, it appears that the task definition arguments aren't being passed through the flow serialization process during registration. Code and agent output follow. What am I doing wrong?
The driver program runs on my local machine. It has the right params and the like to connect to the Prefect server, which is running on an EC2 instance.
from datetime import datetime

import prefect

from distributed import get_worker
from prefect import Flow, Parameter, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import FargateTaskEnvironment
from prefect.environments.storage import S3

from constants import AWS_ACCOUNT_NUMBER, VPCS
from util.aws_utils import set_aws_credentials_env, DEFAULT_REGION

set_aws_credentials_env()
prefect.context.config.cloud.graphql = 'http://10.72.112.29:4200/graphql'

@task(log_stdout=True)
def say_hello(t_name):
    print(f'{datetime.now()}: workflow hello {t_name}', flush=True)
    worker = get_worker()
    return f'done on {worker.name}, scheduler at {worker.scheduler.address}'


name_p = Parameter('name')
with Flow("Dask Cloud Provider Test") as flow:
    for i in range(1):
        say_hello(name_p)

bucket = 'celsius-temp-data'
key = 'datasciences/prefect_flows/dask_cloud_provider_test'
flow.storage = S3(bucket, key=key)
image_name = f'{AWS_ACCOUNT_NUMBER}.dkr.ecr.us-east-1.amazonaws.com/rightsize_99_standard_py37:gdesmarais'
cluster_kwargs = {
    'vpc': VPCS['production'],
    'fargate_use_private_ip': True,
    'image': image_name,
    'cluster_name_template': f'dask-test-gdesmarais-{{uuid}}'
}
executor_kwargs = {'cluster_kwargs': cluster_kwargs}

task_definition_kwargs = {
    'memoryReservation': 8192,
    'memory': 8192,
}

executor = DaskExecutor(cluster_class='dask_cloudprovider.FargateCluster',
                        cluster_kwargs=cluster_kwargs)
flow.environment = FargateTaskEnvironment(
    executor=executor,
    region_name=DEFAULT_REGION,
    **task_definition_kwargs
)

flow_id = flow.register()

# I'd love for the flow to be populated with more info - e.g. the version (not sure what else)
print(f'Registered flow id: {flow_id}')

# How can I invoke the flow, wait for the results, and see those results?
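A minimal sketch of one way to do that, assuming the Prefect Client's create_flow_run and get_flow_run_info (0.12-era API; worth double-checking against your version): create the run, then poll its state.

from time import sleep
from prefect import Client

client = Client()  # uses the graphql endpoint configured above
run_id = client.create_flow_run(flow_id=flow_id, parameters={'name': 'gdesmarais'})
# Poll until the run reaches a finished state, then report it.
while True:
    info = client.get_flow_run_info(run_id)
    if info.state.is_finished():
        print(f'Flow run finished in state: {info.state}')
        break
    sleep(5)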
The agent picks up the flow after I run it through the UI, but spits out the following errors (I've added some printouts):
____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2020-07-20 04:44:57,948] INFO - agent | Starting FargateAgent with labels ['s3-flow-storage']
[2020-07-20 04:44:57,948] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2020-07-20 04:44:57,948] INFO - agent | Agent connecting to the Prefect API at http://localhost:4200
[2020-07-20 04:44:57,985] INFO - agent | Waiting for flow runs...
[2020-07-20 04:46:03,767] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-07-20 04:46:03,809] INFO - agent | Deploying flow run 42e445dd-e0bc-4be4-8566-0e6fd1dbd209
****GSD container_definitions_kwargs: {}
****GSD flow_task_definition_kwargs: {}
****GSD task_definition_name: prefect-task-db2a7788
[2020-07-20 04:46:03,919] ERROR - agent | Logging platform error for flow run 42e445dd-e0bc-4be4-8566-0e6fd1dbd209
Traceback (most recent call last):
  File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/agent.py", line 598, in mark_failed
    raise exc
  File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/agent.py", line 344, in deploy_and_update_flow_run
    deployment_info = self.deploy_flow(flow_run)
  File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/fargate/agent.py", line 490, in deploy_flow
    task_definition_name=task_definition_dict["task_definition_name"],
  File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/prefect/agent/fargate/agent.py", line 677, in _create_task_definition
    **flow_task_definition_kwargs,
  File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/botocore/client.py", line 316, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/ec2-user/prefect-venv/lib/python3.7/site-packages/botocore/client.py", line 626, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.

[2020-07-20 04:46:03,979] ERROR - agent | Error while deploying flow: ClientException("An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Invalid setting for container 'flow'. At least one of 'memory' or 'memoryReservation' must be specified.")
I traced through the client side serialization during registration, and it looks like only labels and metadata are serialized for the FargateTaskEnvironment.
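One quick way to confirm that is to dump the serialized environment before calling register() - a sketch, assuming the environment object exposes serialize():

import json
# If the task definition kwargs don't show up in this payload, they never reach the agent.
print(json.dumps(flow.environment.serialize(), indent=2, default=str))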

Michael Ludwig

07/20/2020, 7:06 AM
Maybe this is related? https://prefect-community.slack.com/archives/CL09KU1K7/p1594305134408800?thread_ts=1594305134.408800 I could not make it work and now use a LocalEnvironment and add the task definition params to the FargateAgent. Not perfect though, as this applies to all flows. If you find a solution I would be happy to hear it.
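For reference, a rough sketch of the flow side of that workaround (memory/cpu go on the agent instead, so they apply to every flow it runs; LocalEnvironment here is the 0.12-era class):

from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# No task definition kwargs on the flow at all - the FargateAgent supplies them.
flow.environment = LocalEnvironment(
    executor=DaskExecutor(cluster_class='dask_cloudprovider.FargateCluster',
                          cluster_kwargs=cluster_kwargs),  # cluster_kwargs as in the flow above
)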

Greg Desmarais

07/20/2020, 3:28 PM
@Michael Ludwig - thanks for the pointer - I hadn't searched -community, just -server, so I didn't see this. I didn't get the LocalEnv to work, though. In the end, I was able to get the needed parameters to show up on the Fargate agent by using the command-line configuration options, similar to:
export networkConfiguration="{'awsvpcConfiguration': {'assignPublicIp': 'DISABLED', 'subnets': ['subnet-1234', 'subnet-5678', 'subnet-9101112']}}"
# Start small fargate cluster agent
export memory=8192
export cpu=2048

prefect agent start fargate -v --label s3-flow-storage --label fargate-task --label fargate-size-small
My thinking is that I would start different agents that represented access to different cluster task sizes.
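e.g. a sketch of the flow side, with labels matching whichever agent should pick the run up (label names here are just illustrative):

from prefect.environments import LocalEnvironment

# These labels must match the --label flags the target agent was started with.
flow.environment = LocalEnvironment(
    executor=executor,
    labels=['s3-flow-storage', 'fargate-task', 'fargate-size-small'],
)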
But, as seems to be the pattern, one step forward, one point something steps back. The newly configured agent can pick up the manual run just fine, and even seems to have the needed parameters, but nothing ever gets built for the Fargate cluster, and the task never runs. It just sits at:
[2020-07-20 15:15:32,220] INFO - agent | Deploying flow run 1c8ba14e-efb3-4ab2-89f9-90a690a7df1b
[2020-07-20 15:15:32,221] DEBUG - agent | Checking for task definition
[2020-07-20 15:15:32,275] DEBUG - agent | Task definition prefect-task-6ef694e7 found
[2020-07-20 15:15:32,275] DEBUG - agent | Running task using task definition prefect-task-6ef694e7
[2020-07-20 15:15:32,439] DEBUG - agent | Querying for flow runs
[2020-07-20 15:15:32,461] DEBUG - agent | No flow runs found
[2020-07-20 15:15:32,461] DEBUG - agent | Next query for flow runs in 0.5 seconds
[2020-07-20 15:15:32,865] DEBUG - agent | Run created for task arn:aws:ecs:us-east-1:386834949250:task/517a0f0d-b167-4bff-a4bc-ab56798f7765
[2020-07-20 15:15:32,902] DEBUG - agent | Completed flow run submission (id: 1c8ba14e-efb3-4ab2-89f9-90a690a7df1b)
[2020-07-20 15:15:32,962] DEBUG - agent | Querying for flow runs
[2020-07-20 15:15:32,983] DEBUG - agent | No flow runs found
[2020-07-20 15:15:32,983] DEBUG - agent | Next query for flow runs in 1.0 seconds
[2020-07-20 15:15:33,984] DEBUG - agent | Querying for flow runs
[2020-07-20 15:15:34,009] DEBUG - agent | No flow runs found
...repeat query/no runs forever...

Maikel Penz

07/21/2020, 7:57 AM
@Greg Desmarais @Michael Ludwig did you guys figure out a way to make it work? If the configuration must be passed to the agent, does that mean I can't deploy flows with distinct memory/cpu configurations using the same agent?

Greg Desmarais

07/21/2020, 8:10 AM
I'm still working on it. It does seem like the mem/cpu is established by the agent, which is a bit limiting. I'm planning to take the approach of running multiple agents, each with a label like 'fargate-cluster-small', '-medium', or '-large', to at least have some granularity over which tasks go where. For me this is actually reasonable, since I'm going to expose my interfaces to folks I don't want to have to think at too granular a level.