My ECS agent is using "image": "prefecthq/prefect:latest-python3.8", so I'm not sure what I've done wrong.
My flow in its entirety:
from ntpath import join
from prefect import Flow, task
from prefect.tasks.aws.batch import BatchSubmit
from prefect.tasks.aws.client_waiter import AWSClientWait
from prefect.client import Secret
from prefect.backend.kv_store import get_key_value
from prefect.storage import S3
# Name of the flow as shown in the cloud UI.
FLOW_NAME = "aws-batch-hello-world-flow"

# Project that flows/runs are attached to.
PROJECT_NAME = "tokenized-channel-fullfillment"

# Bucket where flows are stored; the "AWS_CREDENTIALS" secret is handed to
# the storage object.  Updates to this require you to run locally.
STORAGE = S3(
    bucket="sgmt-prefect-dev-flows",
    secrets=["AWS_CREDENTIALS"],
)
# boto_args = {
# "containerOverrides": {
# "environment": [
# {"name": "workflow_SCRIPT", "value": get_key_value(key="workflow_SCRIPT")},
# {"name": "workflow_OPTS", "value": get_key_value(key="workflow_OPTS")},
# ],
# "vcpus": 4,
# "memory": 32750,
# },
# }
@task
def wait_for_hello(job_id, delay=None, max_attempts=None):
    """Block until the ``JobComplete`` waiter on the ``batch`` client succeeds.

    Args:
        job_id: ID of the Batch job to wait on.
        delay: Optional number of seconds between polling attempts,
            forwarded as ``WaiterConfig['Delay']``.  When ``None`` the
            waiter's own default is used.
        max_attempts: Optional maximum number of polling attempts,
            forwarded as ``WaiterConfig['MaxAttempts']``.  When ``None``
            the waiter's own default is used.

    Returns:
        The same ``job_id`` that was passed in, so downstream tasks can
        depend on this one and still receive the ID.
    """
    waiter_kwargs = {"jobs": [job_id]}

    # Only override the polling settings the caller actually supplied, so a
    # plain ``wait_for_hello(job_id)`` behaves exactly as it did before.
    waiter_config = {}
    if delay is not None:
        waiter_config["Delay"] = delay
    if max_attempts is not None:
        waiter_config["MaxAttempts"] = max_attempts
    if waiter_config:
        waiter_kwargs["WaiterConfig"] = waiter_config

    waiter = AWSClientWait(client="batch", waiter_name="JobComplete")
    waiter.run(waiter_kwargs=waiter_kwargs)
    return job_id
@task
def say_hello():
    """Submit the hello-world job through ``BatchSubmit`` and return its result.

    Returns:
        Whatever ``BatchSubmit.run()`` returns; the flow passes this value
        on as the job ID (presumably the AWS Batch job ID -- confirm against
        the Prefect task documentation).
    """
    submit_task = BatchSubmit(
        job_name="prefect-tcf-hello",
        job_definition="tcf-sample",
        job_queue="tcf-tokenization",
    )
    return submit_task.run()
# Build the flow: submit the Batch job, then wait on the ID it returns.
with Flow(FLOW_NAME, storage=STORAGE) as flow:
    job_id = say_hello()
    wait_for_hello(job_id=job_id)

# Register the flow under the configured project with the "dev" label.
flow.register(
    project_name=PROJECT_NAME,
    labels=["dev"],
)