n

Nathaniel Russell

10/11/2022, 4:18 PM
I have a flow running in lambda that keeps giving me this warning:
/usr/local/lib/python3.9/site-packages/prefect/logging/handlers.py:76: UserWarning: Failed to create the Prefect home directory at /home/sbx_user1051/.prefect
It then runs the flow code correctly, but after the flow is done it crashes, and gives this error:
[in thread]
All of my flows perform their intended code but all end with this error and say crashed. How do I fix this?
c

Christopher Boyd

10/11/2022, 4:20 PM
Hi Nathaniel - Would you mind moving your traceback into the thread?
Also - is sbx_user1051 your user?
n

Nathaniel Russell

10/11/2022, 4:21 PM
not that I know of, I haven't specifically made that user
c

Christopher Boyd

10/11/2022, 4:21 PM
what does the flow do
n

Nathaniel Russell

10/11/2022, 4:21 PM
[ERROR] OSError: [Errno 30] Read-only file system: '/home/sbx_user1051'
Traceback (most recent call last):
  File "/opt/prefect/service.py", line 63, in handler
    main_flow(event, context)
  File "/usr/local/lib/python3.9/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 158, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 103, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 231, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 367, in begin_flow_run
    terminal_state = await orchestrate_flow_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 659, in orchestrate_flow_run
    await _persist_serialized_result(
  File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/usr/local/lib/python3.9/site-packages/prefect/filesystems.py", line 198, in write_path
    path.parent.mkdir(exist_ok=True, parents=True)
  File "/usr/local/lib/python3.9/pathlib.py", line 1327, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/usr/local/lib/python3.9/pathlib.py", line 1327, in mkdir
    self.parent.mkdir(parents=True, exist_ok=True)
  File "/usr/local/lib/python3.9/pathlib.py", line 1323, in mkdir
    self._accessor.mkdir(self, mode)
The flow launches a flow run from a deployment.
@flow(name="flow_run_launcher")
def main_flow(event, context):
    # Get prefect logger
    logger = get_run_logger()
    flow_logger = logging.getLogger('prefect.flow_runs')
    flow_logger.removeHandler("orion")

    # logger.info('Received event')
    print('Received event')
    logger.info(event)
    for record in event['Records']:
        event_body = json.loads(record.get('body'))
        #pull the body out & json load it
        detail = json.loads(json.dumps(event_body['detail']))
        detail = reorder_data_arrays(detail)
        # logger.info(json.dumps(detail))
        logger.info(json.dumps(detail))


    # Get prefect project id from cumulus_client_id
    cumulus_client_id = os.getenv("CUMULUS_CLIENT_ID")
    # logger.info("got client id: " + str(cumulus_client_id))
    logger.info("got client id: " + str(cumulus_client_id))

    # Get the deployment
    deployment = get_deployment(client_id=cumulus_client_id)
    # logger.info("Retrieved deployment: " + str(deployment))
    logger.info("Retrieved deployment: " + str(deployment))

    # Make sure we found the deployment
    if deployment is None:
        raise Exception("Deployment not found")


    # Add the banner event to a dict
    parameters = dict(lambda_event=event)
    # logger.info("Set parameters")
    logger.info("Set parameters")

    # Create the Prefect flow run passing the Banner event
    # logger.info("Making Prefect API Request")
    logger.info("Making Prefect API Request")
    client = get_client()


    # Create a flow run for the given deployment
    asyncio.run(client.create_flow_run_from_deployment(deployment_id=deployment.id,
                                                           parameters=parameters,
                                                           tags=[cumulus_client_id, 'ecs']), debug=True)


    # logger.info("Completed API requests")
    logger.info("Completed API requests")
c

Christopher Boyd

10/11/2022, 4:24 PM
what’s the lambda look like
n

Nathaniel Russell

10/11/2022, 4:24 PM
^ flow code above (that is in the lambda), all of it succeeds and it logs the final line, then proceeds to crash with the given error.
c

Christopher Boyd

10/11/2022, 4:25 PM
is that the whole lambda?
[ERROR] OSError: [Errno 30] Read-only file system: '/home/sbx_user1051'
Traceback (most recent call last):
  File "/opt/prefect/service.py", line 63, in handler
    main_flow(event, context)
says it’s trying to write out to the filesystem in some way
so main_flow is the entrypoint to the flow itself, but the OSError says the filesystem is read-only
n

Nathaniel Russell

10/11/2022, 4:26 PM
Here is whole lambda:
import imp
import json
import os

from prefect.client import get_client
import asyncio

from prefect import flow, task, get_run_logger
import logging


"""
Logging configuration
"""



# Reset the logging config
#logger.remove()
#log_level = os.getenv('LOG_LEVEL', 'info').upper()
#log_format = '{level} {message}'

# Configure output to stdout
#logger.add(sys.stdout, format=log_format, level=log_level)



async def get_deployments():
    client = get_client()
    deployments = await client.read_deployments()
    return deployments

def get_deployment(client_id: str):
    """
    Get a deployment given its client id name.
    Args:
        - client_id (str): the client id (eg: sand01-dev)
    Returns:
        - Deployment: the deployment that matches the client_id
    """
    all_deployments = asyncio.run(get_deployments())
    deployment = None
    # Search for the deployment with a name equal to the client id
    for each in all_deployments:
        if each.name == client_id:
            deployment = each
            break
    
    return deployment



def handler(event, context):
    main_flow(event, context)


@flow(name="flow_run_launcher")
def main_flow(event, context):
    # Get prefect logger
    logger = get_run_logger()
    flow_logger = logging.getLogger('prefect.flow_runs')
    flow_logger.removeHandler("orion")

    # logger.info('Received event')
    print('Received event')
    logger.info(event)
    for record in event['Records']:
        event_body = json.loads(record.get('body'))
        #pull the body out & json load it
        detail = json.loads(json.dumps(event_body['detail']))
        detail = reorder_data_arrays(detail)
        # logger.info(json.dumps(detail))
        logger.info(json.dumps(detail))



    # Get prefect project id from cumulus_client_id
    cumulus_client_id = os.getenv("CUMULUS_CLIENT_ID")
    # logger.info("got client id: " + str(cumulus_client_id))
    logger.info("got client id: " + str(cumulus_client_id))

    # Get the deployment
    deployment = get_deployment(client_id=cumulus_client_id)
    # logger.info("Retrieved deployment: " + str(deployment))
    logger.info("Retrieved deployment: " + str(deployment))

    # Make sure we found the deployment
    if deployment is None:
        raise Exception("Deployment not found")


    # Add the banner event to a dict
    parameters = dict(lambda_event=event)
    # logger.info("Set parameters")
    logger.info("Set parameters")

    # Create the Prefect flow run passing the Banner event
    # logger.info("Making Prefect API Request")
    logger.info("Making Prefect API Request")
    client = get_client()


    # Create a flow run for the given deployment
    asyncio.run(client.create_flow_run_from_deployment(deployment_id=deployment.id,
                                                           parameters=parameters,
                                                           tags=[cumulus_client_id, 'ecs']), debug=True)


    # logger.info("Completed API requests")
    logger.info("Completed API requests")
    






# Reorders the additionalData and changeData to help with AWS log formatting
def reorder_data_arrays(event_dict):
    changedata,additionaldata = event_dict['changeData'], event_dict['additionalData']
    del event_dict['changeData']
    del event_dict['additionalData']
    event_dict['additionalData'] = additionaldata
    event_dict['changeData'] = changedata
    return event_dict
c

Christopher Boyd

10/11/2022, 4:26 PM
File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 15, in _persist_serialized_result
    await filesystem.write_path(key, content)
  File "/usr/local/lib/python3.9/site-packages/prefect/filesystems.py", line 198, in write_path
the async is trying to write out to a path to persist the results
is there a need to use async here?
n

Nathaniel Russell

10/11/2022, 4:27 PM
yes in order to make API calls to prefect cloud the calls need to be async
Unless you know of another way
c

Christopher Boyd

10/11/2022, 4:28 PM
if it’s outside of the flow yes; if it’s inside the flow, that should not be necessary
n

Nathaniel Russell

10/11/2022, 4:29 PM
oh, I did not know that, I will try that, thank you
c

Christopher Boyd

10/11/2022, 4:30 PM
I’m also a little curious about your function definitions; you have them outside the flow, but not designated as tasks to be wrapped
was that intentional?
n

Nathaniel Russell

10/11/2022, 4:32 PM
I will try setting them as tasks. As tasks, do they not need any async/await?
c

Christopher Boyd

10/11/2022, 4:35 PM
No, they will be captured and wrapped by the flow
m

Mason Menges

10/11/2022, 4:36 PM
Not necessarily, unless the task function itself is async. When the client is called within a flow/task context, I believe we handle that, AFAIK at least.
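For readers following the async question, here is a minimal sketch of keeping both API calls in one coroutine, so that synchronous code needs only a single `asyncio.run`. `Deployment`, `FakeClient`, and its two methods are hypothetical stand-ins that only mimic the shape of the calls in the Lambda above; they are not the Prefect client API.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Deployment:
    # Hypothetical stand-in for a deployment record.
    id: str
    name: str


class FakeClient:
    """Hypothetical async client; a real client would make network calls."""

    def __init__(self, deployments: List[Deployment]):
        self._deployments = deployments
        self.created = []

    async def read_deployments(self) -> List[Deployment]:
        await asyncio.sleep(0)  # yield control, as a network call would
        return list(self._deployments)

    async def create_flow_run(self, deployment_id: str, parameters: dict) -> str:
        await asyncio.sleep(0)
        self.created.append((deployment_id, parameters))
        return f"run-for-{deployment_id}"


async def launch(client: FakeClient, client_id: str, event: dict) -> Optional[str]:
    """Look up the deployment by name and start a run, all in one coroutine."""
    for deployment in await client.read_deployments():
        if deployment.name == client_id:
            return await client.create_flow_run(deployment.id, {"lambda_event": event})
    return None  # no deployment matched


def handler(event: dict, client: FakeClient, client_id: str) -> Optional[str]:
    # A single asyncio.run at the synchronous entry point drives everything.
    return asyncio.run(launch(client, client_id, event))
```

Whether this structure is needed inside a Prefect flow depends on how the client behaves in a flow/task context, which the thread leaves open.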
c

Christopher Boyd

10/11/2022, 4:36 PM
anything outside of flow / tasks would
Also, by default, only /tmp is writable in Lambda; therefore this would work:
filepath = '/tmp/' + key
if needing to persist results; not relevant here, but the core of the issue
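A minimal sketch of the `/tmp` idea, assuming a hypothetical helper `persist_under_tmp` and that `key` is a relative path. It uses `tempfile.gettempdir()`, which typically resolves to `/tmp` on Linux, rather than hard-coding the directory.

```python
import tempfile
from pathlib import Path


def persist_under_tmp(key: str, content: bytes) -> Path:
    """Write content to <temp dir>/<key> and return the path (hypothetical helper)."""
    path = Path(tempfile.gettempdir()) / key
    # Create intermediate directories in case key contains subfolders.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)
    return path
```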
n

Nathaniel Russell

10/11/2022, 4:39 PM
So I removed the async stuff but it is still saying the same error. What is that filepath in reference to?
How can I change my code to write to /tmp if it's Prefect's code, not mine, that is trying to do the writing?
?
m

Mason Menges

10/11/2022, 5:30 PM
I believe you would need to configure this path variable
PREFECT_HOME='~/.prefect'
to something like this
prefect config set PREFECT_HOME='/tmp/.prefect'
You could also just define this as an environment variable. I'm not 100% certain about that, but this home variable is how Prefect determines where to store configuration data.
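One way to try the environment-variable route from Python is to set the variable at the very top of the handler module, before `prefect` is imported. This is a sketch that assumes Prefect reads `PREFECT_HOME` from the process environment when its settings are first loaded; the `/tmp/.prefect` value is the one suggested above.

```python
import os

# Must run before `import prefect`, on the assumption that settings are
# read from the environment when the library is first imported.
# setdefault keeps any value already configured on the function itself.
os.environ.setdefault("PREFECT_HOME", "/tmp/.prefect")

# Make sure the directory exists; /tmp is the writable location in Lambda.
os.makedirs(os.environ["PREFECT_HOME"], exist_ok=True)

# from prefect import flow  # import Prefect only after the lines above
```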
c

Christopher Boyd

10/11/2022, 6:02 PM
This is likely to happen with Lambda in general; what Mason suggested, updating PREFECT_HOME to the tmp directory, could be a short-term fix. I’ll bring this up with the engineering team for a more persistent solution
n

Nathaniel Russell

10/11/2022, 6:06 PM
I tried updating PREFECT_HOME to /tmp/.prefect in the Dockerfile that builds the lambda but it didn't change anything:
/usr/local/lib/python3.9/site-packages/prefect/context.py:474: UserWarning: Failed to create the Prefect home directory at /home/sbx_user1051/.prefect
Here is the Dockerfile:
FROM prefecthq/prefect:2.4.2-python3.9
COPY service.py .
COPY requirements.txt .

RUN apt-get update
RUN apt-get install -y sqlite3 libsqlite3-dev



# PYTHON INSTALLS
RUN pip install -r requirements.txt
RUN pip install awslambdaric


# TRY MAKING A PREFECT PROFILE IN DOCKERFILE
RUN prefect config set PREFECT_HOME='/tmp/.prefect'
RUN prefect config set PREFECT_LOCAL_STORAGE_PATH='/tmp/.prefect/storage'

RUN prefect profile create lambda_prefect_profile
RUN prefect profile use lambda_prefect_profile
RUN prefect config set PREFECT_API_URL="********"
RUN prefect config set PREFECT_API_KEY="********"

RUN prefect config set PREFECT_LOGGING_LEVEL=DEBUG


# DEFINE ENV VARIABLES AND ENTRYPOINT
ENV CUMULUS_CLIENT_ID="sand02-dev"
ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "service.handler" ]
(edited to hide API URL/KEY)
a

Anna Geller

10/11/2022, 6:12 PM
you can set the prefect home to /tmp which solves the issue https://github.com/anna-geller/prefect-aws-lambda/blob/master/serverless.yml#L24
c

Christopher Boyd

10/11/2022, 6:14 PM
Hey Nathaniel, I talked with the team: this is being worked on long term and will be resolved here: https://github.com/PrefectHQ/prefect/pull/6908. What Anna suggested, changing the Prefect home location, would be the short-term way to get this functional.
n

Nathaniel Russell

10/11/2022, 6:29 PM
hmm so I also tried this:
RUN prefect config set PREFECT_HOME='/tmp'
and that doesn't seem to do it either:
/usr/local/lib/python3.9/site-packages/prefect/logging/handlers.py:76: UserWarning: Failed to create the Prefect home directory at /home/sbx_user1051/.prefect
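A small diagnostic can help confirm what the function actually sees at runtime. The sketch below (the helper name `describe_runtime_paths` is hypothetical) reports the `PREFECT_HOME` environment variable, the home directory Python resolves for the current user, and whether each candidate directory is writable. As far as I know, `prefect config set` stores values in a profile file rather than in the environment, so `PREFECT_HOME` showing as `None` here would be expected unless the variable is defined on the function or in the image.

```python
import os
import tempfile
from pathlib import Path


def describe_runtime_paths() -> dict:
    """Report where the process may write (hypothetical diagnostic helper)."""
    home = Path(os.path.expanduser("~"))
    tmp = Path(tempfile.gettempdir())
    return {
        "PREFECT_HOME": os.environ.get("PREFECT_HOME"),
        "home": str(home),
        "home_writable": os.access(home, os.W_OK),
        "tmp": str(tmp),
        "tmp_writable": os.access(tmp, os.W_OK),
    }
```

Calling `print(describe_runtime_paths())` at the start of `handler` would show the values in the Lambda logs.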
a

Anna Geller

10/11/2022, 6:31 PM