Gustavo de Paula
08/04/2021, 2:31 PMKevin Kho
Gustavo de Paula
08/04/2021, 2:48 PMKevin Kho
Gustavo de Paula
08/04/2021, 3:05 PMGustavo de Paula
08/04/2021, 3:11 PM
class My_Flow(prefect.Flow):
def __init__(self, name, schedule=None, passwords: List[str] = None, **kwargs):
    """Create the flow, normalising the schedule and registering password keys.

    Args:
        name: Flow name, passed through to the base class constructor.
        schedule: Optional schedule. A ``My_Schedule`` instance is converted
            with ``return_prefect_schedule()`` before being passed on; any
            other value is forwarded unchanged.
        passwords: Optional names of the passwords this flow needs. Each
            name becomes a key of ``self.passwords`` with a ``None`` value.
        **kwargs: Forwarded to the base class constructor.
    """
    if isinstance(schedule, My_Schedule):
        schedule = schedule.return_prefect_schedule()

    # Only the keys are recorded here; values start out as None.
    self.passwords = {} if passwords is None else dict.fromkeys(passwords)

    super(My_Flow, self).__init__(name, schedule, **kwargs)
def run(self, local: bool = True, **kwargs):
    """Collect passwords, configure logging, then run the flow.

    Args:
        local: When true, each password is requested interactively with
            ``getpass``; otherwise it is fetched with ``_getpassapi(key)``.
        **kwargs: Forwarded to the base class ``run``.

    Returns:
        Whatever the base class ``run`` returns.
    """
    # setup passwords: fill in a value for every key registered in __init__
    for key in self.passwords:
        if local:
            self.passwords[key] = getpass(prompt=f'Value of "{key}": ')
        else:
            self.passwords[key] = _getpassapi(key)

    # setup logger: append to a file under FLOW_PATH and mirror to stdout
    log_path = f"{os.environ.get('FLOW_PATH', './')}/logfile"
    log_format = "%(asctime)-15s %(levelname)-8s %(message)s"
    logging.basicConfig(level=logging.INFO, filename=log_path, filemode="a+", format=log_format)
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    # NOTE(review): a new stdout handler is attached to the root logger on
    # every call, so calling run() repeatedly in one process adds one more
    # handler each time.
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter(log_format))
    root.addHandler(handler)
    self.logger = logging

    # Expose the passwords and the local flag through the prefect context
    # for the duration of the run.
    with prefect.context(passwords=self.passwords, local=local):
        return super(My_Flow, self).run(**kwargs)
def add_task(self, task: prefect.core.task.Task) -> prefect.core.task.Task:
    """Add ``task`` via the base class and attach this flow to the result.

    Args:
        task: The task to add.

    Returns:
        The task returned by the base class ``add_task``, with its ``flow``
        attribute set to this flow.
    """
    added = super(My_Flow, self).add_task(task)
    # Set a back-reference to this flow on the task.
    added.flow = self
    return added
Kevin Kho
from prefect import Flow
def CustomFlow(*args, **kwargs):
    """Build a ``Flow`` that uses ``my_state_handler`` by default.

    If the caller does not supply ``state_handlers``, it defaults to a list
    containing ``my_state_handler``; an explicit ``state_handlers`` argument
    is left untouched.

    Args:
        *args: Positional arguments forwarded to ``Flow``.
        **kwargs: Keyword arguments forwarded to ``Flow``.

    Returns:
        The constructed ``Flow``.
    """
    # Flow takes an iterable under the plural keyword ``state_handlers``.
    kwargs.setdefault("state_handlers", [my_state_handler])
    return Flow(*args, **kwargs)
Why do you need `add_task`? Are you using the imperative API? The functional API would make this easier as well.