Tomek Florek
    8 months ago
    Hi again. I am using custom tasks defined as extensions of the Task class (see extract.py) and would like to take advantage of the settings available for @task, like trigger=all_successful, log_stdout=True, max_retries, etc. How can I set these up for such tasks? Or would using functions with decorators be advised as the best practice here instead?
    Anna Geller
    8 months ago
    You can pass those arguments when you initialize the class in your flow, e.g.:
    from prefect import Flow
    from prefect.triggers import all_successful
    from extract import FromRdb

    extract_task = FromRdb(key="", query="", trigger=all_successful, log_stdout=True)

    with Flow("name") as flow:
        extract_task()
    The reason you can do that is that your custom task class inherits those arguments from the Task class.
    You were also asking about best practices: we recommend using the @task decorator when possible because this is the supported syntax going forward in Orion.
    Kevin Kho
    8 months ago
    You need to accept **kwargs in __init__ and pass the **kwargs to the super().__init__() call. That passes them on to the base Task class.
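    For illustration, a minimal sketch of that pattern (the FromRdb signature is assumed here, since extract.py isn’t shown in the thread):
    from prefect import Task


    class FromRdb(Task):
        def __init__(self, key: str, query: str, **kwargs):
            # forward trigger, log_stdout, max_retries, ... to the base Task class
            super().__init__(**kwargs)
            self.key = key
            self.query = query

        def run(self):
            # placeholder for the actual extraction logic from extract.py
            ...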
    Anna Geller
    8 months ago
    Oops, Kevin is right, I missed that one. @Tomek Florek if you need an example to test it, here is one:
    from prefect import task, Task, Flow
    from prefect.triggers import all_finished
    
    
    @task
    def fail_this():
        raise ValueError("Bad value")
    
    
    @task  # functional
    def say_hello(person: str) -> None:
        print("Hello, {}!".format(person))
    
    
    class AddTask(Task):  # imperative
        def __init__(self, default: int, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.default = default
    
        def run(self, x: int, y: int = None) -> int:
            if y is None:
                y = self.default
            return x + y
    
    
    # initialize the task instance
    add = AddTask(default=1, trigger=all_finished)  # imperative
    
    with Flow("My first flow!") as flow:  # functional
        failing = fail_this()
        first_result = add(1, y=2, upstream_tasks=[failing])
        second_result = add(x=first_result, y=100)
    
    
    if __name__ == "__main__":
        flow.run()
    Tomek Florek
    8 months ago
    Thanks a lot @Anna Geller & @Kevin Kho, that’s really helpful! 🙏 Seeing that Orion will eventually replace the current Cloud offering, is there a way to override the settings of tasks defined with a decorator? For a scenario where I have a reusable set of tasks/functions in some module, but the settings should differ depending on the flow they’re used in.
    Kevin Kho
    8 months ago
    Yes! I think this should work
    @task  
    def say_hello(person: str) -> None:
        print("Hello, {}!".format(person))
    
    say_hello.max_retries = ...
    say_hello.state_handlers = ...
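    For example, with concrete values (a sketch: in Prefect 1.x the task settings expect a retry_delay alongside max_retries, so both are set here):
    from datetime import timedelta

    say_hello.max_retries = 3
    say_hello.retry_delay = timedelta(seconds=10)
    say_hello.log_stdout = True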
    Anna Geller
    8 months ago
    I think it very much depends on what you mean when you say settings. If you mean e.g. default parameter values and schedules, then you could write a function that provides a default DeploymentSpec configuration. But if you mean retries and caching, then you could write your own wrapper around task to reuse the same configuration across tasks.
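    A minimal sketch of such a wrapper in Prefect 1.x terms (the names and the particular settings are only examples): a small decorator that applies the shared keyword arguments and then hands the function to @task.
    from datetime import timedelta
    from prefect import task


    def shared_task(fn):
        # one place to define the configuration reused by every task in the module
        return task(max_retries=3, retry_delay=timedelta(seconds=10), log_stdout=True)(fn)


    @shared_task
    def say_hello(person: str) -> None:
        print("Hello, {}!".format(person))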
    Tomek Florek
    8 months ago
    Thank you both! My intention was exactly what Kevin posted. I went ahead and implemented it, but ran into a small issue. I’m trying to use the defined task in another module, like:
    in file task_library.py:
    
    @task
    def say_hello(person: str) -> None:
        print("Hello, {}!".format(person))
    
    in file flow.py:
    from task_library import say_hello
    
    say_hello("Prefect").
    This fails with
    TypeError: 'module' object is not callable
    on the import, as long as there’s a decorator somewhere in the imported module. Is that expected?
    Kevin Kho
    8 months ago
    This is not expected. Some Prefect tasks are written and imported this way. I think naming your file flow.py might cause some issues because of name resolution?
    Tomek Florek
    8 months ago
    That was just for the example, it actually has a different name, like get_charge_points_from_asset.py 🧐 Any thoughts on what it could be? It’s failing the same way even in a simple scenario. I create test.py:
    from prefect.core import task
    
    @task
    def print_me():
        print("Hello")
    and then run import test in a Python shell in the same dir.
    I’m sorry for the confusion, it was PyCharm’s smart suggestion to import the wrong task.
    Kevin Kho
    8 months ago
    Oh OK, thanks for mentioning that. I was thinking about it but was a bit stumped.
    Tomek Florek
    8 months ago
    Thank you. I was importing it from prefect.core instead of prefect.
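    For reference, based on the Prefect 1.x package layout: prefect.core.task is a module (prefect/core/task.py), so from prefect.core import task binds the module itself, and @task then tries to call that module, hence the TypeError. Importing the decorator from the top-level package works:
    from prefect import task  # the decorator, not the prefect.core.task module

    @task
    def print_me():
        print("Hello")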