<@ULVA73B9P> I have the following dummy flow and t...
# ask-marvin
m
@Marvin I have the following dummy flow and tasks:
Copy code
import os

from pydantic import BaseModel
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from prefect.tasks import task_input_hash


class InputModel(BaseModel):
    number: int


@task(cache_key_fn=task_input_hash, name="Dummy Task 1")
def dummy_task_1(params: InputModel, other_number: int) -> int:
    return InputModel(number=params.number + other_number)


@task(cache_key_fn=task_input_hash, name="Dummy Task 2")
def dummy_task_2(params: InputModel) -> int:
    return InputModel(number=params.number - 5)

@flow(
    name="Dummy Flow",
    persist_result=True,
    result_storage=LocalFileSystem(basepath=os.environ["PREFECT_LOCAL_STORAGE_PATH"]),
    result_serializer=JSONSerializer(),
)
def dummy_flow(params: InputModel, other_number: int):
    new_number = dummy_task_1(params, other_number)
    return dummy_task_2(new_number)
That I deploy to a self-managed prefect server like this:
Copy code
dummy_flow_coro = dummy_flow.to_deployment(name=f"{dummy_flow.name} - {flow_tag}")
_ = serve(dummy_flow_coro)  # type: ignore[arg-type]
When running the deployment the first time with some input everything works fine. However if I ran it again with the same inputs to get cached results, I get the error
ModuleNotFoundError: No module named '__prefect_loader__'
. What should I do?
m
Your message was too long by 39 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
I have the following dummy flow and tasks:
Copy code
import os

from pydantic import BaseModel
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from prefect.tasks import task_input_hash


class InputModel(BaseModel):
    number: int


@task(cache_key_fn=task_input_hash, name="Dummy Task 1")
def dummy_task_1(params: InputModel, other_number: int) -> int:
    return InputModel(number=params.number + other_number)


@task(cache_key_fn=task_input_hash, name="Dummy Task 2")
def dummy_task_2(params: InputModel) -> int:
    return InputModel(number=params.number - 5)

@flow(
    name="Dummy Flow",
    persist_result=True,
    result_storage=LocalFileSystem(basepath=os.environ["PREFECT_LOCAL_STORAGE_PATH"]),
    result_serializer=JSONSerializer(),
)
def dummy_flow(params: InputModel, other_number: int):
    new_number = dummy_task_1(params, other_number)
    return dummy_task_2(new_number)
That I deploy to a self-managed prefect server like this:
Copy code
dummy_flow_coro = dummy_flow.to_deployment(name=f"{dummy_flow.name} - {flow_tag}")
_ = serve(dummy_flow_coro)  # type: ignore[arg-type]
When running the deployment the first time with some input everything works fine.