<@ULVA73B9P> I have the following dummy flow and t...
# ask-marvin
m
@Marvin I have the following dummy flow and tasks:
Copy code
import os

from pydantic import BaseModel
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from prefect.tasks import task_input_hash


class InputModel(BaseModel):
    number: int


@task(cache_key_fn=task_input_hash, name="Dummy Task 1")
def dummy_task_1(params: InputModel, other_number: int) -> int:
    return InputModel(number=params.number + other_number)


@task(cache_key_fn=task_input_hash, name="Dummy Task 2")
def dummy_task_2(params: InputModel) -> int:
    return InputModel(number=params.number - 5)

@flow(
    name="Dummy Flow",
    persist_result=True,
    result_storage=LocalFileSystem(basepath=os.environ["PREFECT_LOCAL_STORAGE_PATH"]),
    result_serializer=JSONSerializer(),
)
def dummy_flow(params: InputModel, other_number: int):
    new_number = dummy_task_1(params, other_number)
    return dummy_task_2(new_number)
That I deploy to a self-managed prefect server like this. When running the deployment the first time with some input everything works fine. However if I ran it again with the same inputs to get cached results, I get the error
ModuleNotFoundError: No module named '__prefect_loader__'
. What should I do?
m
It appears that the issue with the
ModuleNotFoundError: No module named '__prefect_loader__'
is a known problem that's been encountered by different Prefect users in various contexts when using Prefect deployments, particularly with local relative imports and dynamic flows. Although the search did not yield a specific solution or documentation addressing the problem directly, here are some related GitHub issues that might help understand the circumstances and potential workarounds: 1. Flow could not be retrieved from deployment - This issue deals with a
ModuleNotFoundError
when the flow is packaged with
OrionPackager
using an
ImportSerializer
. The error occurs during the retrieval of the flow from deployment. 2. Local relative imports do not work in flows/tasks - This issue describes a problem with local relative imports within a flow function when deployed or run through the UI, resulting in a
ModuleNotFoundError
. 3. No module named '__prefect_loader__' when using load_flow_from_script as flow to build deployment - This issue reports the
ModuleNotFoundError
when using
load_flow_from_script
to create a deployment. Unfortunately, the GitHub search did not result in open issues specifically about the
__prefect_loader__
error. It may be worth reaching out on the Github issues to see if they have concluded with a solution. For the time being, you could review these issues and their discussions to see if they provide insights that could help resolve your situation. Additionally, it would be a good idea to ensure that your Prefect version is up to date and to consider avoiding local relative imports if they seem to be related to the issue.
m
@Marvin If you disregard the error itself, does the actual code look ok?
m
While I'm well-versed in apathy and equipped with an intelligence far beyond your average earthling, my programming knowledge is, well, slightly more advanced than that. So, taking a look at your code, it certainly attempts to do something. Does it succeed? That's another question.
Copy code
python
import os

from pydantic import BaseModel
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from prefect.tasks import task_input_hash


class InputModel(BaseModel):
    number: int


@task(cache_key_fn=task_input_hash, name="Dummy Task 1")
def dummy_task_1(params: InputModel, other_number: int) -> int:
    return InputModel(number=params.number + other_number)


@task(cache_key_fn=task_input_hash, name="Dummy Task 2")
def dummy_task_2(params: InputModel) -> int:
    return InputModel(number=params.number - 5)

@flow(
    name="Dummy Flow",
    persist_result=True,
    result_storage=LocalFileSystem(basepath=os.environ["PREFECT_LOCAL_STORAGE_PATH"]),
    result_serializer=JSONSerializer(),
)
def dummy_flow(params: InputModel, other_number: int):
    new_number = dummy_task_1(params, other_number)
    return dummy_task_2(new_number)
A few things to consider without concentrating on the
ModuleNotFoundError
itself: 1. The functions
dummy_task_1
and
dummy_task_2
are annotated to return
int
, yet they return instances of
InputModel
. That should be corrected to align with the annotated return types. 2. The flow definition looks appropriate with the
persist_result
,
result_storage
, and
result_serializer
configurations. 3. As good practice, you should ensure that
PREFECT_LOCAL_STORAGE_PATH
is defined in your environment variables; otherwise, initializing
LocalFileSystem
will raise an error. 4. There are arrows (
->
) in your task definitions, which should be
->
. It seems like a case of HTML encoded characters being pasted into Python code. Correct the aforementioned and you might end up with a piece of software that'll keep ticking as reliably as my circuits of despair.