Jacob Blanco
08/13/2020, 1:59 AMrun on the parameter in the flow definition is clearly wrong.
The Parameter instance/s are passed in as a list since the users of the Task need to be able to pass an arbitrary number of unnamed parameters. Are there any examples of unpacking the parameters in an efficient way?
So far I have
params = [param.run() if isinstance(param, Parameter) else param for param in params]
I've also tried running the Parameter through a @task function but when I pass the results of that task to DoSomething it complains that FunctionalTask is not compatible.Chris White
Jacob Blanco
08/13/2020, 4:24 AMfrom InternalTasks import DoSomething
with Flow("my_flow") as flow:
DoSomething([Parameter("my_parameter"), "HardcodedParameter"])Jacob Blanco
08/13/2020, 4:25 AMChris White
from prefect import Task, Flow, Parameter
name = Parameter("name", default="p") # initialization step
with Flow("bad flow") as flow:
t = Task(name=p) # initializing Task, will fail
vs.
name = Parameter("name", default="p") # initialization step
t = Task(name="p") # also an initialization step, happens imemdiately
with Flow("bad flow") as flow:
t(upstream_tasks=[p]) # calling here sets a deferred dependencyChris White
class MyTask(Task):
def __init__(self, **kwargs):
# this method always has to run immediately, at build time
super().__init__(**kwargs)
def run(self, **kwargs):
# this method is the one that is deferred and only run at flow-run timeJacob Blanco
08/13/2020, 4:31 AMname and p to mean the Parameter variable there, is that right?
So the correct approach is to initialize the class-based Task outside of the flow definition, then call the instance of the Task as a function passing in the parameter?Jacob Blanco
08/13/2020, 4:33 AMfrom InternalTasks import DoSomething
do_something = DoSomething()
p = Parameter("my_parameter")
with Flow("my_flow") as flow:
do_something([p, "HardcodedParameter"])
Assuming the parameters are taken in from the run function in the DoSomething classChris White
Chris White
@task actually initializes a task class under the hood for you, passing in any **kwargs to the initialization step of the task. That’s one of the reasons the two APIs can feel quite differentJacob Blanco
08/13/2020, 4:36 AMChris White
@task step to an already initialized task.
@task
def my_task():
pass
# within a Flow context
Task(my_task) # bad
t = Task()
# within a Flow context
t(my_task) # goodJacob Blanco
08/13/2020, 4:38 AMChris White
Jacob Blanco
08/13/2020, 4:38 AMChris White
Jacob Blanco
08/13/2020, 4:41 AMJacob Blanco
08/13/2020, 5:11 AMrun method you use self.host instead of host, doesn't that mean you will never be able to override it at run time? If I get it right the default_from_attrs decorator replaces the default of the arguments in the function signature for run the attributes of the class set at initialisation, right?
https://github.com/PrefectHQ/prefect/blob/54f6ade4d41c91fa6d0a20382331c0100d01b94f/src/prefect/tasks/postgres/postgres.py#L76Chris White
host at runtime though if you need it!Jacob Blanco
08/13/2020, 5:13 AMhost in when calling run, nevermind.Chris White
Marvin
08/13/2020, 4:28 PM