Question on testing and referring to task results....
# prefect-community
j
Question on testing and referring to task results. This example is contrived but maybe useful for discussion. Simplified class that defines a flow:
from prefect import Flow, Parameter, task


@task
def times_two(x):
    return x * 2

@task
def add(items):
    return sum(items)

class SimpleFlow(SRMPrefectFlow):
    @property
    def flow(self) -> Flow:
        with Flow("SimpleFlow", environment=env) as flow:
            x = Parameter("x", default=[1, 2, 3])

            times_two_task_result = times_two.map(x)
            flow_result = add(times_two_task_result)
        return flow
And a simple test to run the flow & check the last task's result:
def test_flow_run_result():
    flow = SimpleFlow().flow
    fr = flow.run()
    assert list(fr.result.values())[2].result == 12
The `list(fr.result.values())[2].result` works, but is fragile. We'd rather `fr.result[flow_result].result` but `flow_result` isn't available outside of the function that defines the flow. Is there a better approach that people have used?
For more context, we use `SRMPrefectFlow` to handle common CI/CD aspects for flows, e.g. `save()`, `register()`, etc. (We want all flows to conform to this so that we can add them to Docker images and register them with Cloud in the same way.)
j
That’s an interesting question @Joe Schmid, and I see where you’re coming from. There are a couple of ways you could grab `flow_result` according to a convention (you may know these):
Depending on how strict the structure of your flows is, if they have one terminal task you could use `flow_result = list(flow.terminal_tasks())[0]`.
If you have multiple terminal tasks, you could define a single one of them as the flow’s `reference_task` (if appropriate for your workflow), and use `flow.reference_tasks()` instead.
Alternatively, if you have a convention of naming that task, you could use `flow.get_tasks(name='key task name')` to retrieve it.
None of these suggestions are as concise as having `flow_result` available in local scope, but perhaps they let you recreate it programmatically, or by convention?
You could also assign `flow.flow_result = flow_result` inside the function, so you could access it easily on the returned flow object, but that feels a bit hackier.
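A minimal sketch of these conventions as small test helpers. It assumes the Prefect 0.x/1.x API, where `flow.terminal_tasks()` and `flow.reference_tasks()` return sets of tasks and the state returned by `flow.run()` maps each task to its own state in `.result`. The helper names (`terminal_result`, `reference_result`, `attached_result`) are hypothetical, not part of Prefect.

```python
def terminal_result(flow, flow_state):
    """Result of the flow's only terminal task (hypothetical helper)."""
    # Tuple unpacking raises ValueError unless there is exactly one terminal task.
    (task,) = flow.terminal_tasks()
    return flow_state.result[task].result


def reference_result(flow, flow_state):
    """Result of the flow's only reference task (hypothetical helper)."""
    # Same check as above, applied to the flow's reference tasks.
    (task,) = flow.reference_tasks()
    return flow_state.result[task].result


def attached_result(flow, flow_state, attr="flow_result"):
    """Result of the task stored on the flow object as `flow.<attr>`.

    Assumes the flow-building code did `flow.flow_result = flow_result`.
    """
    return flow_state.result[getattr(flow, attr)].result


# Intended use in a test (assumes Prefect is installed and SimpleFlow is defined):
#     flow = SimpleFlow().flow
#     fr = flow.run()
#     assert terminal_result(flow, fr) == 12
```

The unpacking form fails loudly if a flow later grows a second terminal or reference task, which is the situation where indexing with `[0]` would silently pick an arbitrary one.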
j
Thanks @Jeremiah, `flow.get_tasks(name='key task name')` should work well, and I also like `flow.flow_result = flow_result`. Much appreciated!
j
👌 Just note that since names are not guaranteed unique, `get_tasks(name=...)` will return a list, so as long as you choose a good name, you’ll know what the one item is (which is why this depends slightly on adopting a convention).
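A small sketch of how a test could enforce that naming convention rather than trust it. It assumes the lookup method is `Flow.get_tasks(name=...)` returning a list, as in the Prefect 0.x/1.x API; `task_by_name` is a hypothetical helper, not something Prefect provides.

```python
def task_by_name(flow, name):
    """Return the one task called `name`; raise if the name is missing or shared."""
    matches = flow.get_tasks(name=name)  # assumed to return a list of matching tasks
    if not matches:
        raise LookupError(f"no task named {name!r}")
    if len(matches) > 1:
        raise LookupError(
            f"{len(matches)} tasks are named {name!r}; the name must be unique"
        )
    return matches[0]


# Intended use in a test (assumes Prefect is installed and the task is named "add"):
#     fr = flow.run()
#     assert fr.result[task_by_name(flow, "add")].result == 12
```

Raising on zero or multiple matches turns a broken convention into an immediate test failure instead of a lookup against the wrong task.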
j
Makes sense. I suspect for most reasonably complex Flows we won't typically be testing them as a complete run (instead we'd unit test portions) but for simple flows this might apply. It's also nice that we have lots of example tests to refer to as part of Prefect itself.
j
If you come up with a good idiom here let us know 🙂
j
Will do!
i
Tags? I use them to get a particular task's result from a flow when I stitch two or three flows together for organizational purposes.
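from pprint import pprint

# Look up the tagged task, then read its result from the flow run's state.
res = my_special_task.get_tasks(tags=["special_tag"])[0]
df = flow_state.result[res].result
pprint(f"Number of Definitions from secondary sources: {df.shape}")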