Abhas P

09/22/2021, 1:10 AM
Hi! Background: I'm starting to write a few basic tests for my flows, which extract data from a DB, transform it, and load it back into the DB. Intention: I want to test my custom flows in terms of composition and serialization, and my tasks in terms of high-level structure, input/output, and mapping. Is there another point of reference for unit testing (think CI) real-time custom flows, apart from the link on this page? (That page essentially shows how Prefect unit-tests its own platform, if I am not wrong.)

Kevin Kho

09/22/2021, 2:32 PM
Hey @Abhas P, I don’t think there are other references for unit testing. The tasks in the task library have some unit tests that show how to mock connections to databases. That may be a good place to start? Some users spin up a server for local testing before moving to Cloud. The Snowflake tests show how to monkeypatch.
Maybe you can also check this

Abhas P

09/22/2021, 9:32 PM
Thanks for the resources. Can you guide me on how to mock a task in a flow? Suppose it's a Mongo connect task like:
from prefect import Flow, task

@task
def mongo_connect():
    db = get_db()  # wrapper on top of the pymongo client to connect to the specified db
    collection = get_collection(db)  # wrapper on top of the pymongo client to get a specified collection from the db
    docs = collection.find()
    return docs

@task
def transform(docs):
    ...  # performs some data manipulation

with Flow("flow1") as flow:
    docs = mongo_connect()
    result = transform(docs)
I want to mock this task to return a certain set of documents, so I can test my entire flow run and assert the expected output.

Ben Muller

09/22/2021, 10:37 PM
Hey @Abhas P - @Kevin Kho asked me to weigh in here as I faced a similar issue. The way I got around it was to have a separate file with all of my tasks, e.g. tasks.py, and another file, functions.py, holding the plain functions. In your example you could have
# functions.py

def mongo_connect():
    db = get_db()  # wrapper on top of the pymongo client to connect to the specified db
    collection = get_collection(db)  # wrapper on top of the pymongo client to get a specified collection from the db
    docs = collection.find()
    return docs

# tasks.py

from prefect import task
from functions import mongo_connect

@task
def mongo_connect_task():
    # Thin wrapper: the task only delegates to the plain function
    return mongo_connect()
now when you do a mock in your tests you can simply do something like
@mock.patch("tasks.mongo_connect")
and this will mock the functionality of the task
let me know if that makes sense for you

Abhas P

09/22/2021, 10:42 PM
Hey @Ben Muller, thank you for this. Can I not keep the flow code intact and directly mock the mongo_connect task, instead of moving the constituent functions out? Just curious as to why it won't work as expected.

Ben Muller

09/22/2021, 10:44 PM
you may be able to, but I was not able to. Something to do with the decorator running the code before the mocking was able to take place. If you are able to crack it I am all ears!
I think the main key is that your tasks would need to be in a different file to the flow
that is how it can be done

Abhas P

09/22/2021, 10:48 PM
Sure thing, let me try that 🙂

Mary Clair Thompson

10/06/2021, 12:42 PM
I had a similar problem to Abhas, and tried this solution (essentially factoring out bodies of tasks into a separate functions file for easier testing). However I'm running into an error that's due to having this extra file in the picture: Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'functions\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
Did you all find a way around this issue?