Jacob Blanco
08/13/2020, 1:59 AM
run on the parameter in the flow definition is clearly wrong.
The Parameter instance/s are passed in as a list since the users of the Task need to be able to pass an arbitrary number of unnamed parameters. Are there any examples of unpacking the parameters in an efficient way?
So far I have
params = [param.run() if isinstance(param, Parameter) else param for param in params]
I've also tried running the Parameter through a @task function, but when I pass the results of that task to DoSomething it complains that FunctionalTask is not compatible.
Chris White
Jacob Blanco
08/13/2020, 4:24 AM
from InternalTasks import DoSomething
with Flow("my_flow") as flow:
    DoSomething([Parameter("my_parameter"), "HardcodedParameter"])
Chris White
from prefect import Task, Flow, Parameter
name = Parameter("name", default="p") # initialization step
with Flow("bad flow") as flow:
    t = Task(name=p) # initializing Task, will fail
vs.
name = Parameter("name", default="p") # initialization step
t = Task(name="p") # also an initialization step, happens immediately
with Flow("bad flow") as flow:
    t(upstream_tasks=[p]) # calling here sets a deferred dependency
class MyTask(Task):
    def __init__(self, **kwargs):
        # this method always has to run immediately, at build time
        super().__init__(**kwargs)

    def run(self, **kwargs):
        # this method is the one that is deferred and only run at flow-run time
        pass
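
The build-time versus run-time split described above can be sketched in plain Python without Prefect installed. Everything here (ToyParameter, ToyTask, run_flow) is a hypothetical stand-in invented for illustration and is not Prefect's implementation; it only mimics the idea that __init__ runs immediately while a parameter's value exists only once the flow runs.

```python
# Toy stand-ins, NOT Prefect classes: they only mimic the two phases.
class ToyParameter:
    def __init__(self, name, default=None):
        self.name = name
        self.default = default

    def run(self, supplied):
        # The value is only known when the flow actually runs.
        return supplied.get(self.name, self.default)


class ToyTask:
    def __init__(self, label):
        # Build time: this executes immediately, so `label` must be concrete.
        if isinstance(label, ToyParameter):
            raise TypeError("a parameter has no value yet at build time")
        self.label = label

    def run(self, value):
        # Run time: receives the already-resolved value.
        return f"{self.label}: {value}"


def run_flow(task, arg, supplied):
    # Resolve deferred inputs first, then hand the result to run().
    value = arg.run(supplied) if isinstance(arg, ToyParameter) else arg
    return task.run(value)


p = ToyParameter("name", default="p")
t = ToyTask(label="greeter")          # initialized outside the "flow"
result = run_flow(t, p, {"name": "Jacob"})
print(result)  # greeter: Jacob
```

In this sketch, passing p to ToyTask(...) fails right away, while passing it to the run step works because it is resolved later.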
Jacob Blanco
08/13/2020, 4:31 AM
name and p to mean the Parameter variable there, is that right?
So the correct approach is to initialize the class-based Task outside of the flow definition, then call the instance of the Task as a function, passing in the parameter?
from InternalTasks import DoSomething
do_something = DoSomething()
p = Parameter("my_parameter")
with Flow("my_flow") as flow:
    do_something([p, "HardcodedParameter"])
Assuming the parameters are taken in from the run function in the DoSomething class
Chris White
@task actually initializes a task class under the hood for you, passing in any **kwargs to the initialization step of the task. That’s one of the reasons the two APIs can feel quite different.
Jacob Blanco
08/13/2020, 4:36 AM
Chris White
@task step to an already initialized task.
@task
def my_task():
    pass
# within a Flow context
Task(my_task) # bad
t = Task()
# within a Flow context
t(my_task) # good
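
The earlier remark that @task initializes a task class under the hood can be pictured with a small plain-Python sketch. The names toy_task, ToyTask and ToyFunctionTask are hypothetical and are not Prefect's code; the only assumption is that a decorator wraps the function in a class instance and forwards its keyword arguments to that class's __init__.

```python
class ToyTask:
    def __init__(self, name=None, **kwargs):
        # Initialization step: stores configuration, does no work yet.
        self.name = name
        self.options = kwargs

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        # In this toy version calling simply runs the task; a real
        # workflow library would record a deferred dependency instead.
        return self.run(*args, **kwargs)


class ToyFunctionTask(ToyTask):
    def __init__(self, fn, **kwargs):
        name = kwargs.pop("name", fn.__name__)
        super().__init__(name=name, **kwargs)
        self.fn = fn

    def run(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def toy_task(fn=None, **init_kwargs):
    # Supports both @toy_task and @toy_task(max_retries=3).
    if fn is None:
        return lambda f: ToyFunctionTask(f, **init_kwargs)
    return ToyFunctionTask(fn, **init_kwargs)


@toy_task(max_retries=3)
def add_one(x):
    return x + 1


print(type(add_one).__name__, add_one.options)
```

After decoration, add_one is already an initialized task object, so the decorator's keyword arguments play the same role as the arguments to a class-based task's __init__.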
Jacob Blanco
08/13/2020, 4:38 AM
Chris White
Jacob Blanco
08/13/2020, 4:38 AM
Chris White
Jacob Blanco
08/13/2020, 4:41 AM
run method you use self.host instead of host, doesn't that mean you will never be able to override it at run time? If I get it right, the default_from_attrs decorator replaces the defaults of the arguments in the function signature for run with the attributes of the class set at initialisation, right?
https://github.com/PrefectHQ/prefect/blob/54f6ade4d41c91fa6d0a20382331c0100d01b94f/src/prefect/tasks/postgres/postgres.py#L76
Chris White
host at runtime though if you need it!
Jacob Blanco
08/13/2020, 5:13 AM
host in when calling run, nevermind.
Chris White
Marvin
08/13/2020, 4:28 PM
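
For readers following the question about the attrs-defaulting decorator, here is a rough plain-Python sketch of the behaviour Jacob describes: any listed run argument left as None is filled in from the instance attribute of the same name. The decorator toy_defaults_from_attrs, the class ToyQuery, and the host names are all hypothetical; this is not Prefect's source, only an illustration of the pattern.

```python
import functools
import inspect


def toy_defaults_from_attrs(*attr_names):
    """Fill listed run() arguments left as None from instance attributes."""
    def decorator(run):
        sig = inspect.signature(run)

        @functools.wraps(run)
        def wrapper(self, *args, **kwargs):
            bound = sig.bind_partial(self, *args, **kwargs)
            for attr in attr_names:
                # Missing or None means "fall back to the attribute".
                if bound.arguments.get(attr) is None:
                    bound.arguments[attr] = getattr(self, attr)
            return run(*bound.args, **bound.kwargs)
        return wrapper
    return decorator


class ToyQuery:
    def __init__(self, host="db.internal", port=5432):
        self.host = host
        self.port = port

    @toy_defaults_from_attrs("host")
    def run(self, query, host=None):
        # host can be overridden per call; port is read from self,
        # so it stays fixed at whatever was set at initialization.
        return f"{query} @ {host}:{self.port}"


q = ToyQuery()
default_result = q.run("SELECT 1")
override_result = q.run("SELECT 1", host="replica.internal")
print(default_result)   # SELECT 1 @ db.internal:5432
print(override_result)  # SELECT 1 @ replica.internal:5432
```

Under these assumptions, only arguments named in the decorator and present in the run signature can be overridden at run time; anything read directly from self inside run cannot.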