Nachi Mehta
10/31/2023, 10:35 PM
├── app
│   ├── flows
│   │   ├── AbstractFlow.py
│   │   ├── ConcreteFlow1.py
│   │   ├── ConcreteFlow2.py
│   │   ├── __init__.py
│   ├── deployments.py
# AbstractFlow.py
class AbstractFlow:
    """Base class grouping two Prefect tasks and a runner that calls them in order.

    Subclasses override ``task1`` and ``task2``. ``runconfig`` resolves both
    names on the class it is invoked on, so a subclass's overrides are the
    ones that run.
    """

    @task
    @staticmethod
    def task1():
        """Placeholder first step; does nothing in the base class."""

    @task
    @staticmethod
    def task2():
        """Placeholder second step; does nothing in the base class."""

    @classmethod
    def runconfig(cls, workspace_id):
        """Call ``task1`` and then ``task2`` on the receiving class.

        ``workspace_id`` is accepted but not used by this implementation.
        """
        # ``cls`` (the PEP 8 name for a classmethod's first argument) is the
        # class the method was called on, e.g. a concrete subclass.
        cls.task1()
        cls.task2()
# ConcreteFlow1.py
class ConcreteFlow1(AbstractFlow):
    """Concrete variant of ``AbstractFlow`` whose two tasks return fixed values."""

    @task(name='concrete-flow-task-1')
    @staticmethod
    def task1():
        """Return the constant ``1``."""
        return 1

    @task(name='concrete-flow-task-2')
    @staticmethod
    def task2():
        """Return the constant ``2``."""
        return 2
from prefect.deployments import Deployment
from prefect import flow
from prefect.filesystems import LocalFileSystem
from flows.ConcreteFlow1 import ConcreteFlow1
# Save a local-filesystem storage block under the name "local-storage".
block_storage = LocalFileSystem(basepath="/flows")
block_storage.save("local-storage", overwrite=True)

# NOTE: the flow is created here by wrapping the classmethod
# ``ConcreteFlow1.runconfig``. As the reply further down this thread explains,
# the entrypoint built by ``build_from_flow()`` is then
# ``flows/AbstractFlow.py:runconfig``, which is not a module-level name, so the
# flow cannot be retrieved when the deployment runs.
deployment = Deployment.build_from_flow(
    name="Concrete Deployment",
    flow=flow(name="concreteflow1")(ConcreteFlow1.runconfig),
    storage=LocalFileSystem.load("local-storage"),
    work_queue_name="default",
)

# Apply the deployment only when this file is executed as a script.
if __name__ == "__main__":
    deployment.apply()
I get the following error when I try to run my deployment:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 973, in load_flow_from_entrypoint
flow = import_object(entrypoint)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/importtools.py", line 214, in import_object
return getattr(module, object_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: module '__prefect_loader__' has no attribute 'runconfig'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 394, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 221, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 975, in load_flow_from_entrypoint
raise MissingFlowError(
prefect.exceptions.MissingFlowError: Flow function with name 'runconfig' not found in 'flows/AbstractFlow.py'.
For the life of me, I cannot figure out the issue here. Does anyone see an immediate issue? Thank you very much in advance.
Kevin Grismore
11/01/2023, 1:44 AM
`runconfig` is not defined at module level in `AbstractFlow.py`, but in the `AbstractFlow` class. The deployment you created is looking for `runconfig` in `flows/AbstractFlow.py` because the entrypoint constructed by `build_from_flow()` is `flows/AbstractFlow.py:runconfig`. In general, flow functions are expected not to be class or instance methods.
Kevin Grismore
11/01/2023, 1:46 AM
One option is to have `runconfig` simply call the tasks in order, without being a flow.
# AbstractFlow.py
from prefect import task
class AbstractFlow:
    """Base class grouping two Prefect tasks and a plain (non-flow) runner.

    ``runconfig`` is an ordinary classmethod, not a flow; it is meant to be
    called from inside a separately defined flow function.
    """

    @staticmethod
    @task
    def task1():
        """Placeholder first step; does nothing in the base class."""

    @staticmethod
    @task
    def task2():
        """Placeholder second step; does nothing in the base class."""

    @classmethod
    def runconfig(cls):
        """Call ``task1`` and then ``task2`` on the receiving class."""
        # ``cls`` (the PEP 8 name for a classmethod's first argument) is the
        # class the method was called on, so subclass overrides are used.
        cls.task1()
        cls.task2()
Then, in another file, you could define any number of flows that utilize classes that implement the tasks, and run, serve, or deploy from there.
from prefect import flow
from flows.ConcreteFlow1 import ConcreteFlow1
@flow
def my_flow():
    """Flow entry point that delegates to ``ConcreteFlow1.runconfig``."""
    ConcreteFlow1.runconfig()


if __name__ == "__main__":
    my_flow()
Nachi Mehta
11/01/2023, 2:12 AM