Hi everyone! I’m following the steps outlined in t...
# ask-community
f
Hi everyone! I’m following the steps outlined in the docs to test a flow’s composition: https://docs.prefect.io/core/idioms/testing-flows.html#testing-flow-composition I’ve noticed that
flow.tasks
returns not just tasks but parameters as well! How would you go about checking those? I’ve tried the following without success:
Copy code
from prefect import Parameter
self.assertTrue(Parameter('parameter_name') in flow.tasks)
# returns False
e
Hi Fina! You are creating a new
Parameter
object in your
assertTrue
statement, and trying to compare to tasks that are already created and set-up in your flow.
Edge
objects seem to support this type of comparison, but
Task
objects do not.
Task
s, and therefore
Parameter
s compare by their python object ids. Either use the same
Parameter
object that you used to setup your flow, or search for a specific
Parameter
by comparing task names, rather than entire task objects.
upvote 1
f
Hi @emre! That sounds reasonable but the curious thing is that it works perfectly fine for tasks, e.g.:
Copy code
from our_tasks.task_1 import task_1
self.assertTrue(task_1 in flow.tasks)
# returns True
It doesn’t seem to need the reference to the specific Task instance, just any Task instance with the right name. 🤔 That’s what made me think it should work the same for Parameters! How do you/your team test the flow composition?
e
I can't reproduce this, maybe I'm missing something:
Copy code
@task
def add_1(a):
    return a + 1
with Flow("assert") as f2:
    addo = add_1(2)
if __name__ == "__main__":
    print(f2.tasks)  # {<Task: add_1>}
    print(add_1 in f2.tasks)  # False
    print(addo in f2.tasks)   # True
add_1
, which is initialized but not called yet, doesn't seem to be in the flow.
addo
is registered to the flow. The same logic goes for
Parameter
as well, as far as I tried.
upvote 1
Could you, by any chance, be initializing AND calling
task_1
in your module?
🤔 1
a
@Fina Silva-Santisteban not sure if this is exactly what you try to accomplish, but here is a small example how you could test both parameter and non parameter task:
Copy code
from prefect import task, Flow, Parameter


@task(log_stdout=True)
def hello_world(param: str):
    result = f"Hello {param}!"
    print(result)
    return result


with Flow("flow-test") as flow:
    param_task = Parameter("param", default="World")
    hello_task = hello_world(param_task)


if __name__ == "__main__":
    assert param_task in flow.tasks
    assert hello_task in flow.tasks
