
Michelle Brochmann

05/04/2022, 4:56 PM
I am trying to run two versions of a Flow on a Docker container using Docker Storage - the only (intended) difference is that one is built from tasks created via the @task decorator, and the other is built from tasks subclassed from the Task object. The decorator version runs fine, but the subclassed version results in this error when the Docker container is being built:
/opt/prefect/healthcheck.py:130: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules https://docs.prefect.io/api/latest/storage.html#docker
flows = cloudpickle_deserialization_check(flow_file_paths)
Is there an obvious reason why this would happen? My tasks are defined in a module that I’m importing into the file where the Flows are defined. I can provide more info if needed!
The subclassed tasks look like this:
class WriteMessageToFileWithDelayTask(Task):
    def run(self, message: str, delay: int, filepath: str) -> bool:
        return write_message_to_file_with_delay.run(message, delay, filepath)
Could the problem be the way I am calling the decorated task in the subclassed task?!
(Update: I tried putting the task code directly into the run method instead of calling the decorated task but that didn’t seem to help. 😞)

Kevin Kho

05/04/2022, 5:03 PM
Are you importing this task from another file?

Michelle Brochmann

05/04/2022, 5:04 PM
Yes.

Kevin Kho

05/04/2022, 5:04 PM
Could you show me what Docker storage looks like?

Michelle Brochmann

05/04/2022, 5:06 PM
Hope I understand the question correctly - I’m setting the storage/run_config like this, and setting it in the Flow:
docker_storage = Docker(image_name=docker_image_name, image_tag=docker_image_tag)
docker_run_config = DockerRun(image=docker_image_name)
flow.run_config = docker_run_config
flow.storage = docker_storage
(And I register the flow via flow.register with a project name and some labels - nothing else - this step is where the container appears to be built and I see the error.)
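Roughly, that registration call looks like the sketch below (the project and label names are placeholders); with Docker storage, the image build and the /opt/prefect/healthcheck.py check happen as part of this call:
flow.register(
    project_name='examples',  # placeholder project name
    labels=['docker'],        # placeholder labels
    # build=True by default, which builds the Docker image and runs the healthcheck
)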

Kevin Kho

05/04/2022, 5:10 PM
Yeah so is that import you are doing inside the container also?

Michelle Brochmann

05/04/2022, 5:15 PM
Hmm, perhaps not (I’m not 100% sure I understand the question) because I also see this error:
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 130, in <module>
    flows = cloudpickle_deserialization_check(flow_file_paths)
  File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
    flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'example_dags'
Apologies for not including it earlier - I figured it was downstream of the initial problem. When you say “import inside the container” are you asking if I’ve done anything extra to install needed modules inside the container? If so the answer is no - I’ve assumed that everything imported to define the Flow would be automatically included when the Flow is stored.

Kevin Kho

05/04/2022, 5:17 PM
Yeah that’s what I mean. The dependencies need to be in the container also. So I think what is happening here is that import example_dags is working when you import locally, but inside the container this import can’t resolve. There is a snippet on custom modules here
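For reference, the custom-modules recipe amounts to something like the sketch below: copy the package into the image and put its parent directory on PYTHONPATH so that import example_dags also resolves inside the container. The local source path and the /modules location are placeholders, and docker_image_name/docker_image_tag are the variables from the storage snippet above:
from prefect.storage import Docker

docker_storage = Docker(
    image_name=docker_image_name,
    image_tag=docker_image_tag,
    # copy the example_dags package into the image (local source path is a placeholder)
    files={"/path/to/repo/example_dags": "/modules/example_dags"},
    # make the copied package importable inside the container
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
)
If the package is pip-installable, installing it into the image via python_dependencies or a custom Dockerfile works as well.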

Michelle Brochmann

05/04/2022, 5:17 PM
When I’m not using a subclassed task, I have imports through the same chain of files but I don’t see the same problem - would we expect things to be different in this case?
I’ll take a look at the link, thanks! (It looks promising!)

Kevin Kho

05/04/2022, 5:19 PM
Were you using DockerStorage too?

Michelle Brochmann

05/04/2022, 5:20 PM
Yes - everything exactly the same as far as I can tell.
Sorry for the code dump but maybe this will be useful - this shows the way I define the two flows. Where I register/run the flow programmatically, I switch between “get_file_io_flow” and “get_serializable_file_io_flow” and see the problem only in the first version. NOTE - these functions are defined in module file_io_flows - I import this in yet a third file, where I set up the storage and register the flow as described above.
from example_dags.file_io_tasks import (
    ReadMessageFromFileWithDelayTask,
    WriteMessageToFileWithDelayTask,
    read_message_from_file_with_delay,
    write_message_to_file_with_delay
)

def get_file_io_flow(name: str = _DEFAULT_FILE_IO_FLOW_NAME) -> Tuple[Flow, Task]:
    """Imperative style Flow demonstrating use of subclassed Tasks."""
    read_delay = Parameter('read_delay', default=0)
    write_delay = Parameter('write_delay', default=0)
    message = Parameter('message', default='HELLO')
    filepath = Parameter('filepath', default='./test_file.txt')
    read_message = ReadMessageFromFileWithDelayTask()
    write_message = WriteMessageToFileWithDelayTask()
    flow = Flow(name, tasks=[read_message, write_message])
    flow.set_dependencies(task=read_message,
                          upstream_tasks=[write_message, read_delay, filepath],
                          keyword_tasks={'delay': read_delay,
                                         'filepath': filepath})
    flow.set_dependencies(task=write_message,
                          upstream_tasks=[write_delay, filepath, message],
                          keyword_tasks={'delay': write_delay,
                                         'filepath': filepath,
                                         'message': message})
    return flow, read_message

def get_serializable_file_io_flow(name: str = _DEFAULT_FILE_IO_FLOW_NAME) -> Tuple[Flow, Task]:
    """Functional Flow that uses decorated tasks.

    Using this for Docker storage run since the imperative get_file_io_flow resulted in an error
    that something was not serializable.
    """
    with Flow(name) as flow:
        read_delay = Parameter('read_delay', default=0)
        write_delay = Parameter('write_delay', default=0)
        message = Parameter('message', default='HELLO')
        filepath = Parameter('filepath', default='./test_file.txt')
        write_message = write_message_to_file_with_delay(message, write_delay, filepath)
        read_message = read_message_from_file_with_delay(read_delay, filepath)
        flow.set_dependencies(task=read_message, upstream_tasks=[write_message])
    return flow, read_message
And here is an example of how I define the decorated vs subclassed task in `file_io_tasks`:
@task
def write_message_to_file_with_delay(message: str,
                                     delay: int,
                                     filepath: str) -> bool:
    """Writes message to file after delay."""
    sleep(delay)
    with open(filepath, 'w', encoding='utf-8') as stream:
        stream.write(message)
    logger = prefect.context.get("logger")
    logger.info(f'*~*~* wrote message to file after {delay} second(s) *~*~*')
    return True

class WriteMessageToFileWithDelayTask(Task):
    def run(self, message: str, delay: int, filepath: str) -> bool:
        sleep(delay)
        with open(filepath, 'w', encoding='utf-8') as stream:
            stream.write(message)
        logger = prefect.context.get("logger")
        logger.info(f'*~*~* wrote message to file after {delay} second(s) *~*~*')
        return True

Kevin Kho

05/04/2022, 5:29 PM
If I have to guess, this is because the class has a separated init and run. For example,
mytask = MyTask(..)
with Flow(...) as flow:
    mytask()
the first is the init and the second is the run. The init is made during flow registration and the run is deferred, so you need the dependency to both register and run. The second one though,
@task
def mytask(x):
    return x

with Flow(...) as flow:
    mytask()
doesn’t have the same build-time init. I think the @task is serialized along with the Flow, but it looks like the custom Task class is not.
Did you try running the ones defined with a decorator and seeing if it works? I wouldn’t be surprised if it failed at run time.

Michelle Brochmann

05/04/2022, 5:31 PM
The ones defined with a decorator run without problems and spit out the result I expect.

Kevin Kho

05/04/2022, 5:32 PM
Ah ok I guess it does get serialized completely along with the Flow while the class does not.

Michelle Brochmann

05/04/2022, 5:32 PM
I suspect the info you provided will help me find the solution. I’m going to take some time to chew on it and will update after I explore a bit further. Thanks so much for your help! 🙂

Kevin Kho

05/04/2022, 5:33 PM
The thing is the class implementation is instantiated and saved during registration time. And then the runtime side needs to de-serialize that instantiation using the same library. Does that make sense?

Michelle Brochmann

05/04/2022, 5:57 PM
I think so - is this right? The class-based Flow contains instances of the subclassed tasks, so, when serialized, the Flow will not contain the code that defines the tasks. (So the definitions need to be available, via the module, in the Executor.) The functional Flow contains the decorated tasks, which do slurp up the code used to define the tasks - so when deserialized on the Executor, there is no need to look for the module as the code is right there. (EDIT: I think it wasn’t exactly right - my second stab at it is below. 🙂 )

Kevin Kho

05/04/2022, 5:59 PM
That is my belief right now yeah

Michelle Brochmann

05/04/2022, 5:59 PM
Awesome. An interesting problem. I will play around with it some more. 🙂
For posterity: I think my first interpretation wasn’t exactly right: I stepped through the serialization in the debugger and noticed that the type of the serialized decorated task is 'prefect.tasks.core.function.FunctionTask', but the type of the serialized subclassed task is 'example_dags.file_io_tasks.WriteMessageToFileWithDelayTask'. The first is defined in the executor but the second is not. To deserialize, the type must be defined! So I think it might be more correct to say that both are serialized correctly, but the “shape of the vessel” that they get deserialized to is available in the decorated case but not in the subclassed case.
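A self-contained way to see this outside of Prefect is sketched below with plain cloudpickle and a throwaway module standing in for example_dags.file_io_tasks (the module and class names are made up for illustration): pickling an instance of a class that lives in an importable module stores only a reference to that class, so loading the pickle where the module is missing fails with the same ModuleNotFoundError the healthcheck reports.
import os
import sys
import tempfile

import cloudpickle

# Write a throwaway module that plays the role of example_dags.file_io_tasks.
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "fake_tasks.py"), "w", encoding="utf-8") as f:
    f.write("class CustomTask:\n    def run(self):\n        return True\n")

sys.path.insert(0, tmp_dir)
import fake_tasks

# The pickle records the class by reference ('fake_tasks.CustomTask'), not its source code.
blob = cloudpickle.dumps(fake_tasks.CustomTask())

# Simulate the container, where the module is not installed.
sys.path.remove(tmp_dir)
del sys.modules["fake_tasks"]

try:
    cloudpickle.loads(blob)
except ModuleNotFoundError as exc:
    print(exc)  # No module named 'fake_tasks' - the same shape of failure as healthcheck.py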

Kevin Kho

05/05/2022, 2:47 AM
Wow nice digging! and well explained!