Zac Hooper
11/07/2022, 11:53 PM
Crash detected! Execution was interrupted by an unexpected exception.
Looking over the logs produced by the agent, I can't see the exception that was thrown.
The flow is below. I've changed the variable names, but the premise is that two Lambdas are invoked and the second needs to wait until the first has completed.
import os
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.orion.schemas.schedules import CronSchedule
from utils.aws import invoke_lambda
# --- Flow Details --- #
client = "example_client"
project = "example_project"
flow_name = "Example Name"
description = """
Example Description
"""
tags = ["example"] # Optional list of string tags for the flow. Helps to filter in UI
schedule = CronSchedule(cron="* 7-19 * * MON-FRI", timezone="Australia/Sydney")
# --- Other Flow Details (Leave as is) --- #
work_queue = "example_workqueue"
storage = S3.load("s3-storage")
output_file_name = f"{os.path.splitext(os.path.basename(__file__))[0]}.yaml"
# Presumably the helper imported above from utils.aws, shown with the imports it needs
import json

import boto3
from botocore.config import Config


def invoke_lambda(function_full_name, payload, blocking=False, **kwargs):
    """
    Invokes the given lambda function in AWS.

    Args:
        function_full_name (str): Exact name of the lambda function
        payload (dict): Dictionary of data you may want to pass to lambda
        blocking (bool, optional): If you want to wait for a response from the lambda. Defaults to False.
        region (str, optional): Keyword argument; AWS region to use instead of the default.
    """
    config = Config(max_pool_connections=250, read_timeout=900, connect_timeout=900)

    # Change client's region if one was passed
    region = kwargs.get("region")
    if region:
        lambda_client = boto3.client("lambda", config=config, region_name=region)
    else:
        lambda_client = boto3.client("lambda", config=config)

    # DO NOT BLOCK IF WILL RUN OVER 30 SECONDS
    payload = bytes(json.dumps(payload), encoding="utf8")
    if blocking:
        # Synchronous call: wait for the function and return its decoded payload
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="RequestResponse",
            LogType="Tail",
            Payload=payload,
        )
        res_payload = response.get("Payload").read()
        return json.loads(res_payload)
    else:
        # Asynchronous call: returns as soon as the event is queued
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="Event",
            LogType="None",
            Payload=payload,
        )
        return response
@task
def first_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("first_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""


@task
def second_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("second_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""
@flow
def example_flow():
    x = first_lambda_to_invoke.submit()
    # wait_for holds the second task until the first has finished
    y = second_lambda_to_invoke.submit(wait_for=[x])
if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=example_flow,
        name=flow_name,
        work_queue_name=work_queue,
        tags=[client, project] + tags,
        schedule=schedule,
        description=description,
        output=output_file_name,
        storage=storage,
        apply=True,
        path="/",
    )
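A note on the blocking branch of invoke_lambda: it requests LogType="Tail" but only returns the decoded payload, so the rest of the invoke response is dropped. With a RequestResponse call, boto3 can report StatusCode 200 even when the function itself failed; in that case the response carries a FunctionError key, and LogResult holds the base64-encoded tail of the function's log. A minimal sketch of pulling those fields out follows. The name summarize_invoke_response is hypothetical and not part of the original helper; it takes the response dict and the already-read payload bytes.

```python
import base64
import json


def summarize_invoke_response(response, raw_payload):
    """Hypothetical helper: collect the parts of a Lambda invoke response
    that show whether the function itself failed."""
    log_tail = response.get("LogResult")  # base64 text, present with LogType="Tail"
    return {
        # HTTP-level status of the invoke call (200 for RequestResponse)
        "status_code": response.get("StatusCode"),
        # Set only when the function raised or timed out
        "function_error": response.get("FunctionError"),
        # Decoded tail of the function's own log output, if requested
        "log_tail": (
            base64.b64decode(log_tail).decode("utf-8", errors="replace")
            if log_tail
            else None
        ),
        # Whatever the function returned (or its error document)
        "payload": json.loads(raw_payload) if raw_payload else None,
    }
```

Inside invoke_lambda this could be called as summarize_invoke_response(response, res_payload) and the result logged from the task, which would make a function-side error visible in the flow run logs.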
Here is an example of the log output from a failed run:
Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM
Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM
Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM
Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM
Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM
Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM
Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM
Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM
Crash detected! Execution was interrupted by an unexpected exception.
09:32:10 AM
first_lambda_to_invoke-ec90046a-0
{'statusCode': 200}
09:32:21 AM
first_lambda_to_invoke-ec90046a-0
Crash detected! Execution was interrupted by an unexpected exception.
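Since the log above reports a crash without showing the exception, one way to get more detail is to log the traceback from inside the task body before it propagates. This is only a sketch, assuming the error is actually raised inside the task function; if the process is being killed from outside, nothing here will fire. The decorator name log_exceptions and its get_logger parameter are hypothetical. In a Prefect task, get_run_logger could be passed as get_logger, with the decorator placed underneath @task.

```python
import functools
import logging


def log_exceptions(get_logger=None):
    """Hypothetical decorator: log the full traceback of anything raised
    by the wrapped function, then re-raise it unchanged."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Resolve the logger at call time, so a run-scoped logger factory works
            logger = get_logger() if get_logger else logging.getLogger(func.__module__)
            try:
                return func(*args, **kwargs)
            except BaseException:
                # BaseException also covers interrupts and cancellations
                logger.exception("Unhandled error in %s", func.__name__)
                raise

        return wrapper

    return decorator


# Usage sketch (plain function here; in the flow it would sit under @task):
@log_exceptions()
def example_step():
    return "ok"
```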
Note the {'statusCode': 200} response from the Lambda, showing that it ran successfully.
Zac Hooper
11/07/2022, 11:54 PM
Mason Menges
11/08/2022, 12:25 AM
Zac Hooper
11/08/2022, 2:07 AM
Mason Menges
11/08/2022, 2:09 AM
Zac Hooper
11/08/2022, 2:16 AM
Mason Menges
11/08/2022, 5:10 AM
Zac Hooper
11/08/2022, 11:05 PM
Zac Hooper
11/08/2022, 11:07 PM
Mason Menges
11/08/2022, 11:11 PM
Zac Hooper
11/08/2022, 11:12 PM
Mason Menges
11/08/2022, 11:13 PM
Zac Hooper
11/08/2022, 11:15 PM
Mason Menges
11/08/2022, 11:23 PM