# prefect-community
z
Does the Orion Server produce any more logs beyond the ones that appear in the agent's logs? I can't seem to work out what is causing my flow to crash. I have a flow that runs every minute, but at least once a day it will fail due to:
```
Crash detected! Execution was interrupted by an unexpected exception.
```
Looking over the logs produced by the agent, I can't see the exception that was thrown. The flow looks like the code below. I've changed the variable values, but the premise is that two Lambdas are invoked and the second needs to wait until the first has completed.
```python
import json
import os

import boto3
from botocore.config import Config
from prefect import flow, get_run_logger, task
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.orion.schemas.schedules import CronSchedule

# invoke_lambda normally comes from utils.aws; it is inlined below for context

# --- Flow Details --- #
client = "example_client"
project = "example_project" 
flow_name = "Example Name"
description = """
    Example Description
"""
tags = ["example"]  # Optional list of string tags for the flow. Helps to filter in UI
schedule = CronSchedule(cron="* 7-19 * * MON-FRI", timezone="Australia/Sydney")

# --- Other Flow Details (Leave as is) --- #
work_queue = "example_workqueue"
storage = S3.load("s3-storage")
output_file_name = f"{os.path.splitext(os.path.basename(__file__))[0]}.yaml"

def invoke_lambda(function_full_name, payload, blocking=False, **kwargs):
    """
    Invokes the given lambda function in AWS.

    Args:
        function_full_name (str): Exact name of the lambda function
        payload (dict): Dictionary of data you may want to pass to lambda
        blocking (bool, optional): If you want to wait for a response from the lambda. Defaults to False.
    """
    config = Config(max_pool_connections=250, read_timeout=900, connect_timeout=900)
    lambda_client = boto3.client("lambda", config=config)
    # Change client's region
    if kwargs.get("region"):
        lambda_client = boto3.client(
            "lambda", config=config, region_name=kwargs.get("region")
        )

    # DO NOT BLOCK IF WILL RUN OVER 30 SECONDS
    payload = bytes(json.dumps(payload), encoding="utf8")

    if blocking:
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="RequestResponse",
            LogType="Tail",
            Payload=payload,
        )
        res_payload = response.get("Payload").read()
        return json.loads(res_payload)
    else:
        response = lambda_client.invoke(
            FunctionName=function_full_name,
            InvocationType="Event",
            LogType="None",
            Payload=payload,
        )
        return response

@task
def first_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("first_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""


@task
def second_lambda_to_invoke():
    logger = get_run_logger()
    res = invoke_lambda("second_lambda_to_invoke", {}, True)
    logger.info(res)
    return ""


@flow
def example_flow():
    x = first_lambda_to_invoke.submit()
    y = second_lambda_to_invoke.submit(wait_for=[x])


if __name__ == "__main__":
    Deployment.build_from_flow(
        flow=example_flow,
        name=flow_name,
        work_queue_name=work_queue,
        tags=[client, project] + tags,
        schedule=schedule,
        description=description,
        output=output_file_name,
        storage=storage,
        apply=True,
        path="/",
    )
```
Here is an example of the log output from a failed run:
```
09:32:10 AM  Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM  Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM  Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM  Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM  Created task run 'first_lambda_to_invoke-ec90046a-0' for task 'first_lambda_to_invoke'
09:32:10 AM  Submitted task run 'first_lambda_to_invoke-ec90046a-0' for execution.
09:32:10 AM  Created task run 'second_lambda_to_invoke-a66b51c9-0' for task 'second_lambda_to_invoke'
09:32:10 AM  Submitted task run 'second_lambda_to_invoke-a66b51c9-0' for execution.
09:32:10 AM  Crash detected! Execution was interrupted by an unexpected exception.
09:32:21 AM  first_lambda_to_invoke-ec90046a-0: {'statusCode': 200}
             first_lambda_to_invoke-ec90046a-0: Crash detected! Execution was interrupted by an unexpected exception.
```
Note the `{'statusCode': 200}` response from the Lambda, showing that it ran successfully.
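For reference, one way to see what the backend actually recorded for a crashed run is to read the flow run's state with the Prefect client. A minimal sketch, where the flow run ID is a placeholder you would copy from the UI or the run URL:

```python
# Minimal sketch: inspect the state Orion stored for a given flow run.
# The flow run ID below is a placeholder; copy the real one from the UI/run URL.
import asyncio

from prefect.client import get_client


async def inspect_run(flow_run_id: str):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        # The state type (e.g. CRASHED) and its message are what the backend
        # recorded for the run, independent of what reached the agent's stdout.
        print(flow_run.state.type, flow_run.state.message)


asyncio.run(inspect_run("00000000-0000-0000-0000-000000000000"))
```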
I have two agents attached to the same queue. Both are deployed as AWS ECS services.
m
Hey @Zac Hooper, do you happen to have the agent logs from when this occurred? I'd wager that what likely happened is both agents picked up the same flow run simultaneously and one of them aborted the run, i.e. this is probably just the result of an orchestration rule in place to prevent multiple agents from running the same flow.
z
Hey @Mason Menges, looks like you're right. Both picked up the flow run. Is there a best practice for organising agents running on the same queue to prevent this? Or maybe some way I can add better handling in the flow itself?
m
Not necessarily. In actuality this isn't really an issue; it's just the orchestration rule doing its job on the backend, and multiple agents are totally fine in terms of distributing work 😁. There's been talk of adjusting how some of the error handling is done here, but I don't know of anything concrete around it at the moment.
z
Okay, no problem. Could I add retries to the flow or tasks to try and make sure at least one of the agents keeps executing the flow? Or does the crash stop the flow completely, so I just need to wait for the flow to be invoked again?
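For reference, retries in Prefect 2 are set directly on the flow and task decorators. A minimal sketch with placeholder values (whether a retry fires for a run the backend marked as Crashed, rather than Failed, is a separate question):

```python
from prefect import flow, task


# Placeholder retry settings for illustration; tune to the real workload
@task(retries=2, retry_delay_seconds=10)
def first_lambda_to_invoke():
    ...


@flow(retries=1)
def example_flow():
    first_lambda_to_invoke.submit()
```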
m
For clarification, this occurs because one agent has already started running the flow, i.e. you shouldn't need to do anything extra here. Is that not the case when you check the status through the UI?
z
No, it's not. It looks like the first task crashed, which stopped the second from running.
I'm not desperate to scale wide at the moment, so I've just dropped my agents down to one and doubled its CPU and memory to make up the difference. Will see how this goes.
m
What version of Prefect is the Orion Server running on?
z
Still on 2.5.0
m
It may be worth opening a bug for this on GitHub if you're willing to, more so if you're able to test this on 2.6.
z
Okay. I'm planning on upgrading to 2.6 in a couple of weeks, so I might hold off till then and see what happens. Thank you so much for the help, really appreciate it!
m
No problem 😄