f
@emre @Anna Geller (old account) I’m using the imperative prefect api to add tasks to flows, so e.g.:
Copy code
flow = Flow(name='flow name')
flow.set_dependencies(
            upstream_tasks=[task_1],
            task=task_2,
            keyword_tasks=dict(
                param_1=Parameter('some_param')
        )
The following tests works out fine:
Copy code
self.assertTrue(task_1 in flow.tasks) #returns True
self.assertTrue(task_2 in flow.tasks) #returns True
When I do a print it shows me all tasks and parameters:
Copy code
print(flows.tasks) #{<Task task1>, <Task task2>, <Parameter some_param>}
What doesn’t work is this, even though it is in the set:
Copy code
self.assertTrue(Parameter('some_param') in flow.tasks) #returns False
I’m not sure what prefect does under the hood when it uses the imperative api, but it seems to be different than whatever happens when using the functional api?
a
when you assign the Parameter() task to a variable, it creates a copy of a task. So when you create your test, you should reference this variable, e.g.:
Copy code
param_task = Parameter("param", default="World")
...
self.assertTrue(param_task in flow.tasks)
f
@Anna Geller (old account) can you pls try out your code snippet and let me know if that works for you? (Pls remember I’m using the imperative api)
a
Can you perhaps show your entire test that calls self.assertTrue? it's possible that all you need to change is:
Copy code
# instead:
self.assertTrue(Parameter('some_param') in flow.tasks)
# try the one from your keyword_tasks:
self.assertTrue(param_1 in flow.tasks)
This was my attempt using imperative API for that:
Copy code
from prefect import Task, Flow, Parameter


class HelloTask(Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self, param: str):
        result = f"Hello {param}!"
        print(result)
        return result


hello_world = HelloTask()


with Flow("flow-test") as flow:
    param_task = Parameter("param", default="World")
    hello_task = hello_world(param_task)


if __name__ == "__main__":
    assert param_task in flow.tasks
    assert hello_task in flow.tasks
e
@Fina Silva-Santisteban I did a little digging in the source code. Imperative api does not make copies of the task instances, unlike functional api. It adds the exact same task instance to the flow. In your example, your
task_1
and
task_2
have to be initialized
Task
instances, they can't be classes that implement task. This needs to be applied to
Parameter
as well. Essentially what Anna said. store your initialized
Parameter
instance in a var, and check against it. Because that is what you are doing with your
task_1
inside your module as well. It has to be an initialized instance, it is just initialized inside your module. Otherwise prefect would throw a fit.
upvote 1
f
@Anna Geller (old account) in your example you’re still using the functional api! Have a look at the docs to see the difference between functional and imperative api: https://docs.prefect.io/core/concepts/flows.html#imperative-api @emre In my example I’m using the task decorator to create tasks, they’re not classes that implement Task. Using the imperative api I have no need to store parameters in variables, I simply pass them as arguments to whatever task needs it through the
keyword_tasks
argument as shown in my snippet above.^ I feel like we where closer to the core issue at beginning of the thread with @emre’s first reply and somehow the fact that I’m using the imperative api became a red herring. At the core of the matter is: a set comparison,
flow.tasks
returns a Set containing Task and Parameter instances. As Emre said the set comparison needs the exact Parameter instance to return True otherwise it returns False. My comparisons with the Task instances return True it seems because I’m importing the Task instance I’ve added to the flow, not just any old Task instance with the right name. Is there a way to import the Parameter instance so that I can do a set comparison with that? Or do I have to do a string comparison for Parameters? Here’s the whole setup, I hope this makes it easier to follow: File `tasks.py`:
Copy code
from prefect import Task

@task
def do_a(param_a):
....

@task
def do_b(a_result):
....
File `flow_a.py`:
Copy code
from prefect import Flow, Parameter
from tasks.py import do_a, do_b

flow = Flow('flow a')
flow.set_dependencies(
            upstream_tasks=[Parameter('param_a')],
            task=do_a,
            keyword_tasks=dict(
                param_a=Parameter('param_a')
        )
flow.set_dependencies(
            upstream_tasks=[do_a],
            task=do_b,
            keyword_tasks=dict(
                a_result=do_a
        )
File
test.py
:
Copy code
import unittest
from flow_a.py import flow
from tasks.py import do_a, do_b

print(flow.tasks) # returns {<Parameter some_param>, <Task do_a>, <Task do_b>}

self.assertTrue(do_a in flow.tasks) #returns True, I seem to have imported the same task instance then?
self.assertTrue(do_b in flow.tasks) # returns True
self.assertTrue(Parameter('some_param') in flow.tasks) # Returns false since it's definitely not the same instance.
👍 1
k
If the question is just how to get this to work, it would be (with some slight modification to your code). Thoughts to follow. finastasks.py
Copy code
from prefect import task

@task
def do_a(param_a):
    return param_a + 1

@task
def do_b(a_result):
    return 2
flow_a.py
Copy code
from prefect import Flow, Parameter
from prefect.core import parameter
from finastasks import do_a, do_b

flow = Flow('flow a')
param_a = Parameter("param_a")
flow.set_dependencies(
            upstream_tasks=[param_a],
            task=do_a,
            keyword_tasks=dict(
                param_a=param_a
        )
)
flow.set_dependencies(
            upstream_tasks=[do_a],
            task=do_b,
            keyword_tasks=dict(
                a_result=do_a
        )
)
finastest.py
Copy code
from prefect.core.task import Parameter
from flow_a import flow, param_a
from finastasks import do_a, do_b

print(flow.tasks) # returns {<Parameter some_param>, <Task do_a>, <Task do_b>}

assert do_a in flow.tasks
assert do_b in flow.tasks
assert param_a in flow.tasks
I think the question here is why you don’t need to create references to
do_a
and
do_b
, but you need to do it for
param_a
. I’ve been looking into it a bit. I have a clue, but not a concrete answer. But at the very least, I do have a consistent mental model I think. First,
do_a
and
do_b
are actually classes. Wrapping them with the
@task
decorator return a
FunctionTask
class which inherits
Task
. Check the following behavior for
Task
Copy code
from prefect import Task 
class RunMeFirst(Task):
    def run(self):
        print("I'm running first!")

assert RunMeFirst == RunMeFirst
assert RunMeFirst().run() == RunMeFirst().run()
assert RunMeFirst() == RunMeFirst()
The first assert will work because you are comparing the Classes. The second assert will work because you are comparing run outputs, but the third assert with fail. When you do
RunMeFirst()
, we call the
___init___
method, so something after the init causes it to fail. So back to Parameters, it’s because their definition inherent calls the
___init___
method.
Parameter("test")
already instantiates it because you can’t even do:
Copy code
assert Parameter("test") == Parameter("test")
this will fail. This is why you need to pass by reference for the Parameter. What is it in Task init that breaks these? I don’t know yet. Maybe @emre does haha.
🌟 1
💯 1
Will ask the team what is it that breaks the equality for tasks
e
@Kevin Kho
assert RunMeFirst() == RunMeFirst()
is intended to fail, that is how python compares class instances by default. This creates two different instances and ask if they are the same instance.
@task
version is essentially
task_inst = RunMeFirst()
and then
assert task_inst == task_inst
. I think it makes sense that
assert RunMeFirst("task_name") == RunMeFirst("task_name")
fails. You can multiple tasks that function the same thing in a single flow, and have them do different things depending on different runtime inputs, and even with different upstream and downstream dependencies.
Parameter
currently follows the same logic. Maybe it should default to name based checks, because for the case of
Parameter
, the name defines its job. Maybe
Flow
could have a convenience function
has_param(name: str)
to ease testing.
upvote 1
f
@Kevin Kho ohh ok, so the only workaround for importing the Parameter is saving it in a variable (even though I don’t need to do that for my flow to work). That worked out fine, thanks so much! +1 on @emre’s suggestion of a convenience function 
has_param(name: str)
! Thank so much for your time and help everyone! 🙏
👍 1