Hi guys, has anyone done testing for mapped flow? ...
# prefect-community
s
Hi guys, has anyone done testing for mapped flow? I'm trying to write a test to verify if a task is available in the flow but I can't seem to access the tasks that are inside the mapped flow
t
Hi Scarlett, would you be able to show us the mapped flow? Or a minimal reproducible example of your flow?
s
Copy code
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))
So if you have a simple flow like in the example above which I copied from the docs for Mapping, when I tried (below codes), it doesn't seem to work. How do I access the tasks that are part of the mapped flow?
Copy code
assert cond in flow.tasks
t
Just double checking that you are using prefect 1.0 correct?
a
Apply map and mapping in general works a bit differently since it dynamically spins up new child task runs at runtime but those don't exist in the static DAG structure. I think you could do: assert result in flow.tasks But then, it will dynamically generate mapped tasks
What problem are you trying to solve that way? Given that usually with mapping you don't know in advance what child tasks you'll have in the end, it might be beneficial to only test the basic tasks that you map over - e.g. here the is_even task represents business logic which makes sense to write a test for, mapping less so
s
Yes, so I'm trying to make sure the right tasks are added to inc_or_negate(). I don't care about the child tasks but just the fact that cond, res1 and res2 are there and the dependencies between each task are correct.
Basically, I want to check that the function that we use for mapped flow is composed correctly
a
This is what Prefect tests already do as part of the tests available in the Prefect repository - you can check mapping tests there. But I'm not sure how helpful this would be in your use case - this way you are testing Prefect core functionality while unit tests of your flows should rather test your business logic I encourage you to examine tests in our repo and then decide what makes sense to test
s
Let me try to be a bit clearer. I'm not testing the apply_map() function. What I'm trying to do, let's say I have the following tasks:
Copy code
def full_flow(table):
    e = extract(...)
    x = export(...)
    t = transform(...)
    l = load(...)
with Flow("example") as flow:
    list_of_tables = get_tables(...)
    run = apply_map(full_flow, table=list_of_tables)
What I want to test in this flow is that I expect the task e and x to happen before t can happen. I understand that we can set upstream_task(), but I want to make sure the flow composition is as we expect especially when we have more complicated flow than this one. I'm having problems accessing the e, x, t, l tasks in flow.tasks. I can see when I look into flow.tasks, I can see all of the tasks there including
list_of_tables
. However, if I try evaluate the following statements, only the first one return true
Copy code
list_of_tables in flow.tasks
e in flow.tasks
x in flow.tasks
t in flow.tasks
l in flow.tasks
I think the issue is I can't access e, x, t, or l because they are part of a mapped flow even though I can see them in flow.tasks.
a
I believe that your test is replicating Prefect tests, i.e. testing Prefect system rather than your dataflow logic
There is no need to test that dependencies are set correctly, you can trust Prefect to do it. Your tests should only test your workflow logic
and again, mapped child tasks are not included in the DAG because they are generated at runtime
s
Ok, thank you for your clarification
🙌 1