Tomek Florek
01/20/2022, 5:13 PM
@task, like trigger=all_successful, log_stdout=True, max_retries etc. How can I set it up for these tasks? Would using functions with decorators be advised as best practice here instead?
Anna Geller
extract_task = FromRdb(key="", query="", trigger=all_successful, log_stdout=True)
with Flow("name") as flow:
    extract_task()
Kevin Kho
Add **kwargs in __init__ and pass the **kwargs to the super().__init__() call. This passes them on to the base Task class.
Anna Geller
from prefect import task, Task, Flow
from prefect.triggers import all_finished

@task
def fail_this():
    raise ValueError("Bad value")

@task  # functional
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

class AddTask(Task):  # imperative
    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int = None) -> int:
        if y is None:
            y = self.default
        return x + y

# initialize the task instance
add = AddTask(default=1, trigger=all_finished)  # imperative

with Flow("My first flow!") as flow:  # functional
    failing = fail_this()
    first_result = add(1, y=2, upstream_tasks=[failing])
    second_result = add(x=first_result, y=100)

if __name__ == "__main__":
    flow.run()
Tomek Florek
01/21/2022, 4:54 PM
Kevin Kho
@task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

say_hello.max_retries = ...
say_hello.state_handlers = ...
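When the same settings have to be applied to many decorated tasks, assigning attributes one task at a time gets repetitive. A rough sketch of one alternative is to pre-bind the shared keyword arguments with functools.partial. The decorator below, fake_task, is a hypothetical and simplified stand-in so the example runs without Prefect installed, and the option values are only illustrative. In Prefect 1.x, prefect.task also accepts keyword arguments and forwards them to the Task constructor, so functools.partial(task, ...) should behave similarly, though that is an assumption not tested here.

```python
import functools


def fake_task(fn=None, **options):
    """Simplified stand-in for a task decorator (hypothetical, not Prefect's code).

    It can be used bare (@fake_task) or with keyword arguments
    (@fake_task(max_retries=3)).
    """
    if fn is None:
        # Called with options only: return a decorator that remembers them.
        return lambda f: fake_task(f, **options)
    fn.options = options  # a real task class would receive these in __init__
    return fn


# Pre-bind the shared configuration once (illustrative values).
shared_task = functools.partial(fake_task, max_retries=3, log_stdout=True)


@shared_task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))


@shared_task(max_retries=5)  # per-task override of one shared setting
def say_goodbye(person: str) -> None:
    print("Goodbye, {}!".format(person))
```

Keyword arguments given at the call site take precedence over the ones bound by functools.partial, which is what allows the per-task override. Note that Prefect 1.x also expects a retry_delay whenever max_retries is set.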
Anna Geller
DeploymentSpec configuration. But if you mean retries and caching, then you could write your own wrapper around task to reuse the same configuration across tasks.
Tomek Florek
01/24/2022, 5:10 PM
in file task_library.py:
@task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))
in file flow.py:
from task_library import say_hello
say_hello("Prefect")
This fails with TypeError: 'module' object is not callable on the import, as long as there’s a decorator somewhere in the imported module. Is that expected?
Kevin Kho
flow.py might cause some issues because of naming resolution?
Tomek Florek
01/24/2022, 5:15 PM
get_charge_points_from_asset.py
from prefect.core import task
@task
def print_me():
    print("Hello")
and then run import test in python shell in the same dir.
Kevin Kho
Tomek Florek
01/24/2022, 6:05 PM
prefect.core instead of prefect
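The error message fits that import. In Prefect 1.x, prefect.core has a submodule called task, so from prefect.core import task most likely binds that module rather than the @task decorator exported by the top-level prefect package. The same kind of name clash can be reproduced with only the standard library, where the pprint module contains a function of the same name. This is an illustrative sketch, not Prefect code:

```python
import pprint  # binds the module named pprint, not the function inside it

try:
    pprint("Hello")  # calling a module object raises TypeError
except TypeError as exc:
    error_message = str(exc)

# Importing the callable itself (aliased here to keep both names visible) works.
from pprint import pprint as pprint_function

pprint_function("Hello")
```

By the same reasoning, changing the import in the snippet above to from prefect import task should make the decorator usable.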