# prefect-community
Hi again. I am using custom tasks defined as an extension of the Task class (see extract.py) and would like to take advantage of the settings available for tasks,
like trigger=all_successful, log_stdout=True, max_retries, etc. How can I set these up for my tasks? Would using functions with decorators maybe be advised as best practice here instead?
You can pass those arguments when you initialize the class in your flow e.g.:
extract_task = FromRdb(key="", query="", trigger=all_successful, log_stdout=True)

with Flow("name") as flow:
    extract_task()  # assumed call; the original message is cut off here
The reason why you can do that is that your custom task class inherits those arguments from the Task class.
You were also asking about best practices: we recommend using the @task decorator when possible because this is the supported syntax going forward in Orion.
You need to accept **kwargs in your task's __init__ and pass them to the super().__init__ call. That passes them on to the base Task class.
Oops, Kevin is right, I missed that one. @Tomek Florek if you need an example to test it, here is one:
from prefect import task, Task, Flow
from prefect.triggers import all_finished

@task  # must be a task, otherwise the error is raised while the flow is being built
def fail_this():
    raise ValueError("Bad value")

@task  # functional
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

class AddTask(Task):  # imperative
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int = None) -> int:
        if y is None:
            y = self.default
        return x + y

# initialize the task instance
add = AddTask(default=1, trigger=all_finished)  # imperative

with Flow("My first flow!") as flow:  # functional
    failing = fail_this()
    first_result = add(1, y=2, upstream_tasks=[failing])
    second_result = add(x=first_result, y=100)

if __name__ == "__main__":
    flow.run()
Thanks a lot @Anna Geller & @Kevin Kho, that’s really helpful! 🙏 Seeing that Orion will eventually replace the current Cloud offering, is there a way to overwrite settings of tasks defined with a decorator? In a scenario where I have a reusable set of tasks/functions in some module, but depending on the flow they’re used in the settings should be different.
Yes! I think this should work
from prefect import task

@task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

# override settings on the task object after it has been defined
say_hello.max_retries = ...
say_hello.state_handlers = ...
I think it very much depends on what you mean by settings. If you mean e.g. default parameter values and schedules, then you could write a function that provides a default
configuration. But if you mean retries and caching, then you could write your own wrapper around task to reuse the same configuration across tasks.
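A minimal sketch of that wrapper idea, written with the standard library only so it runs without Prefect installed. FakeTask and the task function below are hypothetical stand-ins, not Prefect's API; they only mimic a decorator that accepts settings as keyword arguments. functools.partial then fixes the shared configuration once:

```python
from functools import partial


class FakeTask:
    """Hypothetical stand-in for a task object (not Prefect's Task class)."""

    def __init__(self, fn, **settings):
        self.fn = fn
        self.settings = settings

    def run(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def task(fn=None, **settings):
    """Stand-in decorator, usable as @task or as @task(max_retries=3)."""
    if fn is None:
        # Called with settings only: return the actual decorator.
        return lambda f: FakeTask(f, **settings)
    return FakeTask(fn, **settings)


# Shared configuration, defined once and reused across tasks.
retrying_task = partial(task, max_retries=3, log_stdout=True)


@retrying_task
def say_hello(person):
    return "Hello, {}!".format(person)


@retrying_task(max_retries=5)  # override one shared setting for this task only
def say_goodbye(person):
    return "Goodbye, {}!".format(person)
```

Assuming the real decorator accepts the same settings as keyword arguments, only the partial line would need to be kept, with task imported from the library instead of the stand-in.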
Thank you both! My intention was exactly what Kevin posted. I went ahead implementing it, but have a little issue. I’m trying to use the defined task in another module, like:
# in file task_library.py:
@task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

# in file flow.py:
from task_library import say_hello

This fails with
TypeError: 'module' object is not callable
on the import, as long as there’s a decorator somewhere in the imported module. Is that expected?
This is not expected. Some Prefect tasks are written and imported this way
I think the name of your file might cause some issues because of name resolution?
that was just for the example, it actually has a different name, like
🧐 any thoughts what could it be? It’s failing the same way even in a simple scenario, I create test.py:
from prefect.core import task

@task
def print_me():
    pass  # body omitted in the original message
and then run
import test
in python shell in the same dir.
I’m sorry for the confusion, it was PyCharm’s smart suggestion that imported the wrong task.
Oh ok thanks for mentioning. I was thinking but was a bit stumped.
Thank you. I was importing it from prefect.core instead of prefect.
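For anyone hitting the same TypeError: it can be reproduced without Prefect, because applying any module object as a decorator fails the same way. A minimal sketch, using the standard-library json module as a stand-in for a submodule imported by mistake (decorate_with and identity are hypothetical helper names):

```python
import json  # stand-in for a module imported in place of a decorator
import types


def decorate_with(obj):
    """Use obj as a decorator; return the TypeError message, or None on success."""
    try:
        @obj
        def print_me():
            pass
    except TypeError as exc:
        return str(exc)
    return None


def identity(fn):
    """A trivial, valid decorator for comparison."""
    return fn


module_error = decorate_with(json)        # a module is not callable
function_error = decorate_with(identity)  # a function works as a decorator

print(isinstance(json, types.ModuleType), module_error, function_error)
```

Checking isinstance(task, types.ModuleType) right after the import is a quick way to tell whether the name points at a module rather than the decorator.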