#prefect-community

Jacob Blanco

08/13/2020, 1:59 AM
Hey folks, we've been putting together a bunch of custom Tasks for internal use. Let's say one of those is called DoSomething. I'm wondering what the correct approach is for dealing with Task parameters. Providing the task with a Parameter in the flow fails because the task doesn't know what to do with it, and calling `run` on the parameter in the flow definition is clearly wrong. The Parameter instances are passed in as a list, since the users of the Task need to be able to pass an arbitrary number of unnamed parameters. Are there any examples of unpacking the parameters in an efficient way? So far I have:
Copy code
params = [param.run() if isinstance(param, Parameter) else param for param in params]
I've also tried running the Parameter through a `@task` function, but when I pass the results of that task to DoSomething it complains that FunctionalTask is not compatible.

Chris White

08/13/2020, 4:22 AM
Hi @Jacob Blanco - I’m having trouble following the issue you’re describing; can you produce a self-contained code snippet that highlights what you’ve tried and the error you get?

Jacob Blanco

08/13/2020, 4:24 AM
Copy code
from prefect import Flow, Parameter
from InternalTasks import DoSomething

with Flow("my_flow") as flow:
    DoSomething([Parameter("my_parameter"), "HardcodedParameter"])
Something like the above, @Chris White. I tried wrapping the parameter list creation as a FunctionalTask, but I get an error from inside DoSomething saying that FunctionalTask does not have a length.

Chris White

08/13/2020, 4:28 AM
Ah! OK I see. So when you are using the class-based approach, the first call of the class initializes the task as a task object. The second call is the one that is actually deferred and a part of the Flow’s task graph.
Copy code
from prefect import Task, Flow, Parameter

name = Parameter("name", default="p") # initialization step

with Flow("bad flow") as flow:
    t = Task(name=p) # initializing Task, will fail
vs.
Copy code
name = Parameter("name", default="p") # initialization step
t = Task(name="p") # also an initialization step, happens immediately

with Flow("bad flow") as flow:
    t(upstream_tasks=[p]) # calling here sets a deferred dependency
Copy code
class MyTask(Task):
    def __init__(self, **kwargs):
        # this method always has to run immediately, at build time
        super().__init__(**kwargs)

    def run(self, **kwargs):
        # this method is the one that is deferred and only run at flow-run time
        pass
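The split between the initialization step and the deferred call can be illustrated with a toy model that uses only the standard library. This is a sketch under stated assumptions, not Prefect's implementation: `ToyParameter`, `ToyTask`, and `execute` are hypothetical names invented for the illustration.

```python
class ToyParameter:
    """Hypothetical placeholder whose value is only known at run time."""

    def __init__(self, name):
        self.name = name


class ToyTask:
    """Hypothetical stand-in for a class-based task (not Prefect's Task)."""

    def __init__(self, label):
        # Build time: runs immediately, so a placeholder is useless here.
        if isinstance(label, ToyParameter):
            raise TypeError("placeholders cannot be used at initialization")
        self.label = label
        self.deferred_args = ()

    def __call__(self, *args):
        # Still build time, but nothing is computed: the arguments are
        # only recorded so they can be resolved later.
        self.deferred_args = args
        return self

    def run(self, *args):
        # Run time: receives concrete values only.
        return f"{self.label}: {', '.join(args)}"


def execute(task, parameter_values):
    """Resolve placeholders, then invoke run() with concrete values."""
    concrete = [
        parameter_values[a.name] if isinstance(a, ToyParameter) else a
        for a in task.deferred_args
    ]
    return task.run(*concrete)


p = ToyParameter("my_parameter")
t = ToyTask("do_something")     # initialization step, happens immediately
t(p, "HardcodedParameter")      # deferred call, only recorded
result = execute(t, {"my_parameter": "from-the-user"})
```

In this toy, passing `p` to `ToyTask(...)` raises a `TypeError`, while passing it to the instance call works because resolution is postponed until `execute`.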

Jacob Blanco

08/13/2020, 4:31 AM
Just confirming one thing: you seem to mix `name` and `p` to mean the Parameter variable there, is that right? So the correct approach is to initialize the class-based Task outside of the flow definition, then call the instance of the Task as a function, passing in the parameter?
Copy code
from prefect import Flow, Parameter
from InternalTasks import DoSomething

do_something = DoSomething()
p = Parameter("my_parameter")

with Flow("my_flow") as flow:
    do_something([p, "HardcodedParameter"])
Assuming the parameters are taken in by the `run` method of the DoSomething class.

Chris White

08/13/2020, 4:33 AM
Yup! exactly
`@task` actually initializes a task class under the hood for you, passing in any `**kwargs` to the initialization step of the task. That’s one of the reasons the two APIs can feel quite different.
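A rough way to picture a decorator that initializes a task class for you is the toy below. It is a standard-library sketch, not Prefect's code: `toy_task` and `ToyFunctionTask` are hypothetical names, and the `name` and `max_retries` keyword arguments are only assumed examples of initialization options.

```python
class ToyFunctionTask:
    """Hypothetical task object that wraps a plain function."""

    def __init__(self, fn, **kwargs):
        # Initialization step: happens as soon as the decorator is applied.
        self.fn = fn
        self.name = kwargs.get("name", fn.__name__)
        self.max_retries = kwargs.get("max_retries", 0)

    def run(self, *args, **kwargs):
        # Deferred step: the wrapped function body only executes here.
        return self.fn(*args, **kwargs)


def toy_task(fn=None, **kwargs):
    """Usable as @toy_task or as @toy_task(name=..., max_retries=...)."""
    if fn is None:
        # Called with keyword arguments: return the real decorator.
        return lambda f: ToyFunctionTask(f, **kwargs)
    return ToyFunctionTask(fn, **kwargs)


@toy_task(name="adder", max_retries=2)
def add(x, y):
    return x + y


@toy_task
def double(x):
    return 2 * x
```

After decoration, `add` and `double` are already initialized task objects rather than functions, which is why a decorated function never needs a separate initialization call in this model.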

Jacob Blanco

08/13/2020, 4:36 AM
OK, makes sense. How does this impact passing the results of an `@task`-wrapped step into a class-based Task?

Chris White

08/13/2020, 4:37 AM
so you can only pass the results of an `@task` step to an already initialized task.
Copy code
@task
def my_task():
    pass

# within a Flow context
Task(my_task) # bad

t = Task()

# within a Flow context
t(my_task) # good

Jacob Blanco

08/13/2020, 4:38 AM
Gotcha. And following that structure, as the Task "subclass" developer I don't need to do anything to "unpack" the results of my_task?

Chris White

08/13/2020, 4:38 AM
nope, Prefect takes care of making sure the raw return value is passed to the next task
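The idea that the downstream task only ever sees raw values can be sketched with a small resolver. This is a toy model written against the standard library only, assuming upstream nodes may appear directly or inside a list; `ToyNode` and `resolve` are hypothetical names, not Prefect APIs.

```python
class ToyNode:
    """Hypothetical task node: a function plus recorded upstream inputs."""

    def __init__(self, fn):
        self.fn = fn
        self.upstream = ()

    def __call__(self, *upstream):
        # Record inputs at build time; they may be nodes, lists, or values.
        self.upstream = upstream
        return self

    def run(self, *args):
        # The task author writes this against plain values.
        return self.fn(*args)


def resolve(node):
    """Replace every node with its return value before calling run()."""
    if isinstance(node, ToyNode):
        return node.run(*(resolve(u) for u in node.upstream))
    if isinstance(node, list):
        return [resolve(item) for item in node]
    return node


produce = ToyNode(lambda: "computed")
consume = ToyNode(lambda items: [item.upper() for item in items])

consume([produce, "hardcoded"])   # build time: a node mixed with a literal
output = resolve(consume)         # run time: consume receives plain strings
```

Here `consume` never inspects its inputs for node objects; the resolver has already swapped `produce` for the string it returned.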

Jacob Blanco

08/13/2020, 4:38 AM
Awesome-sauce thank you

Chris White

08/13/2020, 4:39 AM
👍 anytime, glad I could help

Jacob Blanco

08/13/2020, 4:41 AM
@Arsenii This is the solution my great comrade! ^
@Chris White Related to this, I am looking at the Tasks library for inspiration. In one of the Postgres tasks' `run` methods you use `self.host` instead of `host`; doesn't that mean you will never be able to override it at run time? If I get it right, the `defaults_from_attrs` decorator replaces the defaults of the arguments in the function signature for `run` with the attributes of the class set at initialisation, right? https://github.com/PrefectHQ/prefect/blob/54f6ade4d41c91fa6d0a20382331c0100d01b94f/src/prefect/tasks/postgres/postgres.py#L76

Chris White

08/13/2020, 5:13 AM
Yea that’s true for that task; we could easily tweak that task to accept a `host` at runtime though if you need it!
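The behaviour discussed here, where run-time arguments fall back to attributes set at initialization, can be approximated with a short decorator. This is a simplified re-implementation for illustration, not Prefect's source: `toy_defaults_from_attrs` and `ToyQuery` are hypothetical names, and the sketch assumes keyword-only calls to `run`.

```python
from functools import wraps


def toy_defaults_from_attrs(*attr_names):
    """Fill missing run() keyword arguments from same-named attributes."""

    def decorator(run_method):
        @wraps(run_method)
        def wrapper(self, **kwargs):
            for attr in attr_names:
                # Only fills in arguments the caller did not pass.
                kwargs.setdefault(attr, getattr(self, attr))
            return run_method(self, **kwargs)

        return wrapper

    return decorator


class ToyQuery:
    """Hypothetical task: `query` is overridable, `host` is not."""

    def __init__(self, host, query=None):
        self.host = host
        self.query = query

    @toy_defaults_from_attrs("query")
    def run(self, query=None):
        # `host` is absent from the signature, so only the value set at
        # initialization can be used here.
        return f"{self.host} <- {query}"


q = ToyQuery(host="db.internal", query="SELECT 1")
default_result = q.run()                    # falls back to self.query
override_result = q.run(query="SELECT 2")   # run-time value wins
```

In this sketch, making `host` overridable would mean adding it to the `run` signature and to the decorator's argument list, then using the local `host` instead of `self.host`.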

Jacob Blanco

08/13/2020, 5:13 AM
Oh damn, I'm silly, I didn't look at the function signature... you never take `host` in when calling run, never mind.

Chris White

08/13/2020, 4:28 PM
@Marvin archive “Help passing parameters to custom task class”