Keith Veleba
01/31/2022, 7:16 PMAnna Geller
export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "abcdef", "SECRET_ACCESS_KEY": "xxx"}'
so in the UI, it would be:
{
"ACCESS_KEY": "abcdef",
"SECRET_ACCESS_KEY": "xxx"
}
Keith Veleba
01/31/2022, 7:21 PMKevin Kho
Keith Veleba
01/31/2022, 7:24 PMKeith Veleba
01/31/2022, 7:24 PMKevin Kho
Keith Veleba
01/31/2022, 7:26 PMKevin Kho
Keith Veleba
01/31/2022, 7:28 PMKeith Veleba
01/31/2022, 7:28 PMKeith Veleba
01/31/2022, 7:37 PM[31 January 2022 2:33pm]: Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.8.8', currently running with '3.7.12')")
Keith Veleba
01/31/2022, 7:38 PMKeith Veleba
01/31/2022, 7:40 PMfrom ntpath import join
from prefect import Flow, task
from prefect.tasks.aws.batch import BatchSubmit
from prefect.tasks.aws.client_waiter import AWSClientWait
from prefect.client import Secret
from prefect.backend.kv_store import get_key_value
from prefect.storage import S3
FLOW_NAME="aws-batch-hello-world-flow" # name of your flow in cloud UI
PROJECT_NAME="tokenized-channel-fullfillment" #project that flows/runs are attached to
STORAGE=S3(bucket="sgmt-prefect-dev-flows",secrets=["AWS_CREDENTIALS"]) #bucket where flows are stored. Updates to this require you to run locally.
# boto_args = {
# "containerOverrides": {
# "environment": [
# {"name": "workflow_SCRIPT", "value": get_key_value(key="workflow_SCRIPT")},
# {"name": "workflow_OPTS", "value": get_key_value(key="workflow_OPTS")},
# ],
# "vcpus": 4,
# "memory": 32750,
# },
# }
@task
def wait_for_hello(job_id): #, delay, max_attempts):
waiter = AWSClientWait(
client='batch',
waiter_name='JobComplete',
)
waiter.run(
waiter_kwargs={
'jobs': [job_id],
# 'WaiterConfig': {
# 'Delay': delay,
# 'MaxAttempts': max_attempts
# }
},
)
return job_id
@task
def say_hello():
batchjob = BatchSubmit(job_name="prefect-tcf-hello",
job_definition="tcf-sample",
job_queue="tcf-tokenization",)
job_id = batchjob.run()
return job_id
with Flow(FLOW_NAME, storage=STORAGE) as flow:
job_id = say_hello()
wait_for_hello(job_id)
flow.register(project_name=PROJECT_NAME, labels=["dev"])
Kevin Kho
Keith Veleba
01/31/2022, 7:45 PM"image": "prefecthq/prefect:latest-python3.8",
Keith Veleba
01/31/2022, 7:46 PMKevin Kho
Kevin Kho
Keith Veleba
01/31/2022, 8:18 PMKevin Kho
Keith Veleba
01/31/2022, 8:35 PMKevin Kho