Jacob Goldberg
05/23/2023, 10:01 PM
from prefect.storage import Docker
from prefect.run_configs import DockerRun
import os
from prefect.utilities.storage import extract_flow_from_file
# Load the Flow object out of every flow file found in the flow directory.
flow_directory = "..."
flow_file_names = [...]
all_flows = [
    extract_flow_from_file(os.path.join(flow_directory, flow_file))
    for flow_file in flow_file_names
]

# Storage and run configuration objects used as the registration defaults.
STORAGE = Docker(...)
RUN_CONFIG = DockerRun(...)
def build_and_register_all_flows(
    flow_list,
    storage=STORAGE,
    run_config=RUN_CONFIG,
):
    """Add every flow to one storage object, build it once, then register all.

    Args:
        flow_list: Iterable of flow objects. It is copied into a list up
            front, so a one-shot iterable such as a generator is accepted.
        storage: Storage object the flows are added to and that is built.
            Flows are added to this object itself, so the module-level
            default is modified when no storage is passed.
        run_config: Run configuration assigned to every flow.
    """
    # The flows are walked twice (before and after the build). Without this
    # copy a generator would be exhausted by the first loop and the second
    # loop would silently register nothing.
    flows = list(flow_list)

    # Assign the run config and attach each flow to the shared storage.
    for flow in flows:
        flow.run_config = run_config
        storage.add_flow(flow)

    # Build the Docker image once for all flows; keep the returned storage.
    storage = storage.build()

    for flow in flows:
        # Point each flow at the storage object returned by the build.
        flow.storage = storage
        # Register each flow without building a second time.
        # NOTE(review): the elided arguments presumably include build=False
        # so that registration does not trigger another build -- confirm.
        flow.register(...)
if __name__ == "__main__":
    # Script entry point: build and register every flow loaded above,
    # using the default storage and run configuration.
    build_and_register_all_flows(flow_list=all_flows)
This pattern relied heavily on the extract_flow_from_file utility from Prefect 1.x, which I am not sure exists in Prefect 2.x.
I have found this deployment tutorial on the prefect site that shows a reasonably familiar pattern in the “Deployment creation with Python” section that I hope to follow. However, it only applies to one flow.
Can someone help me understand how to apply this to multiple flows? Is there a utility similar to the extract_flow_from_file utility from Prefect 1.x that can help me bundle up a bunch of flows and deploy them together?
Javier Ebrero
05/24/2023, 10:26 AMdef build_flows(register=False):
env_vars = get_env_vars()
# Iterate over each file in the folder
for filename in os.listdir(FLOWS_PATH):
if filename.endswith(".py"):
module_name = filename[:-3] # Remove the .py extension
module_path = os.path.join(FLOWS_PATH, filename)
try:
current_flow = load_flow_from_script(path=module_path)
# Register the flow with the project
if register:
deployment = Deployment.build_from_flow(
flow=current_flow,
name="My deployment",
infra_overrides={"env": env_vars},
apply=True,
)
deployment.apply(upload=True)
print(f"Registered flow: {module_name}")
except Exception as e:
logging.exception(f"There is an error trying to build flow {module_name}: \n{e}")
But I am getting the following error during the deployment:
10:17:59.024 | ERROR | root - There is an error trying to build flow
No module named '__prefect_loader__'
Traceback (most recent call last):
File "/usr/src/app/arma_flows/build_flows.py", line 37, in build_flows
deployment = Deployment.build_from_flow(
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
return call()
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
return self.result()
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 772, in build_from_flow
module = importlib.import_module(mod_name)
File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 984, in _find_and_load_unlocked
ModuleNotFoundError: No module named '__prefect_loader__'
Jacob Goldberg
05/24/2023, 5:05 PM
Traceback (most recent call last):
File "/Users/jacob/Desktop/work/arable_codebase/calval-etl/calval_etl/prefect_flows/support_functions/flow_registry.py", line 29, in <module>
deployment = Deployment.build_from_flow(
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 260, in coroutine_wrapper
return call()
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 245, in __call__
return self.result()
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/site-packages/prefect/deployments.py", line 772, in build_from_flow
module = importlib.import_module(mod_name)
File "/Users/jacob/.pyenv/versions/3.10.5/lib/python3.10/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named '__prefect_loader__'
Javier Ebrero
05/25/2023, 7:29 AM