Hi team, really struggling to figure out how to bu...
# ask-community
n
Hi team, really struggling to figure out how to build flows with inheritance so we can enforce a structure for similar flows. Running Prefect 2.14.2 At the moment my code is in this file structure with the following snippets:
Copy code
├── app
│   ├── flows
│   │   ├── AbstractFlow.py
|   |   ├── ConcreteFlow1.py
|   |   ├── ConcreteFlow2.py
|   |   ├── __init__.py
│   ├── deployments.py
Copy code
# AbstractFlow.py

class AbstractFlow:

    @task
    @staticmethod
    def task1():
        pass

    @task
    @staticmethod
    def task2():
        pass

    @classmethod
    def runconfig(self, workspace_id):
        self.task1()
        self.task2()
Copy code
# ConcreteFlow1.py

class ConcreteFlow1(AbstractFlow):

    @task(name='concrete-flow-task-1')
    @staticmethod
    def task1():
        return 1

    @task(name='concrete-flow-task-2')
    @staticmethod
    def task2():
        return 2
Copy code
from prefect.deployments import Deployment
from prefect import flow
from prefect.filesystems import LocalFileSystem
from flows.ConcreteFlow1 import ConcreteFlow1

block_storage = LocalFileSystem(basepath="/flows")
block_storage.save("local-storage", overwrite=True)

deployment = Deployment.build_from_flow(
    name="Concrete Deployment",
    flow=flow(name="concreteflow1")(ConcreteFlow1.runconfig),
    storage=LocalFileSystem.load("local-storage"),
    work_queue_name="default",
)

if __name__ == "__main__":
    deployment.apply()
I get the following error when I try to run my deployment:
Copy code
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 973, in load_flow_from_entrypoint
    flow = import_object(entrypoint)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/importtools.py", line 214, in import_object
    return getattr(module, object_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: module '__prefect_loader__' has no attribute 'runconfig'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 394, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 221, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 975, in load_flow_from_entrypoint
    raise MissingFlowError(
prefect.exceptions.MissingFlowError: Flow function with name 'runconfig' not found in 'flows/AbstractFlow.py'.
For the life of me, I cannot figure out the issue here. Does anyone see an immediate issue? Thank you very much in advance.
1
k
The flow function can't be found because it's not an attribute of
AbstractFlow.py
, but in the
AbstractFlow
class. The deployment you created is looking for
runconfig
in
flows/AbstractFlow.py
because the entrypoint constructed by
build_from_flow()
is
flows/AbstractFlow.py:runconfig
. In general, flow functions are expected not to be class or instance methods.
An alternative you could try is having
runconfig
simply call the tasks in order, without being a flow.
Copy code
# AbstractFlow.py
from prefect import task

class AbstractFlow:
    @staticmethod
    @task
    def task1():
        pass
    
    @staticmethod
    @task
    def task2():
        pass
    
    @classmethod
    def runconfig(self):
        self.task1()
        self.task2()
Then, in another file, you could define any number of flows that utilize classes that implement the tasks, and run, serve, or deploy from there.
Copy code
from prefect import flow
from flows.ConcreteFlow1 import ConcreteFlow1

@flow
def my_flow():
    ConcreteFlow1.runconfig()

if __name__ == "__main__":
    my_flow()
🙌 1
n
fantastic, thanks Kevin. Happy Halloween!
👻 1