# prefect-community
t
Hi again. I am using custom tasks defined as an extension of the Task class (see extract.py) and would like to take advantage of the settings available for @task, like trigger=all_successful, log_stdout=True, max_retries etc. How can I set this up for these tasks? Would using functions with decorators be advised as best practice here instead?
a
You can pass those arguments when you initialize the class in your flow e.g.:
extract_task = FromRdb(key="", query="", trigger=all_successful, log_stdout=True)

with Flow("name") as flow:
    extract_task()
The reason why you can do that is that your custom task class inherits those arguments from the Task class.
You were also asking about best practices: we recommend using the @task decorator when possible because this is the supported syntax going forward in Orion.
k
You need to accept **kwargs in __init__ and pass the **kwargs to the super().__init__() call. This passes them on to the base Task class.
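To see why forwarding **kwargs matters, here is a minimal sketch that uses a stand-in base class rather than Prefect's real Task. The names BaseTask, ClosedTask and OpenTask are hypothetical and exist only for illustration; the point is that a subclass which does not accept **kwargs rejects base-class settings outright.

```python
class BaseTask:
    """Stand-in for a framework base class; NOT Prefect's real Task."""

    def __init__(self, name=None, max_retries=0, log_stdout=False):
        self.name = name
        self.max_retries = max_retries
        self.log_stdout = log_stdout


class ClosedTask(BaseTask):
    """Does not accept **kwargs, so base-class settings cannot be passed in."""

    def __init__(self, query):
        super().__init__()
        self.query = query


class OpenTask(BaseTask):
    """Accepts **kwargs and forwards them to the base class."""

    def __init__(self, query, **kwargs):
        super().__init__(**kwargs)
        self.query = query


# Settings reach the base class because OpenTask forwards them.
open_task = OpenTask(query="SELECT 1", max_retries=3, log_stdout=True)

# ClosedTask raises TypeError for the same keyword argument.
try:
    ClosedTask(query="SELECT 1", max_retries=3)
    closed_error = None
except TypeError as exc:
    closed_error = str(exc)

print(open_task.max_retries, open_task.log_stdout)
print(closed_error)
```

The same pattern should apply to a real Task subclass, as the full Prefect example in this thread shows.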
a
Oops, Kevin is right, I missed that one. @Tomek Florek if you need an example to test it, here is one:
from prefect import task, Task, Flow
from prefect.triggers import all_finished


@task
def fail_this():
    raise ValueError("Bad value")


@task  # functional
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))


class AddTask(Task):  # imperative
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int = None) -> int:
        if y is None:
            y = self.default
        return x + y


# initialize the task instance
add = AddTask(default=1, trigger=all_finished)  # imperative

with Flow("My first flow!") as flow:  # functional
    failing = fail_this()
    first_result = add(1, y=2, upstream_tasks=[failing])
    second_result = add(x=first_result, y=100)


if __name__ == "__main__":
    flow.run()
t
Thanks a lot @Anna Geller & @Kevin Kho, that’s really helpful! 🙏 Seeing that Orion will eventually replace the current Cloud offering, is there a way to override the settings of tasks defined with a decorator? My scenario is that I have a reusable set of tasks/functions in some module, but the settings should differ depending on the flow they’re used in.
k
Yes! I think this should work:
@task  
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

say_hello.max_retries = ...
say_hello.state_handlers = ...
a
I think it very much depends on what you mean when you say settings. If you mean e.g. default parameter values and schedules, then you could write a function that would provide a default DeploymentSpec configuration. But if you mean retries and caching, then you could write your own wrapper around task to reuse the same configuration across tasks.
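One way to picture "your own wrapper around task" is a decorator with preset defaults built with functools.partial. The sketch below uses a stand-in task decorator, not Prefect's, so that it runs on its own. The names retrying_task and retry_delay_seconds are hypothetical, and whether the real decorator accepts being wrapped this way is an assumption to check against the Prefect version in use.

```python
import functools


def task(fn=None, **settings):
    """Stand-in decorator (NOT Prefect's `task`): records settings on the function."""
    if fn is None:
        # Called as @task(...): return a decorator that remembers the settings.
        return lambda f: task(f, **settings)
    fn.settings = dict(settings)
    return fn


# Hypothetical shared configuration, defined once and reused across tasks.
retrying_task = functools.partial(task, max_retries=3, retry_delay_seconds=10)


@retrying_task
def extract():
    return "rows"


@retrying_task(max_retries=5)  # override one setting for this task only
def load():
    return "loaded"


print(extract.settings)
print(load.settings)
```

Keyword arguments given at the call site take precedence over the ones stored in the partial, so a single task can still deviate from the shared defaults.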
t
Thank you both! My intention was exactly what Kevin posted. I went ahead implementing it, but have a little issue. I’m trying to use the defined task in another module, like:
# in file task_library.py:

@task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

# in file flow.py:
from task_library import say_hello

say_hello("Prefect")
This fails with TypeError: 'module' object is not callable on the import, as long as there’s a decorator somewhere in the imported module. Is that expected?
k
This is not expected. Some Prefect tasks are written and imported this way.
I think naming your file flow.py might cause some issues because of name resolution?
t
That was just for the example, it actually has a different name, like get_charge_points_from_asset.py 🧐 Any thoughts on what it could be? It’s failing the same way even in a simple scenario. I create test.py:
from prefect.core import task

@task
def print_me():
    print("Hello")
and then run import test in a Python shell in the same dir.
I’m sorry for the confusion, it was PyCharm’s smart suggestion that imported the wrong task.
k
Oh ok thanks for mentioning. I was thinking but was a bit stumped.
t
Thank you. I was importing it from prefect.core instead of prefect.
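The error message fits this explanation: presumably prefect.core.task resolves to a submodule rather than the decorator function, and using a module as a decorator fails as soon as the decorated definition is executed, which happens at import time. The sketch below reproduces the same TypeError with a stand-in module object built from the standard library. It does not import Prefect, and define_print_me is a hypothetical helper.

```python
import types

# Stand-in for a submodule that happens to share its name with a decorator.
task = types.ModuleType("task")


def define_print_me(decorator):
    """Apply `decorator` to a small function, as `@decorator` would."""

    @decorator
    def print_me():
        return "Hello"

    return print_me


# Decorating with a module object raises TypeError at definition time.
try:
    define_print_me(task)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
print(callable(task))
```

A quick callable(task) check in a Python shell is an easy way to confirm whether an imported name is a usable decorator or a module.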