Matic Pecovnik
03/20/2024, 8:53 AM
import os
from pydantic import BaseModel
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from prefect.tasks import task_input_hash
class InputModel(BaseModel):
    """Pydantic model wrapping the single integer the dummy tasks operate on."""

    number: int  # value read by the tasks via ``params.number``
@task(cache_key_fn=task_input_hash, name="Dummy Task 1")
def dummy_task_1(params: InputModel, other_number: int) -> InputModel:
    """Return a new ``InputModel`` holding ``params.number + other_number``.

    The task is registered with ``cache_key_fn=task_input_hash``.

    Args:
        params: Model carrying the starting number.
        other_number: Value added to ``params.number``.

    Returns:
        A freshly constructed ``InputModel``; ``params`` itself is not modified.
    """
    # The annotation used to read ``-> int`` even though a model is returned.
    return InputModel(number=params.number + other_number)
@task(cache_key_fn=task_input_hash, name="Dummy Task 2")
def dummy_task_2(params: InputModel) -> InputModel:
    """Return a new ``InputModel`` holding ``params.number - 5``.

    The task is registered with ``cache_key_fn=task_input_hash``.

    Args:
        params: Model carrying the number to reduce.

    Returns:
        A freshly constructed ``InputModel``; ``params`` itself is not modified.
    """
    # The annotation used to read ``-> int`` even though a model is returned.
    return InputModel(number=params.number - 5)
# NOTE: ``os.environ[...]`` is evaluated when this module is imported, so a
# missing PREFECT_LOCAL_STORAGE_PATH raises ``KeyError`` before the flow runs.
@flow(
    name="Dummy Flow",
    persist_result=True,
    result_storage=LocalFileSystem(basepath=os.environ["PREFECT_LOCAL_STORAGE_PATH"]),
    result_serializer=JSONSerializer(),
)
def dummy_flow(params: InputModel, other_number: int):
    """Chain the two dummy tasks: add ``other_number``, then subtract 5.

    Args:
        params: Model carrying the starting number.
        other_number: Value passed to ``dummy_task_1`` to be added.

    Returns:
        Whatever ``dummy_task_2`` returns for the output of ``dummy_task_1``.
    """
    new_number = dummy_task_1(params, other_number)
    return dummy_task_2(new_number)
I deploy this flow to a self-managed Prefect server like this:
# Build a deployment named "<flow name> - <flow_tag>"; `flow_tag` is defined outside this snippet.
dummy_flow_coro = dummy_flow.to_deployment(name=f"{dummy_flow.name} - {flow_tag}")
# NOTE(review): `serve` is not imported in the code shown; presumably `from prefect import serve` (confirm).
_ = serve(dummy_flow_coro) # type: ignore[arg-type]
When running the deployment the first time with some input, everything works fine. However, if I run it again with the same inputs to get cached results, I get the error `ModuleNotFoundError: No module named '__prefect_loader__'`. What should I do?
Marvin
03/20/2024, 8:53 AM
I have the following dummy flow and tasks:
import os
from pydantic import BaseModel
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from prefect.tasks import task_input_hash
class InputModel(BaseModel):
    """Pydantic model wrapping the single integer the dummy tasks operate on."""

    number: int  # value read by the tasks via ``params.number``
@task(cache_key_fn=task_input_hash, name="Dummy Task 1")
def dummy_task_1(params: InputModel, other_number: int) -> InputModel:
    """Return a new ``InputModel`` holding ``params.number + other_number``.

    The task is registered with ``cache_key_fn=task_input_hash``.

    Args:
        params: Model carrying the starting number.
        other_number: Value added to ``params.number``.

    Returns:
        A freshly constructed ``InputModel``; ``params`` itself is not modified.
    """
    # The annotation used to read ``-> int`` even though a model is returned.
    return InputModel(number=params.number + other_number)
@task(cache_key_fn=task_input_hash, name="Dummy Task 2")
def dummy_task_2(params: InputModel) -> InputModel:
    """Return a new ``InputModel`` holding ``params.number - 5``.

    The task is registered with ``cache_key_fn=task_input_hash``.

    Args:
        params: Model carrying the number to reduce.

    Returns:
        A freshly constructed ``InputModel``; ``params`` itself is not modified.
    """
    # The annotation used to read ``-> int`` even though a model is returned.
    return InputModel(number=params.number - 5)
# NOTE: ``os.environ[...]`` is evaluated when this module is imported, so a
# missing PREFECT_LOCAL_STORAGE_PATH raises ``KeyError`` before the flow runs.
@flow(
    name="Dummy Flow",
    persist_result=True,
    result_storage=LocalFileSystem(basepath=os.environ["PREFECT_LOCAL_STORAGE_PATH"]),
    result_serializer=JSONSerializer(),
)
def dummy_flow(params: InputModel, other_number: int):
    """Chain the two dummy tasks: add ``other_number``, then subtract 5.

    Args:
        params: Model carrying the starting number.
        other_number: Value passed to ``dummy_task_1`` to be added.

    Returns:
        Whatever ``dummy_task_2`` returns for the output of ``dummy_task_1``.
    """
    new_number = dummy_task_1(params, other_number)
    return dummy_task_2(new_number)
I deploy this flow to a self-managed Prefect server like this:
# Build a deployment named "<flow name> - <flow_tag>"; `flow_tag` is defined outside this snippet.
dummy_flow_coro = dummy_flow.to_deployment(name=f"{dummy_flow.name} - {flow_tag}")
# NOTE(review): `serve` is not imported in the code shown; presumably `from prefect import serve` (confirm).
_ = serve(dummy_flow_coro) # type: ignore[arg-type]
When running the deployment the first time with some input, everything works fine.