David Cupp
09/30/2022, 7:29 PM
def f():
    raise Exception("flow exception")

@flow()
def my_flow():
    ...
    try:
        f()
    except Exception:
        logger.info("caught flow exception")
    try:
        g()
    except Exception:
        logger.info("caught library exception")

if __name__ == "__main__":
    my_flow()
my_library/__init__.py (NOT deployed, but instead installed in a virtualenv)
def g():
    raise Exception("library exception")
I am running the Prefect server and the agent locally. The agent is running in a virtualenv with the package my_library installed.
When the flow is run in the agent, the try-catch fails to catch the exception thrown by g(). Instead the flow just halts (the agent continues to run), with no output in either the UI or the agent.
When I run foo_flow.py directly (in an environment that has my_library installed), the try-catch is able to catch the exception thrown by g()! The stack trace DOES mention asyncio.
This is all happening directly inside the flow. No tasks have been called yet. And in the real code where I saw this, both places were throwing a vanilla Exception.
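When a flow stops with nothing in the UI or the agent output, one way to tell "the except block never ran" apart from "it ran but the log line was dropped" is to report through a channel that does not depend on logging at all. The following is a minimal sketch under that assumption and involves no Prefect code: `call_and_report` is a hypothetical helper, and `g` is a local stand-in for the library function.

```python
import io
import sys
import traceback


def g():
    # Local stand-in for the library function from the example above.
    raise Exception("library exception")


def call_and_report(func, stream=None):
    """Call func; on any exception, write the traceback directly to a stream.

    Returns the caught exception, or None if func returned normally.
    BaseException is caught so SystemExit and KeyboardInterrupt are reported too.
    """
    stream = stream if stream is not None else sys.stderr
    try:
        func()
    except BaseException as exc:
        # Bypass the logging module entirely so logger state cannot hide this.
        traceback.print_exception(type(exc), exc, exc.__traceback__, file=stream)
        stream.flush()
        return exc
    return None


# Capture the report in memory here; pass no stream to write to stderr instead.
buffer = io.StringIO()
caught = call_and_report(g, stream=buffer)
report = buffer.getvalue()
```

If the traceback shows up on stderr while the log line does not, the exception handling is probably fine and the logging path is the thing to look at. Because the helper swallows the exception, it is meant for debugging only; re-raise `caught` if the caller should still fail.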
I've searched Discourse but I can't find any issues related to this. Is there something wrong with my setup here? Or should I create a minimal repro and file a bug?
Zanie
09/30/2022, 7:37 PM
Exception type or is it a subclass?
David Cupp
09/30/2022, 7:38 PM
Zanie
09/30/2022, 7:39 PM
prefect.utilities.importtools.import_object directly to get your flow then just running it directly.
David Cupp
09/30/2022, 7:48 PM
import_object to test.
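The idea of loading the flow object from its script and then calling it directly can be approximated with the standard library alone. The sketch below is a rough stand-in, not Prefect's `import_object` implementation, whose behavior may differ: `load_object_from_script` is a hypothetical helper, and the generated `foo_flow.py` contains a plain function in place of a real flow.

```python
import importlib.util
import os
import tempfile


def load_object_from_script(script_path, object_name):
    """Load a Python script by file path and return one attribute from it."""
    spec = importlib.util.spec_from_file_location("loaded_script", script_path)
    module = importlib.util.module_from_spec(spec)
    # Runs the script's top-level code; __name__ is "loaded_script", so an
    # `if __name__ == "__main__":` guard in the script does not fire.
    spec.loader.exec_module(module)
    return getattr(module, object_name)


# Hypothetical script standing in for the flow file.
SCRIPT = '''
def my_flow():
    try:
        raise Exception("library exception")
    except Exception:
        return "caught library exception"
'''

with tempfile.TemporaryDirectory() as tmp_dir:
    script_path = os.path.join(tmp_dir, "foo_flow.py")
    with open(script_path, "w") as handle:
        handle.write(SCRIPT)
    flow_function = load_object_from_script(script_path, "my_flow")
    result = flow_function()
```

Loading by file path rather than running the script as `__main__` is closer to how a deployed flow is picked up, so it can expose differences that a direct `python foo_flow.py` run hides.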
repro_job.py
from prefect import flow, get_run_logger

import aiq_repro  # the library installed in the virtualenv

def info():
    return {
        "job_family": "ReproJob",
        "dag_service": "pipelines_dashboard",
        "dag_method": "get_dag",
        "flow_function_name": "repro_flow",
    }

@flow()
def repro_flow():
    logger = get_run_logger()
    logger.info("about to call config lib")
    aiq_repro.get_config().get_svc_port(None, None)
    logger.info("DONE")
The library which is installed to the virtual env:
aiq_repro/__init__.py
import logging
import logging.config
from pyhocon import ConfigFactory

class ConfigContainer:
    config_instance = None

class Config:
    def __init__(self):
        logging.config.dictConfig({ "version": 1 })
        self._conf = ConfigFactory.from_dict({})

    def get_svc_port(self, customer_id, service_name):
        raise Exception("repro test")

def get_config():
    if not ConfigContainer.config_instance:
        ConfigContainer.config_instance = Config()
    return ConfigContainer.config_instance
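One detail in the library above that may be worth checking: `logging.config.dictConfig` defaults `disable_existing_loggers` to `True`, so a bare `{"version": 1}` disables every logger that already exists and is not named in the config. Whether that explains the missing output in this thread is an assumption, not something the thread confirms. The sketch below shows only the standard-library behavior; `logger_disabled_after` and the logger name are hypothetical.

```python
import logging
import logging.config


def logger_disabled_after(config, name):
    """Create (or fetch) a logger, apply a dictConfig, and report its disabled flag."""
    logger = logging.getLogger(name)
    logger.disabled = False  # start from a known state
    logging.config.dictConfig(config)
    return logger.disabled


# Hypothetical name standing in for a logger created before the library runs.
NAME = "hypothetical.flow_logger"

# Same config as the library: disable_existing_loggers is left at its default.
default_result = logger_disabled_after({"version": 1}, NAME)

# Same config with the flag set explicitly; existing loggers stay enabled.
explicit_result = logger_disabled_after(
    {"version": 1, "disable_existing_loggers": False}, NAME
)
```

A disabled logger silently drops records, so an `except` block that only logs can look as if it never ran. Passing `"disable_existing_loggers": False` in the library's config is a low-risk way to rule this out.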
My environment:
pip freeze | grep pyhocon
pyhocon==0.3.59
python --version
Python 3.7.9
prefect --version
2.4.0
The configuration I'm using:
#!/usr/bin/env python3
import getpass

from prefect.filesystems import S3
from prefect.infrastructure.process import Process

def register_storage():
    block = S3(
        bucket_path="BUCKET PATH",
        aws_access_key_id=input("enter AWS access id>"),
        aws_secret_access_key=getpass.getpass("enter AWS secret key>")
    )
    block.save("secondblock", overwrite=True)

def register_process():
    process = Process()
    process.save("process-block1", overwrite=True)

if __name__ == "__main__":
    register_storage()
    register_process()