    Abhas P

    1 year ago
    Hi! Background: I am starting to write a few basic tests for my flows, which extract data from a DB, transform it, and load it back into the DB. Intention: I want to test my custom flows in terms of composition and serialization, and my tasks in terms of high-level structure, inputs/outputs, and mapping. Is there another point of reference for unit testing real-time custom flows (think CI), apart from the link on this page? If I am not wrong, that link essentially shows how Prefect unit tests its own platform.
    Kevin Kho

    1 year ago
    Hey @Abhas P, I don’t think there are other references for unit testing. The tasks in the task library have some unit tests that show how to mock connections to databases. That may be a good place to start? Some users spin up a server for local testing before moving to cloud. The Snowflake tests show how to monkeypatch.
    Maybe you can also check this
    Abhas P

    1 year ago
    Thanks for the resources. Can you guide me on how to mock a task in a flow? Suppose it's a Mongo connect task like:
    from prefect import Flow, task

    @task
    def mongo_connect():
        db = get_db()  # wrapper on top of pymongoclient to connect to the specified db
        collection = get_collection(db)  # wrapper on top of pymongoclient to get a specified collection from the db
        docs = collection.find()
        return docs

    @task
    def transform(docs):
        # performs some data manipulation
        ...

    with Flow("flow1") as flow:
        docs = mongo_connect()
        result = transform(docs)
    I want to mock this task to return a certain set of documents, so I can test my entire flow run and assert the expected output.
    Ben Muller

    1 year ago
    Hey @Abhas P - @Kevin Kho asked me to weigh in here as I faced a similar issue. The way I got around it was to have a separate file with all of my tasks, e.g. tasks.py, and another file, functions.py.
    In your example you could have:
    # functions.py

    def mongo_connect():
        db = get_db()  # wrapper on top of pymongoclient to connect to the specified db
        collection = get_collection(db)  # wrapper on top of pymongoclient to get a specified collection from the db
        docs = collection.find()
        return docs

    # tasks.py

    from prefect import task
    from functions import mongo_connect

    @task
    def mongo_connect_task():
        return mongo_connect()
    Now when you mock in your tests, you can simply do something like
    @mock.patch("tasks.mongo_connect")
    and this will mock the functionality of the task, because the task body looks up mongo_connect in tasks.py at call time.
    Let me know if that makes sense for you.
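A minimal sketch of that patch target, using only the standard library so it runs without Prefect or MongoDB installed. The two modules are built in memory as stand-ins for functions.py and tasks.py, the `@task` decorator is left out, and `FAKE_DOCS` is made-up data; all of these are assumptions for illustration, not code from the thread.

```python
import sys
import types
from unittest import mock

# Stand-in for functions.py: the real helper would query MongoDB.
functions = types.ModuleType("functions")
exec(
    "def mongo_connect():\n"
    "    raise RuntimeError('no database available in tests')\n",
    functions.__dict__,
)
sys.modules["functions"] = functions

# Stand-in for tasks.py: imports the helper by name, then wraps it.
# (The Prefect @task decorator is omitted in this sketch.)
tasks = types.ModuleType("tasks")
exec(
    "from functions import mongo_connect\n"
    "\n"
    "def mongo_connect_task():\n"
    "    return mongo_connect()\n",
    tasks.__dict__,
)
sys.modules["tasks"] = tasks

# Hypothetical documents the test wants the "database" to return.
FAKE_DOCS = [{"_id": 1, "name": "a"}, {"_id": 2, "name": "b"}]

# Patch the name where it is looked up (tasks.mongo_connect),
# not where it is defined (functions.mongo_connect).
with mock.patch("tasks.mongo_connect", return_value=FAKE_DOCS) as fake:
    patched_result = tasks.mongo_connect_task()
    call_count = fake.call_count
```

The wrapper never touches the real helper while the patch is active, and the original name is restored once the `with` block exits.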
    Abhas P

    1 year ago
    Hey @Ben Muller, thank you for this. Can I not keep the flow code intact and directly mock the mongo_connect task, instead of moving the constituent functions out? Just curious as to why it won't work as expected.
    Ben Muller

    1 year ago
    You may be able to, but I was not able to. Something to do with the decorator running the code before the mocking was able to take place. If you are able to crack it, I am all ears!
    I think the main key is that your tasks would need to be in a different file from the flow.
    That is how it can be done.
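One plausible reading of the timing problem, sketched with the standard library only: if the flow is assembled when the module is imported, it already holds a reference to the original function, so patching the module attribute afterwards does not reach it. The module name `flow_module`, the `flow_steps` list, and `run_flow` are hypothetical stand-ins for a single-file flow, not Prefect APIs.

```python
import sys
import types
from unittest import mock

# Hypothetical single-file layout: the task and the flow live in one module.
flow_module = types.ModuleType("flow_module")
exec(
    "def mongo_connect():\n"
    "    return ['real docs']\n"
    "\n"
    "# Built at import time, like the body of a flow definition block.\n"
    "flow_steps = [mongo_connect]\n"
    "\n"
    "def run_flow():\n"
    "    return [step() for step in flow_steps]\n",
    flow_module.__dict__,
)
sys.modules["flow_module"] = flow_module

with mock.patch("flow_module.mongo_connect", return_value=["fake docs"]):
    # The module attribute is replaced by the mock...
    by_name = flow_module.mongo_connect()
    # ...but the flow still calls the function it captured before the patch.
    by_flow = flow_module.run_flow()
```

Here `by_name` comes from the mock while `by_flow` still comes from the original function, which would explain why patching an inner helper that is looked up at call time behaves better than patching the task itself.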
    Abhas P

    1 year ago
    Sure thing, let me try that 🙂
    Mary Clair Thompson

    11 months ago
    I had a similar problem to Abhas and tried this solution (essentially factoring the bodies of tasks out into a separate functions file for easier testing). However, I'm running into an error caused by having this extra file in the picture: Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named 'functions'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    Did you all find a way around this issue?
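The error text suggests the pickled flow refers to the helper by module path, so the environment that unpickles it must be able to import `functions` too. A minimal standard-library sketch of that behaviour follows; the in-memory `functions` module is a hypothetical stand-in, and plain `pickle` is used here as an assumption about how the reference is stored rather than a copy of Prefect's storage code.

```python
import pickle
import sys
import types

# In-memory stand-in for functions.py (hypothetical contents).
functions = types.ModuleType("functions")
exec("def mongo_connect():\n    return []\n", functions.__dict__)
sys.modules["functions"] = functions

# pickle records the function as a module path plus a name, not as source code.
payload = pickle.dumps(functions.mongo_connect)

# Simulate an execution environment where functions.py was never shipped.
del sys.modules["functions"]
try:
    pickle.loads(payload)
    error_type = None
except ImportError as exc:
    error_type = type(exc).__name__

# Once the module is importable again, the same payload loads fine.
sys.modules["functions"] = functions
restored = pickle.loads(payload)
```

If that is what is happening, the likely way around it is to make functions.py importable wherever the flow runs, for example by installing it as a package or including it in the execution image.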