e

Edmondo Porcu

05/07/2022, 1:51 AM
Quick question about Prefect. I had a
with Flow(''') as flow:
in the body of the script. However, since some parameters are set from environment variables in main, like so, that was causing an exception:
if __name__ == '__main__':
    flow  = build_flow()
    flow.executor = LocalDaskExecutor()
    project_name = os.environ['PROJECT_NAME']
    spark_version = os.environ['SPARK_VERSION']
    github_repo = os.environ['GITHUB_REPO']
    git_ref = os.environ['GIT_REF']
    flow.run(
        project_name=project_name,
        spark_version=spark_version,
        github_repo=github_repo,
        git_ref=git_ref
    )
and now I have wrapped my flow definition in a function. Is that a reasonable thing to do?
:discourse: 1
k

Kevin Kho

05/07/2022, 1:52 AM
Yes it is, and a lot of people use this setup to register one flow multiple times with different parameters/configurations. You can have a flow factory that creates variations on a Flow
e

Edmondo Porcu

05/07/2022, 2:06 AM
Unfortunately, and surprisingly, that code fails
Ok I am a bad person. I removed a @task and it broke everything
k

Kevin Kho

05/07/2022, 2:09 AM
Lol no worries. You good now?
e

Edmondo Porcu

05/07/2022, 2:19 AM
Yes. Need to check if the flow is successful, it should be in the docs
Is there any easy way? It seems flow.run returns a collection of states for each task
k

Kevin Kho

05/07/2022, 2:20 AM
Like this ? Check
testing state
e

Edmondo Porcu

05/07/2022, 2:23 AM
Great.
• Is there a way to mock the result of a task to test the flow logic?
• If I use a task object within a function annotated with task, should I invoke the run method?
k

Kevin Kho

05/07/2022, 2:24 AM
You could use the run method to test yeah. Would you still need to mock the result if you use that?
e

Edmondo Porcu

05/07/2022, 3:12 AM
I have updated my question
k

Kevin Kho

05/07/2022, 3:19 AM
Yes to the second, but it’s not a task anymore. It’s just normal Python
e

Edmondo Porcu

05/07/2022, 3:21 AM
This is still confusing, how to deal with tasks. Is it changing with Orion ? Is the run automatically invoked for all tasks created within a flow context manager ?
For the first question I am sure you understand there is value in testing complex workflows logic without executing the real tasks
k

Kevin Kho

05/07/2022, 3:35 AM
Yes, the run is invoked for all tasks in the Flow context manager
You could see the tasks library tests for some examples of how to mock the output when calling task.run()
Actually the aws tests might be nicer
In Orion, there is no task.run() yet. It might be task.fn()
e

Edmondo Porcu

05/07/2022, 4:20 PM
This is a test of the tasks, what about the test of the flow?
k

Kevin Kho

05/07/2022, 4:22 PM
The flow doesn’t have a return? What kind of logic are you trying to test?
e

Edmondo Porcu

05/07/2022, 4:26 PM
Take these two flows:
• A task returns a number, and a second task is executed N times, where N = return value of the first task
• Branching logic
k

Kevin Kho

05/07/2022, 4:27 PM
Ok will ask the team for how to test stuff like that
May be Monday before I get a response
a

Anna Geller

05/07/2022, 4:34 PM
@Edmondo Porcu what behavior exactly would you want to test? ignoring Prefect for a moment, how would you test that just using Python?
e

Edmondo Porcu

05/07/2022, 4:36 PM
def func1():
    ...  # perform complex sql query on google big table

def func2(input):
    ...  # whatever

def func3(input):
    ...  # whatever

def func4(input):
    ...  # whatever

def flow():
    res1 = func1()
    res2 = [func2(res) for res in res1]
    # assumption: the branch depends on how many results there are
    if len(res2) > 10:
        func3(res2)
    else:
        func4(res2)
a

Anna Geller

05/07/2022, 4:38 PM
so this is a flow graph representing the business logic, what would you want to test? perhaps testing data with a data validation test would be more helpful than a unit test given the big table query?
e

Edmondo Porcu

05/07/2022, 4:38 PM
How do you test this? I can think of two ways:
• Dependency injection (making func1, func2, func3, func4 input arguments of the flow function so you can pass mock implementations)
• Using MagicMock to override local module members
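The first option can be sketched in plain Python, with no Prefect involved. This is only an illustration under stated assumptions: the names run_pipeline, check_branching, query, transform, on_large and on_small are hypothetical stand-ins for flow and func1 to func4, and the branch condition assumes the intent was "more than 10 results".

```python
from unittest.mock import MagicMock


def run_pipeline(query, transform, on_large, on_small, threshold=10):
    """Workflow logic only: the real work is done by the injected callables."""
    rows = query()
    transformed = [transform(row) for row in rows]
    # Assumption: branch on how many results were produced.
    if len(transformed) > threshold:
        return on_large(transformed)
    return on_small(transformed)


def check_branching(n_rows):
    """Run the pipeline with mocks and return them for inspection."""
    query = MagicMock(return_value=list(range(n_rows)))
    transform = MagicMock(side_effect=lambda row: row * 2)
    on_large = MagicMock(return_value="large")
    on_small = MagicMock(return_value="small")
    result = run_pipeline(query, transform, on_large, on_small)
    return result, transform, on_large, on_small
```

Because the mocks record their calls, a test can assert both which branch ran and how many times the loop body was invoked, without executing any expensive work.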
a

Anna Geller

05/07/2022, 4:38 PM
I want to find out what would you want to test here before we try to figure out how to test it
e

Edmondo Porcu

05/07/2022, 4:39 PM
The workflow logic: loops and conditionals. Probably also the DAG itself
a

Anna Geller

05/07/2022, 4:42 PM
I see, I think testing data pipelines is tricky in general because it requires testing both:
• code (business logic)
• data
so it may actually be more helpful to test the tasks themselves to test the logic, e.g. when you do some data transformations, test them with some tiny data to ensure that your transformation functions/tasks do what you intended them to do, and then test the data itself with something like Pandera, GE or sth similar
for the workflow logic, unit testing seems a bit less helpful? because building a unit test for each flow is a kind of negative engineering itself that Prefect tries to eliminate - you use Prefect so that you don't need to test every single DAG, because Prefect will ensure that state transition happens as you intended without you having to worry about it and write unit tests to ensure that a workflow transitions between states the way you designed it
for Prefect building blocks such as tasks or conditional logic, Prefect already has an extensive suite of unit tests, so you can rely on those, or use those as examples if you build custom components extending those abstractions - you can check the tests in the prefect repo, as Kevin recommended, but it's definitely worth looking at what your end goal with those tests would be
for testing conditional logic, if you choose to do it, you could do sth as in the tests Kevin referenced:
    @pytest.mark.parametrize("branch", ["a", "b", "c"])
    def test_case_execution(self, branch):
        with Flow("test") as flow:
            cond = identity(branch)
            with case(cond, "a"):
                a = identity(1)
                b = inc(a)

            with case(cond, "b"):
                c = identity(3)
                d = inc(c)

            e = merge(b, d)

        state = flow.run()

        if branch == "a":
            assert state.result[a].result == 1
            assert state.result[b].result == 2
            assert state.result[c].is_skipped()
            assert state.result[d].is_skipped()
            assert state.result[e].result == 2
        elif branch == "b":
            assert state.result[a].is_skipped()
            assert state.result[b].is_skipped()
            assert state.result[c].result == 3
            assert state.result[d].result == 4
            assert state.result[e].result == 4
        elif branch == "c":
            for t in [a, b, c, d, e]:
                assert state.result[t].is_skipped()
e

Edmondo Porcu

05/07/2022, 5:04 PM
The idea that you need to run a DAG that can take 4 hours just to check whether you are combining tasks correctly scares me
a

Anna Geller

05/07/2022, 5:05 PM
to check if you are combining tasks correctly, you could do:
flow.visualize()
I think this is not a problem related to Prefect but to data engineering workflows in general - it's hard because it's more than just testing whether your code works, you need to test whether the business logic is correct and whether your data is correct, and the data volume these days doesn't make it any easier
👍 1
e

Edmondo Porcu

05/07/2022, 5:09 PM
that's why being able to mock the tasks is critical
a

Anna Geller

05/07/2022, 5:10 PM
I 100% agree about testing the tasks - but testing the DAG is negative engineering IMO
e

Edmondo Porcu

05/07/2022, 5:18 PM
Why? Testing something that can be wrong is negative engineering?
If you are using the DAG to combine tasks that help you predict diseases and will trigger billions of dollars of investments, you'd want to be reassured that the DAG has the right logic :D
a

Anna Geller

05/07/2022, 5:36 PM
I definitely understand your sentiment and know what you mean - the only thing is that you are using Prefect so that you don't have to think about testing your DAG. As long as you:
• test your business logic/your tasks
• define the order of execution through state and data dependencies
Prefect will ensure that the tasks in your flow will run in the right order and that if something goes wrong in between, your flow will fail. And you can define what to do when any task in the flow fails, and when to consider the flow run as successful.
So if you use Prefect, you only need to test your tasks and define the orchestration logic in your flow and Prefect will ensure that it works the way you designed it. Of course, nobody will probably complain about having more tests (?) so you can do that, but you need to decide whether you want to do it. I wouldn't - I would much rather spend this time building data validation tests, this would give me more confidence in my data than testing DAGs and branching logic
z

Zach Schumacher

05/07/2022, 7:59 PM
Testing a flow is an integration test, not a unit test. If you're trying to test that composing a flow works as you expect, you are just reimplementing tests the framework (prefect) you're using already tests
💯 2
a

Anna Geller

05/07/2022, 11:21 PM
thanks Zach, I guess I had the same intuition as you but you put it really well in words 😄 it's indeed an integration test!
z

Zach Schumacher

05/08/2022, 2:16 PM
when editing/adding flows, we usually ask the engineer to put the results of flow.visualize in the PR
a

Anna Geller

05/08/2022, 2:27 PM
that's a good idea to confirm that the data flow is built the right way 👍
e

Edmondo Porcu

05/08/2022, 8:04 PM
@Zach Schumacher isn't a human reviewing the result of flow.visualize a manual test of the flow logic?
I am just saying that the review should be expressed in code:
• are the task dependencies the ones we expect?
• are the conditionals leading to the right code path?
• are the loops exercised correctly?
This can all be unit tested or, if you are very strict about the meaning of unit, tested in isolation, if you mock the tasks. If you have real tasks, it's an integration test.
The argument that the flow logic should not be tested because it's tested by the unit tests of the framework is equivalent to saying that one cannot use a correct framework in an incorrect manner. It's like saying that you cannot write a flow that's wrong with respect to requirements, because the framework is right. I disagree:
• The flow can have the wrong flow logic, and it is desirable to verify it in isolation, because that is fast.
• This does not replace integration testing, but it provides a "fast and cheap" partial verification of correctness, which is valuable because it's fast and cheap.
As we adopted infrastructure as code, the need to test infrastructure as code emerged. https://docs.microsoft.com/en-us/azure/developer/terraform/best-practices-testing-overview
@Chris White what do you think?
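One way to express "are the task dependencies the ones we expect?" in code can be sketched with the standard library alone. This is a plain-Python illustration, not Prefect's API: workflow, recorded_calls, extract, transform and load are hypothetical names, and the check relies on a parent MagicMock recording the calls made to its child mocks in order.

```python
from unittest.mock import MagicMock, call


def workflow(extract, transform, load):
    """Glue code under test: extract, then transform, then load."""
    data = extract()
    load(transform(data))


def recorded_calls():
    """Run the workflow against child mocks and return the ordered call log."""
    parent = MagicMock()
    parent.extract.return_value = [1, 2]
    parent.transform.return_value = [2, 4]
    workflow(parent.extract, parent.transform, parent.load)
    return parent.mock_calls
```

Comparing the returned list with an expected sequence such as `[call.extract(), call.transform([1, 2]), call.load([2, 4])]` checks both the order of the steps and the data passed between them.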
z

Zach Schumacher

05/08/2022, 8:14 PM
Definitely not equivalent and pointing to terraform is not a good example at all. A flow is just glue.
:upvote: 1
e

Edmondo Porcu

05/08/2022, 8:14 PM
And glue cannot glue tasks together wrongly?
anything that's non trivial can be wrong
a

Alvaro Durán Tovar

05/08/2022, 8:17 PM
sure, it depends on the use case / context
:upvote: 1
you are talking about integration tests, because that's basically what flows are
I'm not managing billions, so I'm happy not bothering with a staging bigquery database to ensure my flow is writing the correct content to the correct table etc. I can still test intermediate tasks (unit tests) to make me feel safe
e

Edmondo Porcu

05/08/2022, 8:18 PM
Why do you need to run the "real task" to verify loops, conditional and task dependencies?
a

Alvaro Durán Tovar

05/08/2022, 8:20 PM
probably there are ways to run the whole flow with mocks, but then you aren't testing the final flow, testing the flow (or any other integration test) isn't always an option or the complexity is just too high
e

Edmondo Porcu

05/08/2022, 8:20 PM
@Alvaro Durán Tovar you are right, they are two complementary techniques.
a

Anna Geller

05/08/2022, 8:23 PM
@Edmondo Porcu why did you tag Prefect's CTO? Chris is not responsible for answering questions in the community. If you have some emergency situation, you can tag me. I'm not sure if you've seen our Code of Conduct, but we have a rule that you should not tag other users - we always respond even if it takes some time. If you need SLA-based support, you may contact our Professional Services
:upvote: 1
e

Edmondo Porcu

05/08/2022, 8:23 PM
Thank you @Anna Geller, I didn't know that. Apologies
👍 1
a

Anna Geller

05/08/2022, 8:23 PM
no worries, just for the next time
e

Edmondo Porcu

05/08/2022, 8:24 PM
@Alvaro Durán Tovar we have tasks that can run for hours or days, so running the real task is not an option, if you see what I mean. You basically have code that builds a DAG, and then the DAG is going to be executed. You want to test separately:
• does the DAG look like what it is supposed to? unit test
• does the DAG execution produce the right result? integration test
The second one is much more powerful, expensive, and complicated. You might have only a single integration environment, or executing certain tasks might not always be possible.
def build_flow(p1, p2, p3) -> Flow:
    ...  # builds and returns the Flow

def test_flow_1():
    flow = build_flow(2, 3, 4)
    # n_tasks was pseudocode; in Prefect 1.x, flow.tasks is the set of tasks in the flow
    assert len(flow.tasks) == 72
By the way, our average task costs $15k, as it is a big data job of multiple thousands of hours. It takes anywhere from 2 days to a week of computing even on large-scale HPC. I cannot spend $15k every time I change a line of code in the flow, I hope you understand that 😄
a

Alvaro Durán Tovar

05/08/2022, 8:25 PM
I don't know what's the deal here, I just like random flame wars lol. Can't you set an env var, say ENVIRONMENT=test, then modify the flow execution to mock certain parts?
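A rough sketch of that idea in plain Python, assuming the ENVIRONMENT variable name from the message above. The functions real_job, fake_job and select_job are hypothetical; a real flow would wire the selected callable into its tasks.

```python
import os


def real_job(n):
    # Stand-in for an expensive computation (hypothetical).
    raise RuntimeError("real job should not run in tests")


def fake_job(n):
    # Cheap deterministic substitute used when ENVIRONMENT=test.
    return n + 1


def select_job(env=None):
    """Pick the implementation based on the ENVIRONMENT variable."""
    env = os.environ if env is None else env
    return fake_job if env.get("ENVIRONMENT") == "test" else real_job
```

Passing the environment mapping as an argument keeps the selection itself testable without mutating os.environ.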
c

Cole Murray

05/08/2022, 8:26 PM
@Edmondo Porcu You can provide test data and validate that your flow is running correctly. In our case, we have very long-running jobs, for which we provide a test set of data to process as part of our integration tests. Through our CI/CD pipeline, any new flows get automatically tested against test data and, assuming they pass, are then promoted into production
💯 1
e

Edmondo Porcu

05/08/2022, 8:26 PM
@Alvaro Durán Tovar that's right! That's exactly what I think should be done.
@Cole Murray do you use unittest.mock.MagicMock to replace real tasks with mocked tasks?
a

Alvaro Durán Tovar

05/08/2022, 8:27 PM
What do you mean by "should be done"? @Edmondo Porcu
c

Cole Murray

05/08/2022, 8:27 PM
No i mean:
@task
def myDataDownloadTask(data_url):
  ...
and passing in parameterized data url that is different when running tests
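The same pattern can be sketched without Prefect: the data location is an argument, so a test can point it at a tiny dataset. This is an illustration only. It uses a local CSV path instead of a URL, and load_rows and write_sample are hypothetical names.

```python
import csv
import tempfile
from pathlib import Path


def load_rows(data_path):
    """Read rows from whichever CSV location the caller passes in."""
    with open(data_path, newline="") as handle:
        return list(csv.DictReader(handle))


def write_sample(directory):
    """Create a tiny test dataset and return its path."""
    path = Path(directory) / "sample.csv"
    path.write_text("id,value\n1,10\n2,20\n")
    return path
```

In a test, write_sample can be called inside a tempfile.TemporaryDirectory() block and its result handed to load_rows, while production code passes the real location.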
a

Alvaro Durán Tovar

05/08/2022, 8:30 PM
Here you have a random example with mocks, testing tasks, etc. Pretty sure Prefect has many more, better than this, in the GitHub repo. Nothing stops you from calling flow.run() to run the whole flow, but then the intermediate code will probably get pretty complex
e

Edmondo Porcu

05/08/2022, 8:31 PM
@Alvaro Durán Tovar that was what I was looking for. Mocking the tasks to run the flow
Using monkey patch. That’s what I was expecting as well! Somehow the discussion here led to “why would you ever do something like this?”
c

Cole Murray

05/08/2022, 8:35 PM
@Edmondo Porcu, given that the cost of failure is $15k per run, I personally would not stake correctness on monkey patching & mocked tasks. For me, running the real flow in a staging environment with sample data will give you a near 1-1 picture (aside from scaling constraints on memory/cpu) of your actual task flow. As an example of the risk of monkey patching / mocking:
@task
def my_missing_dependency(a):
  import pandas as pd
  df = pd.read_csv(a)
  ....
Your flow linkage may be correct, your unit tests on the tasks may be correct (depending on your test env, this may pass) but this will still fail at runtime and take the $15k with it
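The risk described here can be reproduced in plain Python. In this sketch the module name definitely_not_installed_pkg is deliberately hypothetical and assumed to be absent, and read_table, pipeline and real_run_fails are illustrative names: the mocked run passes while the real function fails on its lazy import.

```python
from unittest.mock import MagicMock


def read_table(path):
    # Lazy import: only evaluated when the function body actually runs.
    import definitely_not_installed_pkg  # hypothetical missing dependency
    return definitely_not_installed_pkg.read(path)


def pipeline(reader, path):
    """Flow-like glue that calls whatever reader it is given."""
    return reader(path)


def real_run_fails():
    """Return True if running with the real reader raises ImportError."""
    try:
        pipeline(read_table, "data.csv")
    except ImportError:
        return True
    return False
```

Calling pipeline with MagicMock(return_value="ok") in place of read_table succeeds, which is exactly why a mocked test alone cannot catch this class of failure.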
☝️ 2
e

Edmondo Porcu

05/09/2022, 1:27 AM
@Cole Murray I don't see unit testing and integration testing as alternatives, I think they are both needed
a

Alvaro Durán Tovar

05/09/2022, 7:19 AM
When possible I like to cover most of the code with a few integration tests, then the more complex things with unit tests. Anyway, we are doing something like this, which you can try:
def flow_name(name: Text) -> Text:
    return f"{name}-{get_environment()}"

with Flow(flow_name(FLOW)) as flow:
    flow code...
That is, the flow name is changed to reflect that it is configured for a specific environment (prod, qa, etc). Then configure the flow appropriately (qa DB, qa buckets, etc), with less data to crunch when executing the flow, as mentioned by @Cole Murray. Then trigger the flow and make assertions after it finishes