https://prefect.io logo
r

Rob Fowler

11/05/2020, 10:25 PM
Is there a way to use a simple function in a flow, for example when I am going to call the function many times. I can easily make it work by making my function a 'task', but I can't see why it should have all the scheduling overhead:
Copy code
@task
def get_host_from_kv(opts):
    hid = Schema({
        'host': str,
        Optional('identity', default=0): int
    }).validate(opts.kvopts)
    return HostParam(identity=hid['identity'], host=hid['host'])


with Flow("Query a single host") as flow:
    opts = Parameter('opts')

    result = winrm_powershell(host=get_host_from_kv(opts),
                              command=load_script(opts, 'win_ps_version.ps1'),
                              opts=opts)
I don't really need get_host_from_kv to be a task.
The task can run locally.
n

nicholas

11/05/2020, 11:02 PM
Hm, you could handle this in a few ways: ā€¢ you could package that method and treat it as you would other dependencies ā€¢ build it into an extendable task class, which you can further extend downstream, something like this:
Copy code
from prefect import Task

class KVHost(Task):
  def get_host_from_kv(self, opts):
    # your method logic here

  def run(self):
    super(KVHost).run()

class AnotherTask(KVHost):
  def run(self):
    host = self.get_host_from_kv()
    # do something with host
    super(AnotherTask).run()
(i'm sure there might be some other solutions, those are just the first two that come to mind)
r

Rob Fowler

11/05/2020, 11:34 PM
Thanks for the hints @nicholas That's quite interesting.
as much as I rarely approve of multiple inheritance, I could also package my utilities into a second class that I also derive from that can be shared with tasks.
n

nicholas

11/05/2020, 11:38 PM
For sure - to me this feels like less of a Prefect issue and more of a code organization/modularization issue; I think personal/org preference and what level of observability you need will largely influence how you want to handle something like this.
r

Rob Fowler

11/06/2020, 3:49 AM
It's more of a prefect use as prefect is building the flow when it loads the file.
šŸ‘ 1
not running it obviously
I can't get this to work, I can't get it into a flow. Do you know of any examples of using a 'class XX(Task):' with a Parameter passed in a flow in the documentation? There is only that one incomplete example in the Task section. something like this
Copy code
class OptedTask(prefect.Task):
    def __init__(self, opts, **kwargs):
        self.opts = opts
        super().__init__(kwargs)

    def get_host_from_kv(self, opts):
        return opts.kwopts.host


class Useful(OptedTask):
    def run(self):
        host = self.get_host_from_kv(self.opts)
        return super().run(host)



with Flow("Query a single host") as flow:
    opts = Parameter('opts')

    apt = Useful(opts())
    result = apt()
n

nicholas

11/06/2020, 2:53 PM
@Rob Fowler you're missing a run method on the
OptedTask
, which means
Useful
is calling the base
Task
class run method which doesn't do anything in this case because that's where task logic goes. Try something like this instead:
Copy code
import prefect
from prefect import Flow, Task, Parameter


class OptedTask(Task):
    def __init__(self, opts=None, **kwargs):
        # This lets you instantitate opts at build time
        self.opts = opts
        super().__init__(kwargs)

    def get_host_from_kv(self):
        return self.opts["kwopts"]["host"]


class Useful(OptedTask):
    def run(self, opts):
        # This lets you instantiate opts at runtime
        self.opts = opts
        host = self.get_host_from_kv()
        return host


with Flow("Query a single host") as flow:
    opts_param = Parameter("opts", default={"kwopts": {"host": "host"}})

    apt = Useful()

    result = apt(opts=opts_param)

flow.run()
r

Rob Fowler

11/06/2020, 10:46 PM
Awesome, thanks so much for this. I'll return the effort with some sort of license. You don't have a great use case of license for us. We are a cloud provider, I am using prefect to orchestrate the construction of custom compute environments for mid end and government customers that don't have the expertise to run their own infrastructure on prem or in the cloud.
so 'self.opts = opts ' is so I could omit the opts if I use the same task multiple times in the flow?
n

nicholas

11/09/2020, 4:20 PM
@Rob Fowler I'd be happy to put you in touch with one of our account managers - they can work with you on a license that makes sense for your work and scale.
And that's one reason; if you
init
with
self.opts
you can ostensibly use the class without passing
opts
again, but the most valuable use is the availability of
self.opts
in other class methods without having to pass around the variable as an argument.
r

Rob Fowler

11/09/2020, 10:02 PM
Yes, I am using the self.opts from the constructor,
Copy code
from prefect import Task

class OptTask(Task):
    def __init__(self,  opts, **kwargs):
        self.opts = opts
        super().__init__(kwargs)
and deriving all my local tasks from that.
šŸ‘ 1
n

nicholas

11/10/2020, 3:11 PM
@Rob Fowler would you like me to put you in touch with one of our account managers as I mentioned above?
r

Rob Fowler

11/10/2020, 10:09 PM
not at this stage.
n

nicholas

11/11/2020, 1:19 AM
Ok no worries! šŸ™‚