Fina Silva-Santisteban
10/11/2021, 1:48 AM
`flow.tasks` returns not just tasks but parameters as well! How would you go about checking those? I’ve tried the following without success:
from prefect import Parameter
self.assertTrue(Parameter('parameter_name') in flow.tasks)
# returns False
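For context on why this returns False: `x in some_set` hashes `x` and then compares candidates with `==`, and a class that overrides neither `__eq__` nor `__hash__` inherits identity-based versions from `object`. As far as I know, Prefect 1.x’s `Task` (and therefore `Parameter`) compares by identity and offers `is_equal()` for building comparison tasks instead. A minimal sketch, using a hypothetical stand-in class rather than the Prefect type:

```python
class StandInParameter:
    """Hypothetical stand-in for a Parameter; it defines no __eq__ or __hash__."""

    def __init__(self, name: str):
        self.name = name

    def __repr__(self) -> str:
        return f"<StandInParameter: {self.name}>"


# The instance that was actually registered "in the flow".
registered = StandInParameter("parameter_name")
flow_tasks = {registered}

# Membership hashes the candidate, then compares with ==; both default to identity.
print(registered in flow_tasks)  # True: the very same object
print(StandInParameter("parameter_name") in flow_tasks)  # False: same name, new object
```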
emre
10/11/2021, 7:19 AM
You are creating a new `Parameter` object in your `assertTrue` statement and comparing it to tasks that are already created and set up in your flow. `Edge` objects seem to support this type of comparison, but `Task` objects do not: `Task`s, and therefore `Parameter`s, compare by their Python object ids.
Either use the same `Parameter` object that you used to set up your flow, or search for a specific `Parameter` by comparing task names rather than entire task objects.
Fina Silva-Santisteban
10/11/2021, 12:38 PM
from our_tasks.task_1 import task_1
self.assertTrue(task_1 in flow.tasks)
# returns True
It doesn’t seem to need the reference to the specific `Task` instance, just any `Task` instance with the right name. 🤔 That’s what made me think it should work the same for Parameters! How do you/your team test the flow composition?
emre
10/11/2021, 1:31 PM
from prefect import Flow, task
@task
def add_1(a):
    return a + 1
with Flow("assert") as f2:
    addo = add_1(2)
if __name__ == "__main__":
    print(f2.tasks)  # {<Task: add_1>}
    print(add_1 in f2.tasks)  # False
    print(addo in f2.tasks)  # True
`add_1`, which is initialized but not yet called, doesn’t seem to be in the flow; `addo` is what gets registered to the flow. As far as I tried, the same logic applies to `Parameter`.
emre
10/11/2021, 1:31 PM
`task_1` in your module?
Anna Geller
from prefect import task, Flow, Parameter
@task(log_stdout=True)
def hello_world(param: str):
    result = f"Hello {param}!"
    print(result)
    return result
with Flow("flow-test") as flow:
    param_task = Parameter("param", default="World")
    hello_task = hello_world(param_task)
if __name__ == "__main__":
    assert param_task in flow.tasks
    assert hello_task in flow.tasks
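Both emre’s `add_1`/`addo` example and this snippet use the functional API. As far as I understand Prefect 1.x, calling a task inside a `with Flow(...)` block registers and returns a *copy* of that task, while the task objects passed in as arguments (like `param_task`) are registered as they are. That is why the returned objects (`addo`, `hello_task`) are in `flow.tasks` but the decorated `add_1` is not. The sketch below imitates this with two hypothetical classes, `MiniFlow` and `MiniTask`; it is a simplified model, not Prefect’s implementation:

```python
import copy
from typing import Optional, Set


class MiniFlow:
    """Hypothetical flow: a set of tasks plus a 'current flow' context."""

    current: Optional["MiniFlow"] = None

    def __init__(self, name: str):
        self.name = name
        self.tasks: Set["MiniTask"] = set()

    def __enter__(self) -> "MiniFlow":
        MiniFlow.current = self
        return self

    def __exit__(self, *exc) -> None:
        MiniFlow.current = None


class MiniTask:
    """Hypothetical task: calling it registers a *copy* in the current flow."""

    def __init__(self, name: str):
        self.name = name

    def __call__(self, *upstream: "MiniTask") -> "MiniTask":
        new = copy.copy(self)  # the original instance is left untouched
        flow = MiniFlow.current
        if flow is not None:
            flow.tasks.add(new)
            flow.tasks.update(upstream)  # arguments are registered as-is
        return new

    def __repr__(self) -> str:
        return f"<MiniTask: {self.name}>"


add_1 = MiniTask("add_1")  # defined, but never registered itself
param = MiniTask("param")  # plays the role of a Parameter

with MiniFlow("assert") as f2:
    addo = add_1(param)

print(add_1 in f2.tasks)  # False: only the copy was registered
print(addo in f2.tasks)   # True
print(param in f2.tasks)  # True: passed in as an argument, not copied
```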
Fina Silva-Santisteban
10/11/2021, 1:40 PM
flow = Flow(name='flow name')
flow.set_dependencies(
    upstream_tasks=[task_1],
    task=task_2,
    keyword_tasks=dict(
        param_1=Parameter('some_param')
    )
)
The following tests work fine:
self.assertTrue(task_1 in flow.tasks)  # returns True
self.assertTrue(task_2 in flow.tasks)  # returns True
When I print it, it shows me all tasks and parameters:
print(flow.tasks)  # {<Task task1>, <Task task2>, <Parameter some_param>}
What doesn’t work is this, even though it is in the set:
self.assertTrue(Parameter('some_param') in flow.tasks)  # returns False
I’m not sure what Prefect does under the hood with the imperative API, but it seems to be different from what happens with the functional API?
Anna Geller
param_task = Parameter("param", default="World")
...
self.assertTrue(param_task in flow.tasks)
Fina Silva-Santisteban
10/11/2021, 1:52 PM
Anna Geller
# instead of:
self.assertTrue(Parameter('some_param') in flow.tasks)
# try the one from your keyword_tasks:
self.assertTrue(param_1 in flow.tasks)
This was my attempt using the imperative API for that:
from prefect import Task, Flow, Parameter
class HelloTask(Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    def run(self, param: str):
        result = f"Hello {param}!"
        print(result)
        return result
hello_world = HelloTask()
with Flow("flow-test") as flow:
    param_task = Parameter("param", default="World")
    hello_task = hello_world(param_task)
if __name__ == "__main__":
    assert param_task in flow.tasks
    assert hello_task in flow.tasks
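Fina’s setup differs from this one in how tasks reach the flow: it calls `flow.set_dependencies(...)` directly. As far as I can tell, Prefect 1.x’s `set_dependencies` adds the task objects it receives (`task`, `upstream_tasks`, and the `keyword_tasks` values) to the flow as they are, without copying, so a module-level `task_1` is the very object stored in `flow.tasks`. An inline `Parameter('some_param')` is added too, but nothing else keeps a reference to it. A sketch of that behavior with hypothetical stand-ins (`StandInTask`, `MiniFlow`), not Prefect’s code:

```python
from typing import Dict, Iterable, Optional, Set


class StandInTask:
    """Hypothetical task stand-in with default (identity-based) equality."""

    def __init__(self, name: str):
        self.name = name

    def __repr__(self) -> str:
        return f"<StandInTask: {self.name}>"


class MiniFlow:
    """Hypothetical flow whose set_dependencies registers objects as-is (no copies)."""

    def __init__(self, name: str):
        self.name = name
        self.tasks: Set[StandInTask] = set()

    def set_dependencies(
        self,
        task: StandInTask,
        upstream_tasks: Optional[Iterable[StandInTask]] = None,
        keyword_tasks: Optional[Dict[str, StandInTask]] = None,
    ) -> None:
        self.tasks.add(task)
        self.tasks.update(upstream_tasks or [])
        self.tasks.update((keyword_tasks or {}).values())


task_1 = StandInTask("task_1")  # module-level instance, importable elsewhere
task_2 = StandInTask("task_2")

flow = MiniFlow("flow name")
flow.set_dependencies(
    upstream_tasks=[task_1],
    task=task_2,
    keyword_tasks=dict(param_1=StandInTask("some_param")),  # inline: no variable refers to it
)

print(task_1 in flow.tasks)                     # True: the same object
print(StandInTask("some_param") in flow.tasks)  # False: a brand-new object
print(sorted(t.name for t in flow.tasks))       # ['some_param', 'task_1', 'task_2']
```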
emre
10/11/2021, 2:16 PM
`task_1` and `task_2` have to be initialized `Task` instances; they can’t be classes that implement a task.
The same applies to `Parameter`. Essentially, it’s what Anna said: store your initialized `Parameter` instance in a variable and check against it. That is what you are already doing with your `task_1` inside your module: it has to be an initialized instance, it is just initialized inside your module. Otherwise Prefect would throw a fit.
Fina Silva-Santisteban
10/12/2021, 12:05 AM
`keyword_tasks` argument as shown in my snippet above.^
I feel like we were closer to the core issue at the beginning of the thread with @emre’s first reply, and somehow the fact that I’m using the imperative API became a red herring. At the core of the matter is a set-membership check: `flow.tasks` returns a set containing `Task` and `Parameter` instances. As Emre said, the check needs the exact `Parameter` instance to return True; otherwise it returns False. My checks with the `Task` instances seem to return True because I’m importing the `Task` instance I added to the flow, not just any old `Task` instance with the right name. Is there a way to import the `Parameter` instance so that I can do a set-membership check with it? Or do I have to do a string comparison for Parameters? Here’s the whole setup; I hope this makes it easier to follow:
File `tasks.py`:
from prefect import task
@task
def do_a(param_a):
    ...
@task
def do_b(a_result):
    ...
File `flow_a.py`:
from prefect import Flow, Parameter
from tasks import do_a, do_b
flow = Flow('flow a')
flow.set_dependencies(
    upstream_tasks=[Parameter('param_a')],
    task=do_a,
    keyword_tasks=dict(
        param_a=Parameter('param_a')
    )
)
flow.set_dependencies(
    upstream_tasks=[do_a],
    task=do_b,
    keyword_tasks=dict(
        a_result=do_a
    )
)
File `test.py`:
import unittest
from prefect import Parameter
from flow_a import flow
from tasks import do_a, do_b
class TestFlowA(unittest.TestCase):
    def test_flow_composition(self):
        print(flow.tasks)  # returns {<Parameter some_param>, <Task do_a>, <Task do_b>}
        self.assertTrue(do_a in flow.tasks)  # returns True, I seem to have imported the same task instance then?
        self.assertTrue(do_b in flow.tasks)  # returns True
        self.assertTrue(Parameter('some_param') in flow.tasks)  # returns False since it's definitely not the same instance
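On the “I seem to have imported the same task instance then?” question: yes. Python executes a module once and caches it in `sys.modules`, so every `from tasks import do_a` hands back the same object that `flow_a.py` passed to `set_dependencies`. A module-level `Parameter` assigned to a variable would be shared the same way, which is what the `param_a` variable in Kevin’s reply relies on. The sketch builds a hypothetical in-memory module, `fake_tasks`, so it runs as a single file:

```python
import sys
import types

# Hypothetical module "fake_tasks", built in memory so the sketch is self-contained.
fake_tasks = types.ModuleType("fake_tasks")
exec(
    "class StandInTask:\n"
    "    def __init__(self, name):\n"
    "        self.name = name\n"
    "do_a = StandInTask('do_a')\n",
    fake_tasks.__dict__,
)
sys.modules["fake_tasks"] = fake_tasks  # what a normal first import would do

# Two separate imports, as in flow_a.py and test.py, resolve to the cached module...
from fake_tasks import do_a as do_a_in_flow_module
from fake_tasks import do_a as do_a_in_test_module

# ...so both names point at one object, and identity-based membership checks succeed.
print(do_a_in_flow_module is do_a_in_test_module)  # True
```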
Kevin Kho
finastasks.py
from prefect import task
@task
def do_a(param_a):
    return param_a + 1
@task
def do_b(a_result):
    return 2
flow_a.py
from prefect import Flow, Parameter
from finastasks import do_a, do_b
flow = Flow('flow a')
param_a = Parameter("param_a")
flow.set_dependencies(
    upstream_tasks=[param_a],
    task=do_a,
    keyword_tasks=dict(
        param_a=param_a
    )
)
flow.set_dependencies(
    upstream_tasks=[do_a],
    task=do_b,
    keyword_tasks=dict(
        a_result=do_a
    )
)
finastest.py
from flow_a import flow, param_a
from finastasks import do_a, do_b
print(flow.tasks)  # {<Parameter: param_a>, <Task: do_a>, <Task: do_b>}
assert do_a in flow.tasks
assert do_b in flow.tasks
assert param_a in flow.tasks
I think the question here is why you don’t need to create references to `do_a` and `do_b`, but you do need one for `param_a`. I’ve been looking into it a bit. I have a clue, but not a concrete answer. But at the very least, I think I have a consistent mental model.
First, `do_a` and `do_b` are actually classes. Wrapping them with the `@task` decorator returns a `FunctionTask` class, which inherits from `Task`. Check the following behavior for `Task`:
from prefect import Task
class RunMeFirst(Task):
    def run(self):
        print("I'm running first!")
assert RunMeFirst == RunMeFirst
assert RunMeFirst().run() == RunMeFirst().run()
assert RunMeFirst() == RunMeFirst()  # raises AssertionError
The first assert will work because you are comparing the classes. The second assert will work because you are comparing run outputs, but the third assert will fail. When you do `RunMeFirst()`, we call the `__init__` method, so something after the init causes it to fail.
So back to Parameters: it’s because their definition inherently calls the `__init__` method. `Parameter("test")` already instantiates it, which is why you can’t even do:
assert Parameter("test") == Parameter("test")
This will fail. This is why you need to pass the Parameter by reference.
What is it in Task init that breaks these? I don’t know yet. Maybe @emre does haha.
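On the question of what in `Task.__init__` breaks equality: nothing specific to `__init__`. A class that does not define `__eq__` inherits `object.__eq__`, which compares identity, so any two separately constructed instances are unequal. The sketch contrasts that default with a hypothetical name-keyed class (`NameKeyedParameter`, not a Prefect type), roughly what the name-based checks emre floats further down would look like, including the trade-off that same-named objects collapse into one set entry:

```python
class RunMeFirst:
    """Plain class: inherits identity-based __eq__/__hash__ from object."""

    def run(self) -> None:
        print("I'm running first!")


class NameKeyedParameter:
    """Hypothetical parameter-like class that compares and hashes by name."""

    def __init__(self, name: str):
        self.name = name

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, NameKeyedParameter):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        return hash(self.name)


a = RunMeFirst()
print(a == a)                        # True: same object
print(RunMeFirst() == RunMeFirst())  # False: two distinct objects

params = {NameKeyedParameter("test")}
print(NameKeyedParameter("test") in params)  # True: matched by name

# Trade-off: two same-named instances collapse into a single set member.
print(len({NameKeyedParameter("x"), NameKeyedParameter("x")}))  # 1
```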
emre
10/12/2021, 7:22 AM
`assert RunMeFirst() == RunMeFirst()` is intended to fail; that is how Python compares class instances by default. This creates two different instances and asks if they are the same instance.
The `@task` version is essentially `task_inst = RunMeFirst()` followed by `assert task_inst == task_inst`.
I think it makes sense that `assert RunMeFirst("task_name") == RunMeFirst("task_name")` fails. You can have multiple tasks that run the same function in a single flow and have them do different things depending on different runtime inputs, even with different upstream and downstream dependencies.
`Parameter` currently follows the same logic. Maybe it should default to name-based checks, because for a `Parameter`, the name defines its job. Maybe `Flow` could have a convenience function `has_param(name: str)` to ease testing.
Fina Silva-Santisteban
10/12/2021, 9:55 AM
`has_param(name: str)`! Thanks so much for your time and help, everyone! 🙏
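Until something like emre’s `has_param(name: str)` exists on `Flow`, a small test helper can play that role. A minimal sketch, assuming a hypothetical module-level `has_param` function and stand-in classes in place of the Prefect types, wired into a `unittest.TestCase`:

```python
import unittest
from typing import Iterable


class StandInTask:
    """Hypothetical stand-in for prefect.Task."""

    def __init__(self, name: str):
        self.name = name


class StandInParameter(StandInTask):
    """Hypothetical stand-in for prefect.Parameter."""


def has_param(tasks: Iterable[StandInTask], name: str) -> bool:
    """Hypothetical helper: True if any Parameter-like task has this name."""
    return any(isinstance(t, StandInParameter) and t.name == name for t in tasks)


# Stand-in for flow.tasks in the flow_a example.
FLOW_TASKS = {StandInParameter("param_a"), StandInTask("do_a"), StandInTask("do_b")}


class TestFlowComposition(unittest.TestCase):
    def test_param_a_is_registered(self) -> None:
        self.assertTrue(has_param(FLOW_TASKS, "param_a"))

    def test_task_names_are_not_params(self) -> None:
        self.assertFalse(has_param(FLOW_TASKS, "do_a"))
```

Run it with `python -m unittest <file>`. With the real library, I believe Prefect 1.x’s `Flow.parameters()` returns the flow’s Parameter tasks, so the helper’s body could become `any(p.name == name for p in flow.parameters())`; that is worth confirming against the docs for the installed version.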