Thread
#prefect-community
    Rob Fowler

    1 year ago
    Is there a way to use a simple function in a flow, for example when I am going to call the function many times? I can easily make it work by making my function a 'task', but I can't see why it should carry all the scheduling overhead:
    @task
    def get_host_from_kv(opts):
        hid = Schema({
            'host': str,
            Optional('identity', default=0): int
        }).validate(opts.kvopts)
        return HostParam(identity=hid['identity'], host=hid['host'])
    
    
    with Flow("Query a single host") as flow:
        opts = Parameter('opts')
    
        result = winrm_powershell(host=get_host_from_kv(opts),
                                  command=load_script(opts, 'win_ps_version.ps1'),
                                  opts=opts)
    I don't really need get_host_from_kv to be a task.
    The task can run locally.
    nicholas

    1 year ago
    Hm, you could handle this in a few ways:
    • you could package that method and treat it as you would other dependencies
    • build it into an extendable task class, which you can further extend downstream, something like this:
    from prefect import Task
    
    class KVHost(Task):
      def get_host_from_kv(self, opts):
        # your method logic here
        ...
    
      def run(self, opts):
        return super().run()
    
    class AnotherTask(KVHost):
      def run(self, opts):
        host = self.get_host_from_kv(opts)
        # do something with host
        return super().run(opts)
    (i'm sure there might be some other solutions, those are just the first two that come to mind)
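The first option above (package the helper as an ordinary dependency) could look roughly like the sketch below. It is plain Python with no Prefect import, so it only shows the shape: the helper stays an undecorated function and is called from inside the body of the one function that would become the task. `HostParam` here is a stand-in for the class in the original snippet, `query_host` is a hypothetical name, and the assumption is that the helper runs at task run time, once the parameter value is available.

```python
from dataclasses import dataclass


@dataclass
class HostParam:
    # Stand-in for the HostParam class used in the question (assumed shape).
    host: str
    identity: int = 0


def get_host_from_kv(kvopts: dict) -> HostParam:
    """Plain helper: no @task decorator, so nothing is scheduled for it."""
    return HostParam(host=kvopts["host"], identity=kvopts.get("identity", 0))


def query_host(kvopts: dict) -> str:
    """Hypothetical task body; in a real flow this function would carry @task."""
    host = get_host_from_kv(kvopts)  # ordinary function call inside the task
    return f"{host.host}#{host.identity}"
```

Only `query_host` would need to be registered with the flow; the helper can be imported and reused by any number of tasks.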
    Rob Fowler

    1 year ago
    Thanks for the hints @nicholas. That's quite interesting.
    As much as I rarely approve of multiple inheritance, I could also package my utilities into a second class that I also derive from, so they can be shared between tasks.
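The multiple-inheritance idea could be sketched as a mixin. The `Task` class below is a minimal stand-in so the snippet runs without Prefect installed; `HostUtils` and `QueryHost` are hypothetical names, and the `opts` layout is assumed.

```python
class Task:
    # Minimal stand-in for prefect.Task, only here to keep the sketch runnable.
    def __init__(self, name=None):
        self.name = name

    def run(self):
        return None


class HostUtils:
    """Hypothetical mixin: shared helpers only, no task logic of its own."""

    def get_host_from_kv(self, opts):
        return opts["kvopts"]["host"]


class QueryHost(HostUtils, Task):
    """A task that inherits its helpers from the mixin."""

    def run(self, opts):
        return self.get_host_from_kv(opts)
```

Listing the mixin before `Task` in the bases keeps the helper methods ahead of the task base class in the method resolution order, which only matters if the two ever define the same name.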
    nicholas

    1 year ago
    For sure - to me this feels like less of a Prefect issue and more of a code organization/modularization issue; I think personal/org preference and what level of observability you need will largely influence how you want to handle something like this.
    Rob Fowler

    1 year ago
    It's more of a Prefect usage issue, as Prefect is building the flow when it loads the file,
    not running it, obviously.
    I can't get this to work; I can't get it into a flow. Do you know of any examples in the documentation of using a 'class XX(Task):' with a Parameter passed in a flow? There is only that one incomplete example in the Task section. Something like this:
    class OptedTask(prefect.Task):
        def __init__(self, opts, **kwargs):
            self.opts = opts
            super().__init__(kwargs)
    
        def get_host_from_kv(self, opts):
            return opts.kwopts.host
    
    
    class Useful(OptedTask):
        def run(self):
            host = self.get_host_from_kv(self.opts)
            return super().run(host)
    
    
    
    with Flow("Query a single host") as flow:
        opts = Parameter('opts')
    
        apt = Useful(opts())
        result = apt()
    nicholas

    1 year ago
    @Rob Fowler you're missing a run method on the OptedTask, which means Useful is calling the base Task class run method, which doesn't do anything in this case because that's where task logic goes. Try something like this instead:
    import prefect
    from prefect import Flow, Task, Parameter
    
    
    class OptedTask(Task):
        def __init__(self, opts=None, **kwargs):
            # This lets you instantiate opts at build time
            self.opts = opts
            # Forward remaining keyword arguments (name, tags, ...) to Task
            super().__init__(**kwargs)
    
        def get_host_from_kv(self):
            return self.opts["kwopts"]["host"]
    
    
    class Useful(OptedTask):
        def run(self, opts):
            # This lets you instantiate opts at runtime
            self.opts = opts
            host = self.get_host_from_kv()
            return host
    
    
    with Flow("Query a single host") as flow:
        opts_param = Parameter("opts", default={"kwopts": {"host": "host"}})
    
        apt = Useful()
    
        result = apt(opts=opts_param)
    
    flow.run()
    Rob Fowler

    1 year ago
    Awesome, thanks so much for this. I'll return the effort with some sort of license, although you don't have a license that is a great fit for our use case. We are a cloud provider; I am using Prefect to orchestrate the construction of custom compute environments for mid end and government customers that don't have the expertise to run their own infrastructure on-prem or in the cloud.
    So 'self.opts = opts' is there so I could omit the opts if I use the same task multiple times in the flow?
    nicholas

    1 year ago
    @Rob Fowler I'd be happy to put you in touch with one of our account managers - they can work with you on a license that makes sense for your work and scale.
    And that's one reason; if you init with self.opts you can ostensibly use the class without passing opts again, but the most valuable use is the availability of self.opts in other class methods without having to pass around the variable as an argument.
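The build-time versus run-time distinction can be sketched in plain Python. The `Task` class below is a minimal stand-in (not Prefect's), the option values are made up, and making `opts` optional in `run` is an assumption added here so that the build-time value can act as a default.

```python
class Task:
    # Minimal stand-in for prefect.Task so the sketch runs without Prefect.
    def __init__(self, name=None, tags=None):
        self.name = name
        self.tags = tags or []


class OptedTask(Task):
    def __init__(self, opts=None, **kwargs):
        self.opts = opts              # build-time value, may be None
        super().__init__(**kwargs)    # forward name, tags, ... as keywords

    def get_host_from_kv(self):
        # Helper methods read self.opts instead of taking it as an argument.
        return self.opts["kwopts"]["host"]


class Useful(OptedTask):
    def run(self, opts=None):
        if opts is not None:          # a run-time value overrides the preset
            self.opts = opts
        return self.get_host_from_kv()


preset = Useful(opts={"kwopts": {"host": "build-host"}}, name="preset")
print(preset.run())                                       # uses the preset
print(preset.run(opts={"kwopts": {"host": "run-host"}}))  # uses the override
```

Note that the override replaces `self.opts` on the instance, so later calls without `opts` would see the overridden value; copy the preset first if that is not wanted.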
    Rob Fowler

    1 year ago
    Yes, I am using the self.opts from the constructor,
    from prefect import Task
    
    class OptTask(Task):
        def __init__(self, opts, **kwargs):
            self.opts = opts
            super().__init__(**kwargs)
    and deriving all my local tasks from that.
    nicholas

    1 year ago
    @Rob Fowler would you like me to put you in touch with one of our account managers as I mentioned above?
    Rob Fowler

    1 year ago
    Not at this stage.
    nicholas

    1 year ago
    Ok no worries! 🙂