# prefect-community
d
Hello, I am experiencing some strange behavior with Exceptions and I can't find anything in discourse about it. Exceptions being thrown from flow code are caught, but exceptions being thrown from libraries imported in the virtual environment are not being caught. Details in thread ...
pseudo-code to explain what I'm seeing: foo_flow.py (deployed to s3)
from prefect import flow, get_run_logger
from my_library import g  # installed in the virtualenv, not deployed

def f():
  raise Exception("flow exception")

@flow()
def my_flow():
  logger = get_run_logger()
  ...
  try:
    f()
  except Exception:
    logger.info("caught flow exception")

  try:
    g()
  except Exception:
    logger.info("caught library exception")

if __name__ == "__main__":
  my_flow()
my_library/__init__.py
(NOT deployed, but instead installed in a virtualenv)
def g():
  raise Exception("library exception")
I am running the Prefect server and the agent locally. The agent is running in a virtualenv with the package my_library installed. When the flow is run in the agent, the try/except fails to catch the exception thrown by g(). Instead the flow just halts (the agent continues to run), with no output in either the UI or the agent.
When I run foo_flow.py directly (in an environment that has my_library installed), the try/except is able to catch the exception thrown by g()!
The stack trace DOES mention asyncio. This is all happening directly inside the flow. No tasks have been called yet. And in the real code where I saw this, both places were throwing a vanilla Exception. I've searched Discourse but I can't find any issues related to this. Is there something wrong with my setup here? Or should I create a minimal repro and file a bug?
z
That's really weird. Is it a literal Exception type or is it a subclass?
d
Literal Exception. If this is not expected behavior, then I can start working on a minimal repro.
z
Definitely not expected behavior.
A minimal repro would be great! If you can do it without an agent that’d be great.
Possibly by calling prefect.utilities.importtools.import_object directly to get your flow, then just running it directly.
If you can't reproduce with that, it's probably specific to deployments and we'll dig into the longer repro.
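For anyone unsure what that suggestion looks like in practice: the idea is to load the flow function straight from the script file and call it in-process, with no agent or deployment involved. The sketch below does this with only the standard library's importlib as a stand-in. `load_object_from_script` and `demo` are hypothetical helpers, and the temporary script uses a plain function in place of the `@flow`-decorated one so that it runs without Prefect installed. The exact `import_object` call is not shown in this thread, so check the Prefect docs for the path format it accepts.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_object_from_script(script_path, object_name):
    """Execute a Python file as a module and return one named attribute."""
    spec = importlib.util.spec_from_file_location("loaded_script", script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, object_name)


def demo():
    # Stand-in for repro_job.py: a plain function replaces the @flow here.
    source = (
        "def repro_flow():\n"
        "    try:\n"
        "        raise Exception('repro test')\n"
        "    except Exception as exc:\n"
        "        return 'caught: ' + str(exc)\n"
    )
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "repro_job.py"
        script.write_text(source)
        # Load the function from the file, then call it directly.
        flow_fn = load_object_from_script(str(script), "repro_flow")
        return flow_fn()
```

If the exception is caught when the flow is loaded and called this way but not under the agent, that points at the deployment path rather than the flow code.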
d
Ok I'll see what I can do.
Ok I have got a condensed repro. It still only happens when run in an agent. I'm not sure how to use import_object to test.
repro_job.py
from prefect import flow, get_run_logger

import aiq_repro  # the library installed in the virtualenv (shown below)


def info():
    return {
        "job_family": "ReproJob",
        "dag_service": "pipelines_dashboard",
        "dag_method": "get_dag",
        "flow_function_name": "repro_flow",
    }


@flow()
def repro_flow():
    logger = get_run_logger()
    logger.info("about to call config lib")
    aiq_repro.get_config().get_svc_port(None, None)
    logger.info("DONE")
The library which is installed to the virtual env:
aiq_repro/__init__.py
import logging
import logging.config
from pyhocon import ConfigFactory


class ConfigContainer:
    config_instance = None


class Config:
    def __init__(self):
        logging.config.dictConfig({ "version": 1 })
        self._conf = ConfigFactory.from_dict({})

    def get_svc_port(self, customer_id, service_name):
        raise Exception("repro test")

def get_config():
    if not ConfigContainer.config_instance:
        ConfigContainer.config_instance = Config()

    return ConfigContainer.config_instance
My environment:
pip freeze | grep pyhocon
pyhocon==0.3.59
python --version
Python 3.7.9
prefect --version
2.4.0
The configuration I'm using:
#!/usr/bin/env python3
import getpass
from prefect.filesystems import S3
from prefect.infrastructure.process import Process

def register_storage():
    block = S3(
        bucket_path="BUCKET PATH",
        aws_access_key_id=input("enter AWS access id>"), 
        aws_secret_access_key=getpass.getpass("enter AWS secret key>")
    )
    block.save("secondblock", overwrite=True)

def register_process():
    process = Process()
    process.save("process-block1", overwrite=True)

if __name__ == "__main__":
    register_storage()
    register_process()
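A side note on the repro library above: Config.__init__ calls logging.config.dictConfig({ "version": 1 }). In the standard library, dictConfig defaults disable_existing_loggers to True, so any logger created before that call is disabled unless the config names it. Whether that accounts for the missing output here is only a guess and is not confirmed anywhere in this thread. The sketch below just demonstrates the stdlib behavior; the helper and the logger names are hypothetical.

```python
import logging
import logging.config


def disabled_after_dictconfig(config, name):
    """Create a logger, apply a dictConfig, and report whether it got disabled."""
    logger = logging.getLogger(name)
    logger.disabled = False
    logging.config.dictConfig(config)
    return logger.disabled


# Same config as the repro library: pre-existing loggers are disabled.
default_result = disabled_after_dictconfig({"version": 1}, "demo.a")

# Opting out explicitly keeps pre-existing loggers enabled.
opt_out_result = disabled_after_dictconfig(
    {"version": 1, "disable_existing_loggers": False}, "demo.b"
)
```

If this turns out to be relevant, passing "disable_existing_loggers": False in the library's config would be a cheap thing to try when re-running the repro under the agent.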