Michelle Brochmann
05/04/2022, 4:56 PM
/opt/prefect/healthcheck.py:130: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/storage.html#docker>
  flows = cloudpickle_deserialization_check(flow_file_paths)
Is there an obvious reason why this would happen? My tasks are defined in a module that I’m importing into the file where the Flows are defined. I can provide more info if needed!

class WriteMessageToFileWithDelayTask(Task):
    def run(self, message: str, delay: int, filepath: str) -> bool:
        return write_message_to_file_with_delay.run(message, delay, filepath)

Could the problem be the way I am calling the decorated task in the subclassed task?!

Kevin Kho
05/04/2022, 5:03 PM

Michelle Brochmann
05/04/2022, 5:04 PM

Kevin Kho
05/04/2022, 5:04 PM
Can you show what your Docker storage looks like?

Michelle Brochmann
05/04/2022, 5:06 PM
docker_storage = Docker(image_name=docker_image_name, image_tag=docker_image_tag)
docker_run_config = DockerRun(image=docker_image_name)
flow.run_config = docker_run_config
flow.storage = docker_storage
(And I register the flow via flow.register with a project name and some labels - nothing else - this step is where the container appears to be built and I see the error.)
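For reference, a minimal sketch of what that registration call could look like - the project name and labels here are placeholders, not the ones from the thread:

# Hypothetical values; registration builds and pushes the Docker storage image.
flow.register(project_name="my-project", labels=["docker"])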
Kevin Kho
05/04/2022, 5:10 PM

Michelle Brochmann
05/04/2022, 5:15 PM
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 130, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'example_dags'
Apologies for not including it earlier - I figured it was downstream of the initial problem.
When you say “import inside the container”, are you asking if I’ve done anything extra to install needed modules inside the container? If so, the answer is no - I’ve assumed that everything imported to define the Flow would be automatically included when the Flow is stored.

Kevin Kho
05/04/2022, 5:17 PM
import example_dags is working when you import locally, but inside the container this import can’t resolve. There is a snippet on custom modules here
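If I recall that docs snippet correctly, the pattern looks roughly like the sketch below - the local source path and the /modules target are placeholder paths, not ones from the thread. It copies the package into the image and puts it on PYTHONPATH so the import resolves inside the container:

from prefect.storage import Docker

docker_storage = Docker(
    image_name=docker_image_name,
    image_tag=docker_image_tag,
    # Copy the local example_dags package into the image (source path is hypothetical).
    files={"/path/to/example_dags": "/modules/example_dags"},
    # Make the copied package importable inside the container.
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules/"},
)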
Michelle Brochmann
05/04/2022, 5:17 PM

Kevin Kho
05/04/2022, 5:19 PM

Michelle Brochmann
05/04/2022, 5:20 PM
file_io_flows - I import this in yet a third file, where I set up the storage and register the flow as described above.
from example_dags.file_io_tasks import (
    ReadMessageFromFileWithDelayTask,
    WriteMessageToFileWithDelayTask,
    read_message_from_file_with_delay,
    write_message_to_file_with_delay,
)
def get_file_io_flow(name: str = _DEFAULT_FILE_IO_FLOW_NAME) -> Tuple[Flow, Task]:
    """Imperative style Flow demonstrating use of subclassed Tasks."""
    read_delay = Parameter('read_delay', default=0)
    write_delay = Parameter('write_delay', default=0)
    message = Parameter('message', default='HELLO')
    filepath = Parameter('filepath', default='./test_file.txt')
    read_message = ReadMessageFromFileWithDelayTask()
    write_message = WriteMessageToFileWithDelayTask()
    flow = Flow(name, tasks=[read_message, write_message])
    flow.set_dependencies(task=read_message,
                          upstream_tasks=[write_message, read_delay, filepath],
                          keyword_tasks={'delay': read_delay,
                                         'filepath': filepath})
    flow.set_dependencies(task=write_message,
                          upstream_tasks=[write_delay, filepath, message],
                          keyword_tasks={'delay': write_delay,
                                         'filepath': filepath,
                                         'message': message})
    return flow, read_message
def get_serializable_file_io_flow(name: str = _DEFAULT_FILE_IO_FLOW_NAME) -> Tuple[Flow, Task]:
    """Functional Flow that uses decorated tasks.

    Using this for the Docker storage run since the imperative get_file_io_flow
    resulted in an error that something was not serializable.
    """
    with Flow(name) as flow:
        read_delay = Parameter('read_delay', default=0)
        write_delay = Parameter('write_delay', default=0)
        message = Parameter('message', default='HELLO')
        filepath = Parameter('filepath', default='./test_file.txt')
        write_message = write_message_to_file_with_delay(message, write_delay, filepath)
        read_message = read_message_from_file_with_delay(read_delay, filepath)
        flow.set_dependencies(task=read_message, upstream_tasks=[write_message])
    return flow, read_message
@task
def write_message_to_file_with_delay(message: str,
                                     delay: int,
                                     filepath: str) -> bool:
    """Writes message to file after delay."""
    sleep(delay)
    with open(filepath, 'w', encoding='utf-8') as stream:
        stream.write(message)
    logger = prefect.context.get("logger")
    logger.info(f'*~*~* wrote message to file after {delay} second(s) *~*~*')
    return True
class WriteMessageToFileWithDelayTask(Task):
    def run(self, message: str, delay: int, filepath: str) -> bool:
        sleep(delay)
        with open(filepath, 'w', encoding='utf-8') as stream:
            stream.write(message)
        logger = prefect.context.get("logger")
        logger.info(f'*~*~* wrote message to file after {delay} second(s) *~*~*')
        return True
Kevin Kho
05/04/2022, 5:29 PM
There is a difference between init and run. For example, in

mytask = MyTask(..)
with Flow(...) as flow:
    mytask()

the first is the init and the second is the run. The init is made during flow registration and the run is deferred, so you need the dependency both to register and to run.
The second one though,

@task
def mytask(x):
    return x

with Flow(...) as flow:
    mytask()

doesn’t have the same build-time init. @task is serialized along with the Flow, but it looks like the custom Task class is not.
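A way to see this difference outside Prefect (a sketch, not from the thread): cloudpickle pickles a class imported from a module by reference to that module, so loading the bytes in an environment without the module fails the same way the healthcheck did.

import cloudpickle
from example_dags.file_io_tasks import WriteMessageToFileWithDelayTask

payload = cloudpickle.dumps(WriteMessageToFileWithDelayTask())
# Loading the payload effectively re-runs
#   from example_dags.file_io_tasks import WriteMessageToFileWithDelayTask
# so in a container without example_dags installed this raises
# ModuleNotFoundError, just like the healthcheck above.
restored_task = cloudpickle.loads(payload)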
Michelle Brochmann
05/04/2022, 5:31 PM

Kevin Kho
05/04/2022, 5:32 PM

Michelle Brochmann
05/04/2022, 5:32 PM

Kevin Kho
05/04/2022, 5:33 PM

Michelle Brochmann
05/04/2022, 5:57 PM

Kevin Kho
05/04/2022, 5:59 PM

Michelle Brochmann
05/04/2022, 5:59 PM
The type of the serialized decorated task is 'prefect.tasks.core.function.FunctionTask', but the type of the serialized subclassed task is 'example_dags.file_io_tasks.WriteMessageToFileWithDelayTask'. The first is defined in the executor but the second is not. To deserialize, the type must be defined!
So I think it might be more correct to say that both are serialized correctly, but the “shape of the vessel” that they get deserialized to is available in the decorated case but not in the subclassed case.
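A rough way to check those types locally (a sketch, assuming a flow built with one of the functions above):

import cloudpickle

flow, _ = get_serializable_file_io_flow()
restored = cloudpickle.loads(cloudpickle.dumps(flow))
for t in restored.tasks:
    # Decorated tasks come back as prefect.tasks.core.function.FunctionTask,
    # which ships with Prefect; a Task subclass resolves to its own module
    # path, which must be importable wherever the bytes are loaded.
    print(type(t))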
Kevin Kho
05/05/2022, 2:47 AM