# ask-community
j
Hi! I’m trying to convert an existing ETL pipeline to Prefect, so I’m using Prefect Cloud as the server and one of my EC2 instances as an agent. On the EC2 agent I registered the existing job as a flow with tasks and ran flow.register() to register it with the server. In the ETL task I import a custom class from a local path, and when I run the flow from the Prefect UI it can’t resolve that local Python class. Here is the error message:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'common\'",)\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.',)
Again thank you so much!
k
Hey @jake lee, hope you got the UI working? Most of the storage classes keep only the flow code, not the other files, because of the way cloudpickle works. You need to package your dependencies into a Docker container and use Docker storage so that the agent can pull the container with the dependencies. If you are using a Local agent and LocalRun, there are two workarounds. First, you can start the agent in the directory where the flow is, so it has access to those files when it runs the flow. Second, LocalRun takes a working_dir where you can specify the directory the agent will run the flow from.
🙌 1
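The two options above can be sketched roughly like this, using the Prefect 0.x/1.x "legacy" API the thread is about; the registry URL, dependency list, and working directory below are placeholder examples, not values from the thread:

```python
from prefect import Flow
from prefect.run_configs import LocalRun
from prefect.storage import Docker

with Flow("etl-flow") as flow:
    ...  # tasks go here

# Option 1: bake the code and its dependencies into a Docker image that
# the agent pulls at run time (registry URL and deps are hypothetical).
flow.storage = Docker(
    registry_url="registry.example.com/etl",
    python_dependencies=["pandas"],
)

# Option 2: keep a Local agent, but point working_dir at the directory
# that actually contains the flow's local modules (placeholder path).
flow.run_config = LocalRun(working_dir="/home/ec2-user/etl-project")
```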
j
Yeah, for the UI we decided to try the cloud first. Thank you~! I’ll take a look and see how I can resolve the dependency issue!
So I have the following structure:
• common
◦ Util.py
• job
◦ test.py
In test.py I am trying to call the class from Util, so the original ETL code was importing it as
import sys
sys.path.append("..")
from common.Util import Util
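One caveat worth noting here (my observation, not from the thread): sys.path.append("..") resolves ".." against the process's current working directory, not against test.py, so it breaks as soon as the agent launches the flow from somewhere else. Anchoring the path on __file__ avoids that:

```python
import sys
from pathlib import Path

# Resolve the project root from this file's location rather than the
# current working directory, so the "common" package is importable no
# matter where the agent process was started.
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))

# Now `from common.Util import Util` works regardless of the agent's CWD.
```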
and for the job I have set my working directory as
flow.run_config = LocalRun(working_dir="/job")
but the task can’t find common.Util. I’m sure this is a pretty basic question, but please help a brother out! Thank you so much!
k
Of course! Happy to help! First of all, when you want to package dependencies of your flow like this, we normally recommend packaging it in a Docker container. This setup seems like it should work, though, since you added the sys.path.append(). It might be your working directory. Is that an absolute path? Also, you can try running your agent in the job directory, and that might work. I think it’s just a matter of getting that path right.
🙌 1
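Kevin's question about the absolute path is likely the issue: working_dir="/job" points at a directory named job at the filesystem root, which is almost certainly not where the project lives on the EC2 box. A quick stdlib check illustrates this (the ~/etl-project path is a hypothetical example, not the real location from the thread):

```python
import os

# "/job" is absolute, but it starts at the filesystem root --
# almost certainly not where the project directory actually is.
print(os.path.isabs("/job"))

# A hypothetical absolute path to the real project location
# (placeholder name; expanduser turns "~" into the home directory):
working_dir = os.path.expanduser("~/etl-project/job")
print(os.path.isabs(working_dir))
```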
j
Forgot to say thank you! I did figure out how to configure the local env with your help. Thanks a lot, Kevin!
k
Of course! No problem!
🙌 1