# ask-community
g
Hi all! Is there a way I can run a flow with a custom CloudFlowRunner without having to modify the agent code?
k
Hey @Gustavo de Paula, what are you trying to do that would need a custom FlowRunner?
g
Hi @Kevin Kho, I have a custom Flow class that does some tasks when its run method is called. When I trigger the flow run through Prefect Cloud, this method doesn't get called. One way I found to deal with it was to create a custom CloudFlowRunner and extend its run method to do those tasks.
k
Could you show me the code for the custom Flow class? Getting that to work would be better than going to the FlowRunner.
g
Sure
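import logging
import os
import sys
from getpass import getpass
from typing import List

import prefect

# My_Schedule and _getpassapi are the poster's own helpers (not shown in this thread)


class My_Flow(prefect.Flow):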
    def __init__(self, name, schedule=None, passwords: List[str] = None, **kwargs):

        if isinstance(schedule, My_Schedule):
            schedule = schedule.return_prefect_schedule()
        self.passwords = {}
        if passwords is not None:
            for key in passwords:
                self.passwords[key] = None
        super(My_Flow, self).__init__(name, schedule, **kwargs)

    def run(self, local: bool = True, **kwargs):
        # setup passwords
        if local:
            for key in self.passwords:
                self.passwords[key] = getpass(prompt='Value of "'+key+'": ')
        else:
            for key in self.passwords:
                self.passwords[key] = _getpassapi(key)

        # setup logger
        log_path = f"{os.environ.get('FLOW_PATH', './')}/logfile"
        log_format = "%(asctime)-15s %(levelname)-8s %(message)s"
        logging.basicConfig(level=logging.INFO, filename=log_path, filemode="a+", format=log_format)
        root = logging.getLogger()
        root.setLevel(logging.INFO)
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter(log_format)
        handler.setFormatter(formatter)
        root.addHandler(handler)
        self.logger = logging
        with prefect.context(passwords=self.passwords, local=local):
            return super(My_Flow, self).run(**kwargs)

    def add_task(self, task: prefect.core.task.Task) -> prefect.core.task.Task:
        task = super(My_Flow, self).add_task(task)
        task.flow = self
        return task
k
I think something like this will be a lot easier to manage:
from prefect import Flow
def CustomFlow(*args, **kwargs):
    kwargs.setdefault("state_handlers", [my_state_handler])
    return Flow(*args, **kwargs)
Why do you need add_task? Are you using the imperative API? The functional API would make this easier as well.
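The thread never shows my_state_handler, so here is a minimal sketch of what it could look like. It assumes the Prefect 1.x state handler signature (flow, old_state, new_state) and the is_running() method on states. The helper configure_logging and the _MARKER attribute are hypothetical names introduced for illustration. The sketch moves the logger setup out of the overridden run method so that it also fires when a runner, rather than flow.run, drives the flow.

```python
import logging
import os
import sys

LOG_FORMAT = "%(asctime)-15s %(levelname)-8s %(message)s"
_MARKER = "_my_flow_handler"  # hypothetical attribute used to tag our own handlers


def configure_logging(log_dir: str) -> logging.Logger:
    """Send root-logger output to <log_dir>/logfile and to stdout."""
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    # Skip the setup if an earlier call already attached our handlers.
    if any(getattr(h, _MARKER, False) for h in root.handlers):
        return root
    formatter = logging.Formatter(LOG_FORMAT)
    handlers = [
        logging.FileHandler(os.path.join(log_dir, "logfile"), mode="a"),
        logging.StreamHandler(sys.stdout),
    ]
    for handler in handlers:
        handler.setFormatter(formatter)
        setattr(handler, _MARKER, True)
        root.addHandler(handler)
    return root


def my_state_handler(flow, old_state, new_state):
    """Assumed Prefect 1.x-style handler: do the setup when the run starts."""
    if new_state.is_running():
        configure_logging(os.environ.get("FLOW_PATH", "./"))
    # Returning the new state leaves the transition unchanged.
    return new_state
```

With Prefect 1.x this would be passed as state_handlers=[my_state_handler] when constructing the Flow, which is what the CustomFlow helper above does by default. The password handling from the original run method is not covered here and would need its own approach.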